From 7a576d723cb0934a8817f7c99aeee90f48840375 Mon Sep 17 00:00:00 2001
From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Date: Tue, 6 May 2025 16:29:57 +0000
Subject: [PATCH] Extend error handling to put_image, put_value_*, and
 put_batch methods

Co-Authored-By: christian@neon.tech
---
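Reviewer note (commentary only, not applied by git am): every hunk below uses
the same error-mapping pattern. The layer writer methods now return a dedicated
error enum with a `Cancelled` variant and an `Other(anyhow::Error)` variant,
and each call site maps that enum back into the error type it already works
with (`anyhow::Error`, `CreateImageLayersError`, or `CompactionError`); the
compaction call sites instead match on the "flush task cancelled" message text.
The sketch below is a simplified, self-contained illustration of that pattern;
`WriterError`, `put_image`, and `flush_layer` are hypothetical stand-ins, not
items from this tree.

    use anyhow::anyhow;

    // Simplified stand-in for DeltaLayerWriterError / ImageLayerWriterError.
    enum WriterError {
        Cancelled,
        Other(anyhow::Error),
    }

    // Hypothetical writer call that can fail with a WriterError.
    fn put_image() -> Result<(), WriterError> {
        Err(WriterError::Cancelled)
    }

    // Callers that still speak anyhow flatten the enum like this, turning
    // cancellation into a labelled anyhow error instead of losing it.
    fn flush_layer() -> anyhow::Result<()> {
        put_image().map_err(|e| match e {
            WriterError::Cancelled => anyhow!("flush task cancelled"),
            WriterError::Other(err) => err,
        })
    }

    fn main() {
        assert!(flush_layer().is_err());
    }
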
 .../storage_layer/batch_split_writer.rs       | 16 +++-
 .../src/tenant/storage_layer/delta_layer.rs   | 59 ++++++++-------
 .../src/tenant/storage_layer/image_layer.rs   | 34 ++++++---
 .../tenant/storage_layer/inmemory_layer.rs    | 24 +++++-
 pageserver/src/tenant/timeline.rs             | 46 ++++++++++--
 pageserver/src/tenant/timeline/compaction.rs  | 74 ++++++++++++++-----
 6 files changed, 187 insertions(+), 66 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
index 84f1572806..fc1676646c 100644
--- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs
+++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
@@ -274,7 +274,13 @@ impl<'a> SplitImageLayerWriter<'a> {
             );
             self.start_key = key;
         }
-        self.inner.put_image(key, img, ctx).await
+        self.inner
+            .put_image(key, img, ctx)
+            .await
+            .map_err(|e| match e {
+                ImageLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+                ImageLayerWriterError::Other(err) => err,
+            })
     }
 
     pub(crate) async fn finish_with_discard_fn(
@@ -417,7 +423,13 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         }
         self.last_key_written = key;
         let (_, inner) = self.inner.as_mut().unwrap();
-        inner.put_value(key, lsn, val, ctx).await
+        inner
+            .put_value(key, lsn, val, ctx)
+            .await
+            .map_err(|e| match e {
+                DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+                DeltaLayerWriterError::Other(err) => err,
+            })
     }
 
     pub(crate) async fn finish_with_discard_fn(
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 15695d6ef7..f42d6d1db7 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -484,17 +484,11 @@ impl DeltaLayerWriterInner {
         val: Value,
         ctx: &RequestContext,
     ) -> Result<(), DeltaLayerWriterError> {
-        let val_ser = Value::ser(&val)
-            .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
-
+        let val_ser =
+            Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?;
+
         let (_, res) = self
-            .put_value_bytes(
-                key,
-                lsn,
-                val_ser.slice_len(),
-                val.will_init(),
-                ctx,
-            )
+            .put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx)
             .await;
         res
     }
@@ -511,29 +505,41 @@ impl DeltaLayerWriterInner {
         Buf: IoBuf + Send,
     {
         if self.lsn_range.start > lsn {
-            return (val, Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
-                "lsn_start={}, lsn={}",
-                self.lsn_range.start,
-                lsn
-            ))));
+            return (
+                val,
+                Err(DeltaLayerWriterError::Other(anyhow::anyhow!(
+                    "lsn_start={}, lsn={}",
+                    self.lsn_range.start,
+                    lsn
+                ))),
+            );
         }
-
+
         // We don't want to use compression in delta layer creation
         let compression = ImageCompressionAlgorithm::Disabled;
         let (val, res) = self
             .blob_writer
             .write_blob_maybe_compressed(val, ctx, compression)
             .await;
-
+
         let off = match res {
             Ok((off, _)) => off,
-            Err(e) => return (val, Err(match e {
-                crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
-                    crate::tenant::blob_io::BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled,
-                    crate::tenant::blob_io::BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err),
-                },
-                other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
-            })),
+            Err(e) => {
+                return (
+                    val,
+                    Err(match e {
+                        crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
+                            crate::tenant::blob_io::BlobWriterError::Cancelled => {
+                                DeltaLayerWriterError::Cancelled
+                            }
+                            crate::tenant::blob_io::BlobWriterError::Other(err) => {
+                                DeltaLayerWriterError::Other(err)
+                            }
+                        },
+                        other => DeltaLayerWriterError::Other(anyhow::anyhow!(other)),
+                    }),
+                );
+            }
         };
 
         let blob_ref = BlobRef::new(off, will_init);
@@ -543,7 +549,10 @@ impl DeltaLayerWriterInner {
 
         self.num_keys += 1;
 
-        (val, res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))))
+        (
+            val,
+            res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e))),
+        )
     }
 
     fn size(&self) -> u64 {
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 853e8c16ef..42aa247556 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -866,9 +866,11 @@ impl ImageLayerWriterInner {
         ctx: &RequestContext,
     ) -> Result<(), ImageLayerWriterError> {
         if !self.key_range.contains(&key) {
-            return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range")));
+            return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
+                "key not in range"
+            )));
         }
-
+
         let compression = self.conf.image_compression;
         let uncompressed_len = img.len() as u64;
         self.uncompressed_bytes += uncompressed_len;
@@ -880,12 +882,16 @@ impl ImageLayerWriterInner {
         // TODO: re-use the buffer for `img` further upstack
         let (off, compression_info) = res.map_err(|e| match e {
             crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
-                crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
-                crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
+                crate::tenant::blob_io::BlobWriterError::Cancelled => {
+                    ImageLayerWriterError::Cancelled
+                }
+                crate::tenant::blob_io::BlobWriterError::Other(err) => {
+                    ImageLayerWriterError::Other(err)
+                }
             },
             other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
         })?;
-
+
         if compression_info.compressed_size.is_some() {
             // The image has been considered for compression at least
             self.uncompressed_bytes_eligible += uncompressed_len;
@@ -897,7 +903,8 @@ impl ImageLayerWriterInner {
 
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        self.tree.append(&keybuf, off)
+        self.tree
+            .append(&keybuf, off)
             .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
 
         #[cfg(feature = "testing")]
@@ -920,7 +927,9 @@ impl ImageLayerWriterInner {
         ctx: &RequestContext,
     ) -> Result<(), ImageLayerWriterError> {
         if !self.key_range.contains(&key) {
-            return Err(ImageLayerWriterError::Other(anyhow::anyhow!("key not in range")));
+            return Err(ImageLayerWriterError::Other(anyhow::anyhow!(
+                "key not in range"
+            )));
         }
 
         // NB: we don't update the (un)compressed metrics, since we can't determine them without
@@ -933,15 +942,20 @@ impl ImageLayerWriterInner {
             .await;
         let offset = res.map_err(|e| match e {
             crate::tenant::blob_io::WriteBlobError::Flush(blob_err) => match blob_err {
-                crate::tenant::blob_io::BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled,
-                crate::tenant::blob_io::BlobWriterError::Other(err) => ImageLayerWriterError::Other(err),
+                crate::tenant::blob_io::BlobWriterError::Cancelled => {
+                    ImageLayerWriterError::Cancelled
+                }
+                crate::tenant::blob_io::BlobWriterError::Other(err) => {
+                    ImageLayerWriterError::Other(err)
+                }
             },
             other => ImageLayerWriterError::Other(anyhow::anyhow!(other)),
         })?;
 
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        self.tree.append(&keybuf, offset)
+        self.tree
+            .append(&keybuf, offset)
             .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?;
 
         #[cfg(feature = "testing")]
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 586b82d6a5..e601fab82a 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -26,6 +26,7 @@ use utils::lsn::Lsn;
 use utils::vec_map::VecMap;
 use wal_decoder::serialized_batch::{SerializedValueBatch, SerializedValueMeta, ValueMeta};
 
+use super::delta_layer::DeltaLayerWriterError;
 use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
 use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64, u64_to_usize};
 use crate::config::PageServerConf;
@@ -615,7 +616,10 @@ impl InMemoryLayer {
         } = serialized_batch;
 
         // Write the batch to the file
-        inner.file.write_raw(&raw, ctx).await
+        inner
+            .file
+            .write_raw(&raw, ctx)
+            .await
             .map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
 
         let new_size = inner.file.len();
@@ -648,7 +652,8 @@ impl InMemoryLayer {
             batch_offset,
             len,
             will_init,
-        })?;
+        })
+        .map_err(|e| InMemoryLayerError::Other(anyhow::anyhow!(e)))?;
 
         let vec_map = inner.index.entry(key).or_default();
         let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
@@ -805,14 +810,25 @@ impl InMemoryLayer {
                                 ctx,
                             )
                             .await;
-                        res?;
+                        res.map_err(|e| match e {
+                            DeltaLayerWriterError::Cancelled => {
+                                anyhow::anyhow!("flush task cancelled")
+                            }
+                            DeltaLayerWriterError::Other(err) => err,
+                        })?;
                     }
                 }
             }
         }
 
         // MAX is used here because we identify L0 layers by full key range
-        let (desc, path) = delta_layer_writer.finish(Key::MAX, ctx).await?;
+        let (desc, path) = delta_layer_writer
+            .finish(Key::MAX, ctx)
+            .await
+            .map_err(|e| match e {
+                DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+                DeltaLayerWriterError::Other(err) => err,
+            })?;
 
         // Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
         //
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index cfeab77598..e6c505b3c7 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -119,6 +119,8 @@ use crate::tenant::gc_result::GcResult;
 use crate::tenant::layer_map::LayerMap;
 use crate::tenant::metadata::TimelineMetadata;
 use crate::tenant::storage_layer::delta_layer::DeltaEntry;
+use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
+use crate::tenant::storage_layer::inmemory_layer::InMemoryLayerError;
 use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
 use crate::tenant::storage_layer::{
     AsLayerDesc, BatchLayerWriter, DeltaLayerWriter, EvictionError, ImageLayerName,
@@ -5232,7 +5234,17 @@ impl Timeline {
                     };
 
                     // Write all the keys we just read into our new image layer.
-                    image_layer_writer.put_image(img_key, img, ctx).await?;
+                    image_layer_writer
+                        .put_image(img_key, img, ctx)
+                        .await
+                        .map_err(|e| match e {
+                            ImageLayerWriterError::Cancelled => CreateImageLayersError::Other(
+                                anyhow::anyhow!("flush task cancelled"),
+                            ),
+                            ImageLayerWriterError::Other(err) => {
+                                CreateImageLayersError::Other(err)
+                            }
+                        })?;
                     wrote_keys = true;
                 }
             }
@@ -5329,7 +5341,15 @@ impl Timeline {
 
             // TODO: split image layers to avoid too large layer files. Too large image files are not handled
             // on the normal data path either.
-            image_layer_writer.put_image(k, v, ctx).await?;
+            image_layer_writer
+                .put_image(k, v, ctx)
+                .await
+                .map_err(|e| match e {
+                    ImageLayerWriterError::Cancelled => {
+                        CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
+                    }
+                    ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
+                })?;
         }
 
         if wrote_any_image {
@@ -6922,9 +6942,22 @@ impl Timeline {
             )
             .await?;
         for (key, img) in images {
-            image_layer_writer.put_image(key, img, ctx).await?;
+            image_layer_writer
+                .put_image(key, img, ctx)
+                .await
+                .map_err(|e| match e {
+                    ImageLayerWriterError::Cancelled => {
+                        anyhow::anyhow!("flush task cancelled")
+                    }
+                    ImageLayerWriterError::Other(err) => err,
+                })?;
         }
-        let (desc, path) = image_layer_writer.finish(ctx).await?;
+        let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
+            ImageLayerWriterError::Cancelled => {
+                anyhow::anyhow!("flush task cancelled")
+            }
+            ImageLayerWriterError::Other(err) => err,
+        })?;
         let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
         info!("force created image layer {}", image_layer.local_path());
         {
@@ -7378,7 +7411,10 @@ impl TimelineWriter<'_> {
             state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
         }
 
-        res
+        res.map_err(|e| match e {
+            InMemoryLayerError::Cancelled => anyhow::anyhow!("flush task cancelled"),
+            InMemoryLayerError::Other(err) => err,
+        })
     }
 
     #[cfg(test)]
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index f0142a0d40..d201d7ece0 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -15,7 +15,6 @@ use super::{
     GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
     Timeline,
 };
-use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
 use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
 use crate::tenant::timeline::DeltaEntry;
 
@@ -1549,7 +1548,13 @@ impl Timeline {
                     ctx,
                 )
                 .await
-                .map_err(CompactionError::Other)?;
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?;
 
             // Safety of layer rewrites:
             // - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1571,11 +1576,12 @@ impl Timeline {
             .await?;
 
         if keys_written > 0 {
-            let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
-                ImageLayerWriterError::Cancelled => {
+            let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| {
+                if e.to_string().contains("flush task cancelled") {
                     CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                } else {
+                    CompactionError::Other(anyhow::anyhow!(e))
                 }
-                ImageLayerWriterError::Other(err) => CompactionError::Other(err),
             })?;
             let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
                 .map_err(CompactionError::Other)?;
@@ -2144,11 +2150,12 @@ impl Timeline {
                         .unwrap()
                         .finish(prev_key.unwrap().next(), ctx)
                         .await
-                        .map_err(|e| match e {
-                            DeltaLayerWriterError::Cancelled => {
+                        .map_err(|e| {
+                            if e.to_string().contains("flush task cancelled") {
                                 CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                            } else {
+                                CompactionError::Other(anyhow::anyhow!(e))
                             }
-                            DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
                         })?;
                     let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
                         .map_err(CompactionError::Other)?;
@@ -2208,7 +2215,13 @@ impl Timeline {
                     .unwrap()
                     .put_value(key, lsn, value, ctx)
                     .await
-                    .map_err(CompactionError::Other)?;
+                    .map_err(|e| {
+                        if e.to_string().contains("flush task cancelled") {
+                            CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                        } else {
+                            CompactionError::Other(anyhow::anyhow!(e))
+                        }
+                    })?;
             } else {
                 let owner = self.shard_identity.get_shard_number(&key);
 
@@ -2226,11 +2239,12 @@ impl Timeline {
                 let (desc, path) = writer
                     .finish(prev_key.unwrap().next(), ctx)
                     .await
-                    .map_err(|e| match e {
-                        DeltaLayerWriterError::Cancelled => {
+                    .map_err(|e| {
+                        if e.to_string().contains("flush task cancelled") {
                             CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                        } else {
+                            CompactionError::Other(anyhow::anyhow!(e))
                         }
-                        DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
                     })?;
                 let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
                     .map_err(CompactionError::Other)?;
@@ -3696,8 +3710,13 @@ impl Timeline {
            let (desc, path) = delta_writer_before
                .finish(job_desc.compaction_key_range.start, ctx)
                .await
-                .context("failed to finish delta layer writer")
-                .map_err(CompactionError::Other)?;
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?;
            let layer = Layer::finish_creating(self.conf, self, desc, &path)
                .context("failed to finish creating delta layer")
                .map_err(CompactionError::Other)?;
@@ -3707,8 +3726,13 @@ impl Timeline {
            let (desc, path) = delta_writer_after
                .finish(key.key_range.end, ctx)
                .await
-                .context("failed to finish delta layer writer")
-                .map_err(CompactionError::Other)?;
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?;
            let layer = Layer::finish_creating(self.conf, self, desc, &path)
                .context("failed to finish creating delta layer")
                .map_err(CompactionError::Other)?;
@@ -3727,8 +3751,13 @@ impl Timeline {
            writer
                .finish_with_discard_fn(self, ctx, end_key, discard)
                .await
-                .context("failed to finish image layer writer")
-                .map_err(CompactionError::Other)?
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?
        } else {
            drop(writer);
            Vec::new()
@@ -3741,8 +3770,13 @@ impl Timeline {
            delta_layer_writer
                .finish_with_discard_fn(self, ctx, discard)
                .await
-                .context("failed to finish delta layer writer")
-                .map_err(CompactionError::Other)?
+                .map_err(|e| {
+                    if e.to_string().contains("flush task cancelled") {
+                        CompactionError::Other(anyhow::anyhow!("flush task cancelled"))
+                    } else {
+                        CompactionError::Other(anyhow::anyhow!(e))
+                    }
+                })?
        } else {
            drop(delta_layer_writer);
            Vec::new()