diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 39cd02d101..84f1572806 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -18,6 +18,8 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::tenant::Timeline; use crate::tenant::storage_layer::Layer; +use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError; +use crate::tenant::storage_layer::image_layer::ImageLayerWriterError; pub(crate) enum BatchWriterResult { Produced(ResidentLayer), @@ -139,14 +141,24 @@ impl BatchLayerWriter { generated_layers.push(BatchWriterResult::Discarded(layer_key)); } else { let res = match inner { - LayerWriterWrapper::Delta(writer) => { - writer.finish(layer_key.key_range.end, ctx).await - } - LayerWriterWrapper::Image(writer) => { - writer - .finish_with_end_key(layer_key.key_range.end, ctx) - .await - } + LayerWriterWrapper::Delta(writer) => writer + .finish(layer_key.key_range.end, ctx) + .await + .map_err(|e| match e { + DeltaLayerWriterError::Cancelled => { + anyhow::anyhow!("flush task cancelled") + } + DeltaLayerWriterError::Other(err) => err, + }), + LayerWriterWrapper::Image(writer) => writer + .finish_with_end_key(layer_key.key_range.end, ctx) + .await + .map_err(|e| match e { + ImageLayerWriterError::Cancelled => { + anyhow::anyhow!("flush task cancelled") + } + ImageLayerWriterError::Other(err) => err, + }), }; let layer = match res { Ok((desc, path)) => { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 5594a596b1..3833651c94 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -545,7 +545,7 @@ impl DeltaLayerWriterInner { self, key_end: Key, ctx: &RequestContext, - ) -> anyhow::Result<(PersistentLayerDesc, 
Utf8PathBuf)> { + ) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; let file = self @@ -554,17 +554,24 @@ impl DeltaLayerWriterInner { BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ), ctx, ) - .await?; + .await + .map_err(|e| match e { + BlobWriterError::Cancelled => DeltaLayerWriterError::Cancelled, + BlobWriterError::Other(err) => DeltaLayerWriterError::Other(err), + })?; // Write out the index - let (index_root_blk, block_buf) = self.tree.finish()?; + let (index_root_blk, block_buf) = self + .tree + .finish() + .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; let mut offset = index_start_blk as u64 * PAGE_SZ as u64; // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 // Should we just replace BlockBuf::blocks with one big buffer for buf in block_buf.blocks { let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; - res?; + res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; offset += PAGE_SZ as u64; } assert!(self.lsn_range.start < self.lsn_range.end); @@ -581,24 +588,29 @@ impl DeltaLayerWriterInner { }; // Writes summary at the first block (offset 0). 
- let buf = summary.ser_into_page()?; + let buf = summary + .ser_into_page() + .map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; - res?; + res.map_err(|e| DeltaLayerWriterError::Other(anyhow::anyhow!(e)))?; - let metadata = file - .metadata() - .await - .context("get file metadata to determine size")?; + let metadata = file.metadata().await.map_err(|e| { + DeltaLayerWriterError::Other(anyhow::anyhow!( + "get file metadata to determine size: {}", + e + )) + })?; // 5GB limit for objects without multipart upload (which we don't want to use) // Make it a little bit below to account for differing GB units // https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html - ensure!( - metadata.len() <= S3_UPLOAD_LIMIT, - "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!", - file.path(), - metadata.len() - ); + if metadata.len() > S3_UPLOAD_LIMIT { + return Err(DeltaLayerWriterError::Other(anyhow::anyhow!( + "Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!", + file.path(), + metadata.len() + ))); + } // Note: Because we opened the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. 
That's why we don't @@ -613,9 +625,9 @@ impl DeltaLayerWriterInner { ); // fsync the file - file.sync_all() - .await - .maybe_fatal_err("delta_layer sync_all")?; + file.sync_all().await.map_err(|e| { + DeltaLayerWriterError::Other(anyhow::anyhow!("delta_layer sync_all: {}", e)) + })?; trace!("created delta layer {}", self.path); @@ -737,7 +749,7 @@ impl DeltaLayerWriter { mut self, key_end: Key, ctx: &RequestContext, - ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + ) -> Result<(PersistentLayerDesc, Utf8PathBuf), DeltaLayerWriterError> { self.inner.take().unwrap().finish(key_end, ctx).await } @@ -751,6 +763,14 @@ impl DeltaLayerWriter { } } +#[derive(Debug, thiserror::Error)] +pub enum DeltaLayerWriterError { + #[error("flush task cancelled")] + Cancelled, + #[error(transparent)] + Other(anyhow::Error), +} + #[derive(thiserror::Error, Debug)] pub enum RewriteSummaryError { #[error("magic mismatch")] diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 8050152b33..67281ee3bc 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -342,6 +342,14 @@ impl ImageLayer { } } +#[derive(Debug, thiserror::Error)] +pub enum ImageLayerWriterError { + #[error("flush task cancelled")] + Cancelled, + #[error(transparent)] + Other(anyhow::Error), +} + #[derive(thiserror::Error, Debug)] pub enum RewriteSummaryError { #[error("magic mismatch")] @@ -931,7 +939,7 @@ impl ImageLayerWriterInner { self, ctx: &RequestContext, end_key: Option<Key>, - ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + ) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> { let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32; // Calculate compression ratio @@ -954,17 +962,24 @@ impl ImageLayerWriterInner { BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ), ctx, ) - .await?; + .await + .map_err(|e| match e 
{ + BlobWriterError::Cancelled => ImageLayerWriterError::Cancelled, + BlobWriterError::Other(err) => ImageLayerWriterError::Other(err), + })?; // Write out the index let mut offset = index_start_blk as u64 * PAGE_SZ as u64; - let (index_root_blk, block_buf) = self.tree.finish()?; + let (index_root_blk, block_buf) = self + .tree + .finish() + .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?; // TODO(yuchen): https://github.com/neondatabase/neon/issues/10092 // Should we just replace BlockBuf::blocks with one big buffer? for buf in block_buf.blocks { let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await; - res?; + res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?; offset += PAGE_SZ as u64; } @@ -987,14 +1002,18 @@ impl ImageLayerWriterInner { }; // Writes summary at the first block (offset 0). - let buf = summary.ser_into_page()?; + let buf = summary + .ser_into_page() + .map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?; let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await; - res?; + res.map_err(|e| ImageLayerWriterError::Other(anyhow::anyhow!(e)))?; - let metadata = file - .metadata() - .await - .context("get metadata to determine file size")?; + let metadata = file.metadata().await.map_err(|e| { + ImageLayerWriterError::Other(anyhow::anyhow!( + "get metadata to determine file size: {}", + e + )) + })?; let desc = PersistentLayerDesc::new_img( self.tenant_shard_id, @@ -1017,9 +1036,9 @@ impl ImageLayerWriterInner { // set inner.file here. The first read will have to re-open it. 
// fsync the file - file.sync_all() - .await - .maybe_fatal_err("image_layer sync_all")?; + file.sync_all().await.map_err(|e| { + ImageLayerWriterError::Other(anyhow::anyhow!("image_layer sync_all: {}", e)) + })?; trace!("created image layer {}", self.path); @@ -1138,7 +1157,7 @@ impl ImageLayerWriter { pub(crate) async fn finish( mut self, ctx: &RequestContext, - ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + ) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> { self.inner.take().unwrap().finish(ctx, None).await } @@ -1147,7 +1166,7 @@ impl ImageLayerWriter { mut self, end_key: Key, ctx: &RequestContext, - ) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> { + ) -> Result<(PersistentLayerDesc, Utf8PathBuf), ImageLayerWriterError> { self.inner.take().unwrap().finish(ctx, Some(end_key)).await } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 9086d29d50..f0142a0d40 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -15,6 +15,8 @@ use super::{ GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration, Timeline, }; +use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError; +use crate::tenant::storage_layer::image_layer::ImageLayerWriterError; use crate::tenant::timeline::DeltaEntry; use crate::walredo::RedoAttemptType; @@ -1569,10 +1571,12 @@ impl Timeline { .await?; if keys_written > 0 { - let (desc, path) = image_layer_writer - .finish(ctx) - .await - .map_err(CompactionError::Other)?; + let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e { + ImageLayerWriterError::Cancelled => { + CompactionError::Other(anyhow::anyhow!("flush task cancelled")) + } + ImageLayerWriterError::Other(err) => CompactionError::Other(err), + })?; let new_layer = Layer::finish_creating(self.conf, self, desc, &path) .map_err(CompactionError::Other)?; 
info!(layer=%new_layer, "rewrote layer, {} -> {} bytes", @@ -2140,7 +2144,12 @@ impl Timeline { .unwrap() .finish(prev_key.unwrap().next(), ctx) .await - .map_err(CompactionError::Other)?; + .map_err(|e| match e { + DeltaLayerWriterError::Cancelled => { + CompactionError::Other(anyhow::anyhow!("flush task cancelled")) + } + DeltaLayerWriterError::Other(err) => CompactionError::Other(err), + })?; let new_delta = Layer::finish_creating(self.conf, self, desc, &path) .map_err(CompactionError::Other)?; @@ -2217,7 +2226,12 @@ impl Timeline { let (desc, path) = writer .finish(prev_key.unwrap().next(), ctx) .await - .map_err(CompactionError::Other)?; + .map_err(|e| match e { + DeltaLayerWriterError::Cancelled => { + CompactionError::Other(anyhow::anyhow!("flush task cancelled")) + } + DeltaLayerWriterError::Other(err) => CompactionError::Other(err), + })?; let new_delta = Layer::finish_creating(self.conf, self, desc, &path) .map_err(CompactionError::Other)?; new_layers.push(new_delta); @@ -4253,7 +4267,15 @@ impl TimelineAdaptor { unfinished_image_layer, } = outcome { - let (desc, path) = unfinished_image_layer.finish(ctx).await?; + let (desc, path) = unfinished_image_layer + .finish(ctx) + .await + .map_err(|e| match e { + ImageLayerWriterError::Cancelled => { + CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled")) + } + ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err), + })?; let image_layer = Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?; self.new_images.push(image_layer); diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 649b33e294..e5a2758f05 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::sync::Arc; +use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError; use anyhow::Context; use bytes::Bytes; 
use http_utils::error::ApiError; @@ -816,7 +817,12 @@ async fn copy_lsn_prefix( let (desc, path) = writer .finish(reused_highest_key, ctx) .await - .map_err(Error::Prepare)?; + .map_err(|e| match e { + DeltaLayerWriterError::Cancelled => { + Error::Prepare(anyhow::anyhow!("flush task cancelled")) + } + DeltaLayerWriterError::Other(err) => Error::Prepare(err), + })?; let copied = Layer::finish_creating(target_timeline.conf, target_timeline, desc, &path) .map_err(Error::Prepare)?;