mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Extend error handling to put_image, put_value_*, and put_batch methods
Co-Authored-By: christian@neon.tech <christian@neon.tech>
This commit is contained in:
@@ -483,12 +483,15 @@ impl DeltaLayerWriterInner {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
let val_ser = Value::ser(&val)
|
||||
.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
let (_, res) = self
|
||||
.put_value_bytes(
|
||||
key,
|
||||
lsn,
|
||||
Value::ser(&val)?.slice_len(),
|
||||
val_ser.slice_len(),
|
||||
val.will_init(),
|
||||
ctx,
|
||||
)
|
||||
@@ -503,25 +506,34 @@ impl DeltaLayerWriterInner {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
assert!(
|
||||
self.lsn_range.start <= lsn,
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
);
|
||||
if self.lsn_range.start > lsn {
|
||||
return (val, Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
|
||||
"lsn_start={}, lsn={}",
|
||||
self.lsn_range.start,
|
||||
lsn
|
||||
))));
|
||||
}
|
||||
|
||||
// We don't want to use compression in delta layer creation
|
||||
let compression = ImageCompressionAlgorithm::Disabled;
|
||||
let (val, res) = self
|
||||
.blob_writer
|
||||
.write_blob_maybe_compressed(val, ctx, compression)
|
||||
.await;
|
||||
|
||||
let off = match res {
|
||||
Ok((off, _)) => off,
|
||||
Err(e) => return (val, Err(anyhow::anyhow!(e))),
|
||||
Err(e) => return (val, Err(match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
|
||||
},
|
||||
other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})),
|
||||
};
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
@@ -531,7 +543,7 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
self.num_keys += 1;
|
||||
|
||||
(val, res.map_err(|e| anyhow::anyhow!(e)))
|
||||
(val, res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))))
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
@@ -712,7 +724,7 @@ impl DeltaLayerWriter {
|
||||
lsn: Lsn,
|
||||
val: Value,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), DeltaLayerWriterError> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
@@ -727,7 +739,7 @@ impl DeltaLayerWriter {
|
||||
val: FullSlice<Buf>,
|
||||
will_init: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, anyhow::Result<()>)
|
||||
) -> (FullSlice<Buf>, Result<(), DeltaLayerWriterError>)
|
||||
where
|
||||
Buf: IoBuf + Send,
|
||||
{
|
||||
|
||||
@@ -864,8 +864,11 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range")));
|
||||
}
|
||||
|
||||
let compression = self.conf.image_compression;
|
||||
let uncompressed_len = img.len() as u64;
|
||||
self.uncompressed_bytes += uncompressed_len;
|
||||
@@ -875,7 +878,14 @@ impl ImageLayerWriterInner {
|
||||
.write_blob_maybe_compressed(img.slice_len(), ctx, compression)
|
||||
.await;
|
||||
// TODO: re-use the buffer for `img` further upstack
|
||||
let (off, compression_info) = res?;
|
||||
let (off, compression_info) = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
if compression_info.compressed_size.is_some() {
|
||||
// The image has been considered for compression at least
|
||||
self.uncompressed_bytes_eligible += uncompressed_len;
|
||||
@@ -887,7 +897,8 @@ impl ImageLayerWriterInner {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, off)?;
|
||||
self.tree.append(&keybuf, off)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -907,8 +918,10 @@ impl ImageLayerWriterInner {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
if !self.key_range.contains(&key) {
|
||||
return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range")));
|
||||
}
|
||||
|
||||
// NB: we don't update the (un)compressed metrics, since we can't determine them without
|
||||
// decompressing the image. This seems okay.
|
||||
@@ -918,11 +931,18 @@ impl ImageLayerWriterInner {
|
||||
.blob_writer
|
||||
.write_blob_raw(raw_with_header.slice_len(), ctx)
|
||||
.await;
|
||||
let offset = res?;
|
||||
let offset = res.map_err(|e| match e {
|
||||
crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
|
||||
crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
|
||||
crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
|
||||
},
|
||||
other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
self.tree.append(&keybuf, offset)?;
|
||||
self.tree.append(&keybuf, offset)
|
||||
.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
@@ -1118,7 +1138,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
self.inner.as_mut().unwrap().put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
@@ -1133,7 +1153,7 @@ impl ImageLayerWriter {
|
||||
key: Key,
|
||||
raw_with_header: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), ImageLayerWriterError> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
|
||||
@@ -582,6 +582,14 @@ impl InMemoryLayer {
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum InMemoryLayerError {
|
||||
#[error("flush task cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
/// Write path.
|
||||
///
|
||||
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
|
||||
@@ -591,7 +599,7 @@ impl InMemoryLayer {
|
||||
&self,
|
||||
serialized_batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), InMemoryLayerError> {
|
||||
let mut inner = self.inner.write().await;
|
||||
self.assert_writable();
|
||||
|
||||
@@ -605,7 +613,8 @@ impl InMemoryLayer {
|
||||
} = serialized_batch;
|
||||
|
||||
// Write the batch to the file
|
||||
inner.file.write_raw(&raw, ctx).await?;
|
||||
inner.file.write_raw(&raw, ctx).await
|
||||
.map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
|
||||
let new_size = inner.file.len();
|
||||
|
||||
let expected_new_len = base_offset
|
||||
|
||||
Reference in New Issue
Block a user