diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 3833651c94..15695d6ef7 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -483,12 +483,15 @@ impl DeltaLayerWriterInner { lsn: Lsn, val: Value, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> Result<(), DeltaLayerWriterError> { + let val_ser = Value::ser(&val) + .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; + let (_, res) = self .put_value_bytes( key, lsn, - Value::ser(&val)?.slice_len(), + val_ser.slice_len(), val.will_init(), ctx, ) @@ -503,25 +506,34 @@ impl DeltaLayerWriterInner { val: FullSlice, will_init: bool, ctx: &RequestContext, - ) -> (FullSlice, anyhow::Result<()>) + ) -> (FullSlice, Result<(), DeltaLayerWriterError>) where Buf: IoBuf + Send, { - assert!( - self.lsn_range.start <= lsn, - "lsn_start={}, lsn={}", - self.lsn_range.start, - lsn - ); + if self.lsn_range.start > lsn { + return (val, Err(DeltaLayerWriterError::Other(anyhow::anyhow!( + "lsn_start={}, lsn={}", + self.lsn_range.start, + lsn + )))); + } + // We don't want to use compression in delta layer creation let compression = ImageCompressionAlgorithm::Disabled; let (val, res) = self .blob_writer .write_blob_maybe_compressed(val, ctx, compression) .await; + let off = match res { Ok((off, _)) => off, - Err(e) => return (val, Err(anyhow::anyhow!(e))), + Err(e) => return (val, Err(match e { + crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err { + crate::tenant::blob_io::BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled, + crate::tenant::blob_io::BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err), + }, + other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)), + })), }; let blob_ref = BlobRef::new(off, will_init); @@ -531,7 +543,7 @@ impl DeltaLayerWriterInner { self.num_keys += 1; - (val, res.map_err(|e| anyhow::anyhow!(e))) + (val, res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))) } fn size(&self) -> u64 { @@ -712,7 +724,7 @@ impl DeltaLayerWriter { lsn: Lsn, val: Value, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> Result<(), DeltaLayerWriterError> { self.inner .as_mut() .unwrap() @@ -727,7 +739,7 @@ impl DeltaLayerWriter { val: FullSlice, will_init: bool, ctx: &RequestContext, - ) -> (FullSlice, anyhow::Result<()>) + ) -> (FullSlice, Result<(), DeltaLayerWriterError>) where Buf: IoBuf + Send, { diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 67281ee3bc..853e8c16ef 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -864,8 +864,11 @@ impl ImageLayerWriterInner { key: Key, img: Bytes, ctx: &RequestContext, - ) -> anyhow::Result<()> { - ensure!(self.key_range.contains(&key)); + ) -> Result<(), ImageLayerWriterError> { + if !self.key_range.contains(&key) { + return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range"))); + } + let compression = self.conf.image_compression; let uncompressed_len = img.len() as u64; self.uncompressed_bytes += uncompressed_len; @@ -875,7 +878,14 @@ impl ImageLayerWriterInner { .write_blob_maybe_compressed(img.slice_len(), ctx, compression) .await; // TODO: re-use the buffer for `img` further upstack - let (off, compression_info) = res?; + let (off, compression_info) = res.map_err(|e| match e { + crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err { + crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled, + crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err), + }, + other => ImageLayerWriterError::Other(anyhow::anyhow!(other)), + })?; + if compression_info.compressed_size.is_some() { // The image has been considered for compression at least self.uncompressed_bytes_eligible += uncompressed_len; @@ -887,7 +897,8 @@ impl ImageLayerWriterInner { let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); - self.tree.append(&keybuf, off)?; + self.tree.append(&keybuf, off) + .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?; #[cfg(feature = "testing")] { @@ -907,8 +918,10 @@ impl ImageLayerWriterInner { key: Key, raw_with_header: Bytes, ctx: &RequestContext, - ) -> anyhow::Result<()> { - ensure!(self.key_range.contains(&key)); + ) -> Result<(), ImageLayerWriterError> { + if !self.key_range.contains(&key) { + return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range"))); + } // NB: we don't update the (un)compressed metrics, since we can't determine them without // decompressing the image. This seems okay. @@ -918,11 +931,18 @@ impl ImageLayerWriterInner { .blob_writer .write_blob_raw(raw_with_header.slice_len(), ctx) .await; - let offset = res?; + let offset = res.map_err(|e| match e { + crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err { + crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled, + crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err), + }, + other => ImageLayerWriterError::Other(anyhow::anyhow!(other)), + })?; let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); - self.tree.append(&keybuf, offset)?; + self.tree.append(&keybuf, offset) + .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?; #[cfg(feature = "testing")] { @@ -1118,7 +1138,7 @@ impl ImageLayerWriter { key: Key, img: Bytes, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> Result<(), ImageLayerWriterError> { self.inner.as_mut().unwrap().put_image(key, img, ctx).await } @@ -1133,7 +1153,7 @@ impl ImageLayerWriter { key: Key, raw_with_header: Bytes, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> Result<(), ImageLayerWriterError> { self.inner .as_mut() .unwrap() diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 5d558e66cc..bad447b5d9 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -582,6 +582,14 @@ impl InMemoryLayer { }) } + #[derive(Debug, thiserror::Error)] + pub enum InMemoryLayerError { + #[error("flush task cancelled")] + Cancelled, + #[error(transparent)] + Other(anyhow::Error), + } + /// Write path. /// /// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from. @@ -591,7 +599,7 @@ impl InMemoryLayer { &self, serialized_batch: SerializedValueBatch, ctx: &RequestContext, - ) -> anyhow::Result<()> { + ) -> Result<(), InMemoryLayerError> { let mut inner = self.inner.write().await; self.assert_writable(); @@ -605,7 +613,8 @@ impl InMemoryLayer { } = serialized_batch; // Write the batch to the file - inner.file.write_raw(&raw, ctx).await?; + inner.file.write_raw(&raw, ctx).await + .map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?; let new_size = inner.file.len(); let expected_new_len = base_offset