From 1d1502bc167a2d0372756650581b4666597120c8 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 8 May 2025 08:57:53 +0200
Subject: [PATCH] fix(pageserver): `flush task cancelled` errors during
 timeline shutdown (#11853)

# Refs

- fixes https://github.com/neondatabase/neon/issues/11762

# Problem

PR #10993 introduced internal retries for BufferedWriter flushes.
PR #11052 added cancellation sensitivity to that retry loop.
That cancellation sensitivity is an error path that didn't exist before.

The result is that during timeline shutdown, after `Timeline::cancel` has
fired, compaction can now fail with the error `flush task cancelled`.

The problem with that:
1. We mis-classify this as an `error!`-worthy event.
2. This makes tests flaky, because the error is not in the global
   `allowed_errors`.

Technically we also trip the `compaction_circuit_breaker`, because the
resulting `CompactionError` is the `::Other` variant. But since this happens
during timeline shutdown, it doesn't matter, practically speaking.

# Solution / Changes

- Log the anyhow stack trace when classifying a compaction error as
  `error!`. This was helpful for identifying the sources of the
  `flush task cancelled` errors. We only log at `error!` level in
  exceptional circumstances, so it's ok for those logs to be a bit verbose.
- Introduce typed errors along the `BufferedWriter::write_*` =>
  `BlobWriter::write_blob` => `{Delta,Image}LayerWriter::put_*` =>
  `Split{Delta,Image}LayerWriter::put_{value,image}` chain.
- Map them properly to `CompactionError`/`CreateImageLayersError` via new
  `From` impls. I am usually opposed to magic `From` impls, but it's how
  most of the compaction code works today.

# Testing

The symptoms are most prevalent in
`test_runner/regress/test_branch_and_gc.py::test_branch_and_gc`.

Before this PR, I was able to reproduce locally 1 or 2 times per 400 runs
using `DEFAULT_PG_VERSION=15 BUILD_TYPE=release poetry run pytest --count 400 -n 8`.
After this PR, it no longer reproduces, even after 2000 runs.

# Future Work

Technically the ingest path is also exposed to this new source of errors,
because `InMemoryLayer` is backed by `BufferedWriter`. But we haven't seen
it occur in flaky tests yet.

Details and a fix in:
- https://github.com/neondatabase/neon/pull/11851
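For orientation before reading the diff, here is a condensed, self-contained
sketch of the classification pattern this patch introduces. It is illustrative
only: the single `Cancelled` variant stands in for the real nested
`WriteBlobError`/`FlushTaskError` chain, the bodies are simplified, and it
assumes the `anyhow` and `thiserror` crates (which the pageserver already
uses). The authoritative definitions are in the diff itself.

```rust
// Illustrative sketch only: collapses the real WriteBlobError -> PutError
// -> CompactionError chain into one typed error and one `From` impl.

#[derive(Debug, thiserror::Error)]
enum PutError {
    // Stands in for the nested FlushTaskError::Cancelled of the real code.
    #[error("flush task cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(anyhow::Error),
}

impl PutError {
    fn is_cancel(&self) -> bool {
        matches!(self, PutError::Cancelled)
    }
    fn into_anyhow(self) -> anyhow::Error {
        match self {
            PutError::Cancelled => anyhow::anyhow!(self),
            PutError::Other(e) => e,
        }
    }
}

#[derive(Debug, thiserror::Error)]
enum CompactionError {
    #[error("timeline is shutting down")]
    ShuttingDown,
    #[error(transparent)]
    Other(anyhow::Error),
}

// The `From` impl routes cancellation into the benign ShuttingDown variant
// instead of ::Other, which would log at error! level and trip the
// compaction circuit breaker.
impl From<PutError> for CompactionError {
    fn from(e: PutError) -> Self {
        if e.is_cancel() {
            CompactionError::ShuttingDown
        } else {
            CompactionError::Other(e.into_anyhow())
        }
    }
}

fn put_value() -> Result<(), PutError> {
    Err(PutError::Cancelled)
}

fn compact() -> Result<(), CompactionError> {
    put_value()?; // `?` applies the `From` impl above
    Ok(())
}

fn main() {
    assert!(matches!(compact(), Err(CompactionError::ShuttingDown)));
}
```

The point of routing everything through `is_cancel()` plus `From` is that the
existing `?` call sites stay unchanged while shutdown-induced failures stop
being classified as `::Other`.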
---
 pageserver/src/tenant/blob_io.rs              | 27 ++++++++++++++-----
 pageserver/src/tenant/storage_layer.rs        |  1 +
 .../storage_layer/batch_split_writer.rs       | 18 ++++++++-----
 .../src/tenant/storage_layer/delta_layer.rs   | 25 +++++++++++------
 pageserver/src/tenant/storage_layer/errors.rs | 24 +++++++++++++++++
 .../src/tenant/storage_layer/image_layer.rs   | 20 ++++++++++----
 pageserver/src/tenant/tasks.rs                |  2 +-
 pageserver/src/tenant/timeline.rs             | 20 ++++++++++++++
 pageserver/src/tenant/timeline/compaction.rs  |  3 +--
 .../owned_buffers_io/write/flush.rs           | 13 +++++++++
 10 files changed, 124 insertions(+), 29 deletions(-)
 create mode 100644 pageserver/src/tenant/storage_layer/errors.rs

diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 8cf3c548c9..ed541c4f12 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -94,10 +94,23 @@ impl Header {
 pub enum WriteBlobError {
     #[error(transparent)]
     Flush(FlushTaskError),
-    #[error("blob too large ({len} bytes)")]
-    BlobTooLarge { len: usize },
     #[error(transparent)]
-    WriteBlobRaw(anyhow::Error),
+    Other(anyhow::Error),
+}
+
+impl WriteBlobError {
+    pub fn is_cancel(&self) -> bool {
+        match self {
+            WriteBlobError::Flush(e) => e.is_cancel(),
+            WriteBlobError::Other(_) => false,
+        }
+    }
+    pub fn into_anyhow(self) -> anyhow::Error {
+        match self {
+            WriteBlobError::Flush(e) => e.into_anyhow(),
+            WriteBlobError::Other(e) => e,
+        }
+    }
 }
 
 impl BlockCursor<'_> {
@@ -327,7 +340,9 @@ where
             return (
                 (
                     io_buf.slice_len(),
-                    Err(WriteBlobError::BlobTooLarge { len }),
+                    Err(WriteBlobError::Other(anyhow::anyhow!(
+                        "blob too large ({len} bytes)"
+                    ))),
                 ),
                 srcbuf,
             );
@@ -391,7 +406,7 @@ where
         // Verify the header, to ensure we don't write invalid/corrupt data.
         let header = match Header::decode(&raw_with_header)
             .context("decoding blob header")
-            .map_err(WriteBlobError::WriteBlobRaw)
+            .map_err(WriteBlobError::Other)
         {
             Ok(header) => header,
             Err(err) => return (raw_with_header, Err(err)),
         };
@@ -401,7 +416,7 @@ where
             let raw_len = raw_with_header.len();
             return (
                 raw_with_header,
-                Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
+                Err(WriteBlobError::Other(anyhow::anyhow!(
                     "header length mismatch: {header_total_len} != {raw_len}"
                 ))),
             );
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 796ad01e54..5dfa961b71 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -2,6 +2,7 @@
 
 pub mod batch_split_writer;
 pub mod delta_layer;
+pub mod errors;
 pub mod filter_iterator;
 pub mod image_layer;
 pub mod inmemory_layer;
diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
index 39cd02d101..51f2e909a2 100644
--- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs
+++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
@@ -10,6 +10,7 @@ use utils::id::TimelineId;
 use utils::lsn::Lsn;
 use utils::shard::TenantShardId;
 
+use super::errors::PutError;
 use super::layer::S3_UPLOAD_LIMIT;
 use super::{
     DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
@@ -235,7 +236,7 @@ impl<'a> SplitImageLayerWriter<'a> {
         key: Key,
         img: Bytes,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), PutError> {
         // The current estimation is an upper bound of the space that the key/image could take
         // because we did not consider compression in this estimation. The resulting image layer
         // could be smaller than the target size.
@@ -253,7 +254,8 @@ impl<'a> SplitImageLayerWriter<'a> {
                 self.cancel.clone(),
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(PutError::Other)?;
             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
             self.batches.add_unfinished_image_writer(
                 prev_image_writer,
@@ -346,7 +348,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         lsn: Lsn,
         val: Value,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), PutError> {
         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
         //
@@ -366,7 +368,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
                     self.cancel.clone(),
                     ctx,
                 )
-                .await?,
+                .await
+                .map_err(PutError::Other)?,
             ));
         }
         let (_, inner) = self.inner.as_mut().unwrap();
@@ -386,7 +389,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
                 self.cancel.clone(),
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(PutError::Other)?;
             let (start_key, prev_delta_writer) =
                 self.inner.replace((key, next_delta_writer)).unwrap();
             self.batches.add_unfinished_delta_writer(
@@ -396,11 +400,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
             );
         } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
             // We have to produce a very large file b/c a key is updated too often.
-            anyhow::bail!(
+            return Err(PutError::Other(anyhow::anyhow!(
                 "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
                 key,
                 inner.estimated_size()
-            );
+            )));
         }
     }
     self.last_key_written = key;
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 11875ac653..2c1b27c8d5 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -55,6 +55,7 @@ use utils::bin_ser::SerializeError;
 use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
+use super::errors::PutError;
 use super::{
     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
     ValuesReconstructState,
@@ -477,12 +478,15 @@ impl DeltaLayerWriterInner {
         lsn: Lsn,
         val: Value,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), PutError> {
         let (_, res) = self
             .put_value_bytes(
                 key,
                 lsn,
-                Value::ser(&val)?.slice_len(),
+                Value::ser(&val)
+                    .map_err(anyhow::Error::new)
+                    .map_err(PutError::Other)?
+                    .slice_len(),
                 val.will_init(),
                 ctx,
             )
@@ -497,7 +501,7 @@ impl DeltaLayerWriterInner {
         val: FullSlice<Buf>,
         will_init: bool,
         ctx: &RequestContext,
-    ) -> (FullSlice<Buf>, anyhow::Result<()>)
+    ) -> (FullSlice<Buf>, Result<(), PutError>)
     where
         Buf: IoBuf + Send,
     {
@@ -513,19 +517,24 @@ impl DeltaLayerWriterInner {
             .blob_writer
             .write_blob_maybe_compressed(val, ctx, compression)
             .await;
+        let res = res.map_err(PutError::WriteBlob);
         let off = match res {
             Ok((off, _)) => off,
-            Err(e) => return (val, Err(anyhow::anyhow!(e))),
+            Err(e) => return (val, Err(e)),
         };
 
         let blob_ref = BlobRef::new(off, will_init);
 
         let delta_key = DeltaKey::from_key_lsn(&key, lsn);
-        let res = self.tree.append(&delta_key.0, blob_ref.0);
+        let res = self
+            .tree
+            .append(&delta_key.0, blob_ref.0)
+            .map_err(anyhow::Error::new)
+            .map_err(PutError::Other);
 
         self.num_keys += 1;
 
-        (val, res.map_err(|e| anyhow::anyhow!(e)))
+        (val, res)
     }
 
     fn size(&self) -> u64 {
@@ -694,7 +703,7 @@ impl DeltaLayerWriter {
         lsn: Lsn,
         val: Value,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), PutError> {
         self.inner
             .as_mut()
             .unwrap()
@@ -709,7 +718,7 @@ impl DeltaLayerWriter {
         val: FullSlice<Buf>,
         will_init: bool,
         ctx: &RequestContext,
-    ) -> (FullSlice<Buf>, anyhow::Result<()>)
+    ) -> (FullSlice<Buf>, Result<(), PutError>)
     where
         Buf: IoBuf + Send,
     {
diff --git a/pageserver/src/tenant/storage_layer/errors.rs b/pageserver/src/tenant/storage_layer/errors.rs
new file mode 100644
index 0000000000..591e489faa
--- /dev/null
+++ b/pageserver/src/tenant/storage_layer/errors.rs
@@ -0,0 +1,24 @@
+use crate::tenant::blob_io::WriteBlobError;
+
+#[derive(Debug, thiserror::Error)]
+pub enum PutError {
+    #[error(transparent)]
+    WriteBlob(WriteBlobError),
+    #[error(transparent)]
+    Other(anyhow::Error),
+}
+
+impl PutError {
+    pub fn is_cancel(&self) -> bool {
+        match self {
+            PutError::WriteBlob(e) => e.is_cancel(),
+            PutError::Other(_) => false,
+        }
+    }
+    pub fn into_anyhow(self) -> anyhow::Error {
+        match self {
+            PutError::WriteBlob(e) => e.into_anyhow(),
+            PutError::Other(e) => e,
+        }
+    }
+}
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index d684230572..740f53f928 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -53,6 +53,7 @@ use utils::bin_ser::SerializeError;
 use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
+use super::errors::PutError;
 use super::layer_name::ImageLayerName;
 use super::{
     AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
@@ -842,8 +843,14 @@ impl ImageLayerWriterInner {
         key: Key,
         img: Bytes,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        ensure!(self.key_range.contains(&key));
+    ) -> Result<(), PutError> {
+        if !self.key_range.contains(&key) {
+            return Err(PutError::Other(anyhow::anyhow!(
+                "key {:?} not in range {:?}",
+                key,
+                self.key_range
+            )));
+        }
         let compression = self.conf.image_compression;
         let uncompressed_len = img.len() as u64;
         self.uncompressed_bytes += uncompressed_len;
@@ -853,7 +860,7 @@ impl ImageLayerWriterInner {
             .write_blob_maybe_compressed(img.slice_len(), ctx, compression)
             .await;
         // TODO: re-use the buffer for `img` further upstack
-        let (off, compression_info) = res?;
+        let (off, compression_info) = res.map_err(PutError::WriteBlob)?;
         if compression_info.compressed_size.is_some() {
             // The image has been considered for compression at least
             self.uncompressed_bytes_eligible += uncompressed_len;
@@ -865,7 +872,10 @@ impl ImageLayerWriterInner {
 
         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
         key.write_to_byte_slice(&mut keybuf);
-        self.tree.append(&keybuf, off)?;
+        self.tree
+            .append(&keybuf, off)
+            .map_err(anyhow::Error::new)
+            .map_err(PutError::Other)?;
 
         #[cfg(feature = "testing")]
         {
@@ -1085,7 +1095,7 @@ impl ImageLayerWriter {
         key: Key,
         img: Bytes,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), PutError> {
         self.inner.as_mut().unwrap().put_image(key, img, ctx).await
     }
 
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 1112a5330b..4709a6d616 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -340,7 +340,7 @@ pub(crate) fn log_compaction_error(
     } else {
         match level {
             Level::ERROR if degrade_to_warning => warn!("Compaction failed and discarded: {err:#}"),
-            Level::ERROR => error!("Compaction failed: {err:#}"),
+            Level::ERROR => error!("Compaction failed: {err:?}"),
             Level::INFO => info!("Compaction failed: {err:#}"),
             level => unimplemented!("unexpected level {level:?}"),
         }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index cfeab77598..c8d897d074 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -987,6 +987,16 @@ impl From for CreateImageLayersError {
     }
 }
 
+impl From<super::storage_layer::errors::PutError> for CreateImageLayersError {
+    fn from(e: super::storage_layer::errors::PutError) -> Self {
+        if e.is_cancel() {
+            CreateImageLayersError::Cancelled
+        } else {
+            CreateImageLayersError::Other(e.into_anyhow())
+        }
+    }
+}
+
 impl From<GetVectoredError> for CreateImageLayersError {
     fn from(e: GetVectoredError) -> Self {
         match e {
@@ -5923,6 +5933,16 @@ impl From for CompactionError {
     }
 }
 
+impl From<super::storage_layer::errors::PutError> for CompactionError {
+    fn from(e: super::storage_layer::errors::PutError) -> Self {
+        if e.is_cancel() {
+            CompactionError::ShuttingDown
+        } else {
+            CompactionError::Other(e.into_anyhow())
+        }
+    }
+}
+
 #[serde_as]
 #[derive(serde::Serialize)]
 struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index d0c13d86ce..07cd274a41 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -2204,8 +2204,7 @@ impl Timeline {
                 .as_mut()
                 .unwrap()
                 .put_value(key, lsn, value, ctx)
-                .await
-                .map_err(CompactionError::Other)?;
+                .await?;
         } else {
             let owner =
                 self.shard_identity.get_shard_number(&key);
diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
index b41a9f6cd2..ac9867e8b4 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
@@ -247,6 +247,19 @@ pub enum FlushTaskError {
     Cancelled,
 }
 
+impl FlushTaskError {
+    pub fn is_cancel(&self) -> bool {
+        match self {
+            FlushTaskError::Cancelled => true,
+        }
+    }
+    pub fn into_anyhow(self) -> anyhow::Error {
+        match self {
+            FlushTaskError::Cancelled => anyhow::anyhow!(self),
+        }
+    }
+}
+
 impl<Buf, W> FlushBackgroundTask<Buf, W>
 where
     Buf: IoBufAligned + Send + Sync,
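A note on the `pageserver/src/tenant/tasks.rs` hunk above: switching the
`error!` arm from `{err:#}` to `{err:?}` is what logs the anyhow stack trace
mentioned under "Solution / Changes". With `anyhow`, the alternate Display
format `{:#}` renders the cause chain on one line, while the Debug format
`{:?}` renders it multi-line and includes the captured backtrace when
backtraces are enabled. A minimal standalone illustration, with made-up error
values:

```rust
fn main() {
    // Made-up stand-in for the real failure buried inside compaction.
    let root = std::io::Error::new(std::io::ErrorKind::Other, "flush task cancelled");
    let err = anyhow::Error::new(root).context("put_value failed");

    // Old formatting at error! level: the one-line cause chain.
    println!("{err:#}"); // put_value failed: flush task cancelled

    // New formatting at error! level: multi-line Debug rendering with the
    // cause chain and, if RUST_BACKTRACE=1 is set, the captured backtrace.
    println!("{err:?}");
}
```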