From 486d9f0c4dae6b17c1dcee5ec94ea7cdad531248 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 6 May 2025 19:17:57 +0000 Subject: [PATCH] Address PR comments: restore maybe_fatal_err, use anyhow::Error::new, add TODO comments, fix string matching Co-Authored-By: christian@neon.tech --- pageserver/src/tenant/blob_io.rs | 4 +-- .../src/tenant/storage_layer/delta_layer.rs | 26 +++++++------- pageserver/src/tenant/timeline.rs | 3 ++ pageserver/src/tenant/timeline/compaction.rs | 36 +++++++------------ .../src/tenant/timeline/detach_ancestor.rs | 4 +-- 5 files changed, 30 insertions(+), 43 deletions(-) diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 5bfea0d1c1..d65e791e4c 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -92,7 +92,7 @@ impl Header { #[derive(Debug, thiserror::Error)] pub enum BlobWriterError { - #[error("flush task cancelled")] + #[error("cancelled")] Cancelled, #[error(transparent)] Other(anyhow::Error), @@ -247,7 +247,7 @@ where ctx: &RequestContext, flush_task_span: tracing::Span, ) -> Result { - let gate_token = gate.enter().map_err(|e| BlobWriterError::Other(e.into()))?; + let gate_token = gate.enter().map_err(|_| BlobWriterError::Cancelled)?; Ok(Self { io_buf: Some(BytesMut::new()), diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index f42d6d1db7..1cb9db7568 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -485,7 +485,7 @@ impl DeltaLayerWriterInner { ctx: &RequestContext, ) -> Result<(), DeltaLayerWriterError> { let val_ser = - Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; + Value::ser(&val).map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?; let (_, res) = self .put_value_bytes(key, lsn, val_ser.slice_len(), val.will_init(), ctx) @@ -585,14 +585,14 @@ impl DeltaLayerWriterInner { let (index_root_blk, block_buf) = self .tree .finish() - .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; + .map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?; let mut offset = index_start_blk as u64 * PAGE_SZ as u64; // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 // Should we just replace BlockBuf::blocks with one big buffer for buf in block_buf.blocks { let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; - res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; + res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?; offset += PAGE_SZ as u64; } assert!(self.lsn_range.start < self.lsn_range.end); @@ -611,15 +611,12 @@ impl DeltaLayerWriterInner { // Writes summary at the first block (offset 0). let buf = summary .ser_into_page() - .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; + .map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?; let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; - res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; + res.map_err(|e| DeltaLayerWriterError::Other(anyhow::Error::new(e)))?; let metadata = file.metadata().await.map_err(|e| { - DeltaLayerWriterError::Other(anyhow::anyhow!( - "get file metadata to determine size: {}", - e - )) + DeltaLayerWriterError::Other(anyhow::Error::new(e)) })?; // 5GB limit for objects without multipart upload (which we don't want to use) @@ -632,6 +629,7 @@ impl DeltaLayerWriterInner { metadata.len() ))); } + // Note: Because we opened the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't @@ -646,9 +644,9 @@ impl DeltaLayerWriterInner { ); // fsync the file - file.sync_all().await.map_err(|e| { - DeltaLayerWriterError::Other(anyhow::anyhow!("delta_layer sync_all: {}", e)) - })?; + file.sync_all() + .await + .maybe_fatal_err("delta_layer sync_all")?; trace!("created delta layer {}", self.path); @@ -786,7 +784,7 @@ impl DeltaLayerWriter { #[derive(Debug, thiserror::Error)] pub enum DeltaLayerWriterError { - #[error("flush task cancelled")] + #[error("cancelled")] Cancelled, #[error(transparent)] Other(anyhow::Error), @@ -802,7 +800,7 @@ pub enum RewriteSummaryError { impl From for RewriteSummaryError { fn from(e: std::io::Error) -> Self { - Self::Other(anyhow::anyhow!(e)) + Self::Other(anyhow::Error::new(e)) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index e6c505b3c7..4ab532348d 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5863,6 +5863,8 @@ pub(crate) enum CompactionError { Other(anyhow::Error), #[error("Compaction already running: {0}")] AlreadyRunning(&'static str), + #[error("cancelled")] + Cancelled, } impl CompactionError { @@ -5877,6 +5879,7 @@ impl CompactionError { PageReconstructError::Cancelled )) | Self::Offload(OffloadError::Cancelled) + | Self::Cancelled ) } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 9bf99442eb..ae3f3c1886 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1549,12 +1549,9 @@ impl Timeline { ctx, ) .await - .map_err(|e| { - if e.to_string().contains("flush task cancelled") { - CompactionError::Other(anyhow::anyhow!("flush task cancelled")) - } else { - CompactionError::Other(e) - } + .map_err(|e| match e { + ImageLayerWriterError::Cancelled => CompactionError::Cancelled, + ImageLayerWriterError::Other(err) => CompactionError::Other(err), })?; // Safety of layer rewrites: @@ -2237,12 +2234,9 @@ impl Timeline { let (desc, path) = writer .finish(prev_key.unwrap().next(), ctx) .await - .map_err(|e| { - if e.to_string().contains("flush task cancelled") { - CompactionError::Other(anyhow::anyhow!("flush task cancelled")) - } else { - CompactionError::Other(e.into()) - } + .map_err(|e| match e { + DeltaLayerWriterError::Cancelled => CompactionError::Cancelled, + DeltaLayerWriterError::Other(err) => CompactionError::Other(err), })?; let new_delta = Layer::finish_creating(self.conf, self, desc, &path) .map_err(CompactionError::Other)?; @@ -3747,12 +3741,9 @@ impl Timeline { writer .finish_with_discard_fn(self, ctx, end_key, discard) .await - .map_err(|e| { - if e.to_string().contains("flush task cancelled") { - CompactionError::Other(anyhow::anyhow!("flush task cancelled")) - } else { - CompactionError::Other(e.into()) - } + .map_err(|e| match e { + ImageLayerWriterError::Cancelled => CompactionError::Cancelled, + ImageLayerWriterError::Other(err) => CompactionError::Other(err), })? } else { drop(writer); @@ -3766,12 +3757,9 @@ impl Timeline { delta_layer_writer .finish_with_discard_fn(self, ctx, discard) .await - .map_err(|e| { - if e.to_string().contains("flush task cancelled") { - CompactionError::Other(anyhow::anyhow!("flush task cancelled")) - } else { - CompactionError::Other(e.into()) - } + .map_err(|e| match e { + DeltaLayerWriterError::Cancelled => CompactionError::Cancelled, + DeltaLayerWriterError::Other(err) => CompactionError::Other(err), })? } else { drop(delta_layer_writer); diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index e5a2758f05..36998272a0 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -818,9 +818,7 @@ async fn copy_lsn_prefix( .finish(reused_highest_key, ctx) .await .map_err(|e| match e { - DeltaLayerWriterError::Cancelled => { - Error::Prepare(anyhow::anyhow!("flush task cancelled")) - } + DeltaLayerWriterError::Cancelled => Error::ShuttingDown, DeltaLayerWriterError::Other(err) => Error::Prepare(err), })?; let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path)