diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 8b6500b020..c4d71b9a27 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -2469,6 +2469,7 @@ async fn timeline_checkpoint_handler(
             .map_err(|e| match e {
                 CompactionError::ShuttingDown => ApiError::ShuttingDown,
+                CompactionError::Cancelled => ApiError::ShuttingDown,
                 CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
                 CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
                 CompactionError::Other(e) => ApiError::InternalServerError(e),
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index e59db74479..1a69b7d3b2 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3198,6 +3198,7 @@ impl TenantShard {
                     match err {
                         err if err.is_cancel() => {}
                         CompactionError::ShuttingDown => (),
+                        CompactionError::Cancelled => (),
                         // Offload failures don't trip the circuit breaker, since they're cheap to retry and
                         // shouldn't block compaction.
                         CompactionError::Offload(_) => {}
diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
index fc1676646c..13bfff31a6 100644
--- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs
+++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
@@ -18,8 +18,14 @@ use crate::config::PageServerConf;
 use crate::context::RequestContext;
 use crate::tenant::Timeline;
 use crate::tenant::storage_layer::Layer;
-use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
-use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
+
+#[derive(Debug, thiserror::Error)]
+pub enum BatchSplitWriterError {
+    #[error("cancelled")]
+    Cancelled,
+    #[error(transparent)]
+    Other(anyhow::Error),
+}
 
 pub(crate) enum BatchWriterResult {
     Produced(ResidentLayer),
@@ -99,7 +105,7 @@ impl BatchLayerWriter {
         self,
         tline: &Arc<Timeline>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<Vec<ResidentLayer>> {
+    ) -> Result<Vec<ResidentLayer>, BatchSplitWriterError> {
         let res = self
             .finish_with_discard_fn(tline, ctx, |_| async { false })
             .await?;
@@ -117,7 +123,7 @@ impl BatchLayerWriter {
         tline: &Arc<Timeline>,
         ctx: &RequestContext,
         discard_fn: D,
-    ) -> anyhow::Result<Vec<BatchWriterResult>>
+    ) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
     where
         D: Fn(&PersistentLayerKey) -> F,
         F: Future<Output = bool>,
@@ -144,21 +150,11 @@ impl BatchLayerWriter {
                 LayerWriterWrapper::Delta(writer) => writer
                     .finish(layer_key.key_range.end, ctx)
                     .await
-                    .map_err(|e| match e {
-                        DeltaLayerWriterError::Cancelled => {
-                            anyhow::anyhow!("flush task cancelled")
-                        }
-                        DeltaLayerWriterError::Other(err) => err,
-                    }),
+                    .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
                 LayerWriterWrapper::Image(writer) => writer
                     .finish_with_end_key(layer_key.key_range.end, ctx)
                     .await
-                    .map_err(|e| match e {
-                        ImageLayerWriterError::Cancelled => {
-                            anyhow::anyhow!("flush task cancelled")
-                        }
-                        ImageLayerWriterError::Other(err) => err,
-                    }),
+                    .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e))),
             };
             let layer = match res {
                 Ok((desc, path)) => {
@@ -167,7 +163,7 @@ impl BatchLayerWriter {
                         Err(e) => {
                             tokio::fs::remove_file(&path).await.ok();
                             clean_up_layers(generated_layers);
-                            return Err(e);
+                            return Err(BatchSplitWriterError::Other(e));
                         }
                     }
                 }
@@ -247,7 +243,7 @@ impl<'a> SplitImageLayerWriter<'a> {
         key: Key,
         img: Bytes,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), BatchSplitWriterError> {
         // The current estimation is an upper bound of the space that the key/image could take
         // because we did not consider compression in this estimation. The resulting image layer
         // could be smaller than the target size.
@@ -265,7 +261,8 @@ impl<'a> SplitImageLayerWriter<'a> {
                 self.cancel.clone(),
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
             let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
             self.batches.add_unfinished_image_writer(
                 prev_image_writer,
@@ -277,10 +274,7 @@ impl<'a> SplitImageLayerWriter<'a> {
         self.inner
             .put_image(key, img, ctx)
             .await
-            .map_err(|e| match e {
-                ImageLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
-                ImageLayerWriterError::Other(err) => err,
-            })
+            .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
     }
 
     pub(crate) async fn finish_with_discard_fn(
@@ -289,7 +283,7 @@ impl<'a> SplitImageLayerWriter<'a> {
         ctx: &RequestContext,
         end_key: Key,
         discard_fn: D,
-    ) -> anyhow::Result<Vec<BatchWriterResult>>
+    ) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
     where
         D: Fn(&PersistentLayerKey) -> F,
         F: Future<Output = bool>,
@@ -309,7 +303,7 @@ impl<'a> SplitImageLayerWriter<'a> {
         tline: &Arc<Timeline>,
         ctx: &RequestContext,
         end_key: Key,
-    ) -> anyhow::Result<Vec<BatchWriterResult>> {
+    ) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
         self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
             .await
     }
@@ -364,7 +358,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         lsn: Lsn,
         val: Value,
         ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), BatchSplitWriterError> {
         // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
         // number, and therefore the final layer size could be a little bit larger or smaller than the target.
         //
@@ -384,7 +378,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
                     self.cancel.clone(),
                     ctx,
                 )
-                .await?,
+                .await
+                .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?,
             ));
         }
         let (_, inner) = self.inner.as_mut().unwrap();
@@ -404,7 +399,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
                 self.cancel.clone(),
                 ctx,
             )
-            .await?;
+            .await
+            .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))?;
             let (start_key, prev_delta_writer) =
                 self.inner.replace((key, next_delta_writer)).unwrap();
             self.batches.add_unfinished_delta_writer(
@@ -414,11 +410,11 @@ impl<'a> SplitDeltaLayerWriter<'a> {
                 );
             } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
                 // We have to produce a very large file b/c a key is updated too often.
-                anyhow::bail!(
+                return Err(BatchSplitWriterError::Other(anyhow::anyhow!(
                     "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
                     key,
                     inner.estimated_size()
-                );
+                )));
             }
         }
         self.last_key_written = key;
@@ -426,10 +422,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         inner
             .put_value(key, lsn, val, ctx)
             .await
-            .map_err(|e| match e {
-                DeltaLayerWriterError::Cancelled => anyhow::anyhow!("flush task cancelled"),
-                DeltaLayerWriterError::Other(err) => err,
-            })
+            .map_err(|e| BatchSplitWriterError::Other(anyhow::anyhow!(e)))
     }
 
     pub(crate) async fn finish_with_discard_fn(
@@ -437,7 +430,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         tline: &Arc<Timeline>,
         ctx: &RequestContext,
         discard_fn: D,
-    ) -> anyhow::Result<Vec<BatchWriterResult>>
+    ) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError>
     where
         D: Fn(&PersistentLayerKey) -> F,
         F: Future<Output = bool>,
@@ -463,7 +456,7 @@ impl<'a> SplitDeltaLayerWriter<'a> {
         self,
         tline: &Arc<Timeline>,
         ctx: &RequestContext,
-    ) -> anyhow::Result<Vec<BatchWriterResult>> {
+    ) -> Result<Vec<BatchWriterResult>, BatchSplitWriterError> {
         self.finish_with_discard_fn(tline, ctx, |_| async { false })
             .await
     }
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 1d399c92ed..e5f9cfb511 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -37,7 +37,7 @@ use std::sync::atomic::AtomicU64;
 
 use crate::tenant::blob_io::BlobWriterError;
 
-use anyhow::{Context, Result, bail, ensure};
+use anyhow::{Context, Result, bail};
 use camino::{Utf8Path, Utf8PathBuf};
 use futures::StreamExt;
 use itertools::Itertools;
@@ -78,7 +78,7 @@ use crate::tenant::vectored_blob_io::{
 use crate::virtual_file::TempVirtualFile;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
 use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
-use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
+use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
 
 ///
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 42aa247556..727a6cbc4a 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -35,7 +35,7 @@ use std::sync::atomic::AtomicU64;
 
 use crate::tenant::blob_io::BlobWriterError;
 
-use anyhow::{Context, Result, bail, ensure};
+use anyhow::{Context, Result, bail};
 use bytes::Bytes;
 use camino::{Utf8Path, Utf8PathBuf};
 use hex;
@@ -76,7 +76,7 @@ use crate::tenant::vectored_blob_io::{
 use crate::virtual_file::TempVirtualFile;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
 use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
-use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
+use crate::virtual_file::{self, IoBuffer, IoBufferMut, VirtualFile};
 use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
 
 ///
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 1112a5330b..01090c5a93 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -300,6 +300,7 @@ pub(crate) fn log_compaction_error(
     let level = match err {
         e if e.is_cancel() => return,
         ShuttingDown => return,
+        Cancelled => return,
         Offload(_) => Level::ERROR,
         AlreadyRunning(_) => Level::ERROR,
         CollectKeySpaceError(_) => Level::ERROR,
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 4ab532348d..e3113ad889 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -775,6 +775,21 @@ impl From for CreateImageLayersError {
     }
 }
 
+impl From<crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError>
+    for CreateImageLayersError
+{
+    fn from(err: crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError) -> Self {
+        match err {
+            crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Cancelled => {
+                Self::Cancelled
+            }
+            crate::tenant::storage_layer::batch_split_writer::BatchSplitWriterError::Other(err) => {
+                Self::Other(err)
+            }
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug, Clone)]
 pub(crate) enum FlushLayerError {
     /// Timeline cancellation token was cancelled
@@ -2043,6 +2058,9 @@ impl Timeline {
             Err(CompactionError::ShuttingDown) => {
                 // Covered by the `Err(e) if e.is_cancel()` branch.
             }
+            Err(CompactionError::Cancelled) => {
+                // Covered by the `Err(e) if e.is_cancel()` branch.
+            }
             Err(CompactionError::AlreadyRunning(_)) => {
                 // Covered by the `Err(e) if e.is_cancel()` branch.
             }
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 0e5f0a3b6f..9d20990541 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -15,8 +15,6 @@ use super::{
     GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
     Timeline,
 };
-use crate::tenant::storage_layer::delta_layer::DeltaLayerWriterError;
-use crate::tenant::storage_layer::image_layer::ImageLayerWriterError;
 use crate::tenant::timeline::DeltaEntry;
 use crate::walredo::RedoAttemptType;
 
@@ -1549,10 +1547,7 @@ impl Timeline {
                 ctx,
             )
             .await
-            .map_err(|e| match e {
-                ImageLayerWriterError::Cancelled => CompactionError::Cancelled,
-                ImageLayerWriterError::Other(err) => CompactionError::Other(err),
-            })?;
+            .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
 
             // Safety of layer rewrites:
             // - We are writing to a different local file path than we are reading from, so the old Layer
@@ -1574,10 +1569,10 @@ impl Timeline {
             .await?;
 
             if keys_written > 0 {
-                let (desc, path) = image_layer_writer.finish(ctx).await.map_err(|e| match e {
-                    ImageLayerWriterError::Cancelled => CompactionError::Cancelled,
-                    ImageLayerWriterError::Other(err) => CompactionError::Other(err),
-                })?;
+                let (desc, path) = image_layer_writer
+                    .finish(ctx)
+                    .await
+                    .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
                 let new_layer = Layer::finish_creating(self.conf, self, desc, &path)
                     .map_err(CompactionError::Other)?;
                 info!(layer=%new_layer, "rewrote layer, {} -> {} bytes",
@@ -2145,10 +2140,7 @@ impl Timeline {
                     .unwrap()
                     .finish(prev_key.unwrap().next(), ctx)
                     .await
-                    .map_err(|e| match e {
-                        DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
-                        DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
-                    })?;
+                    .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
 
                 let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
                     .map_err(CompactionError::Other)?;
@@ -2207,10 +2199,7 @@ impl Timeline {
                     .unwrap()
                     .put_value(key, lsn, value, ctx)
                     .await
-                    .map_err(|e| match e {
-                        DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
-                        DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
-                    })?;
+                    .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
             } else {
                 let owner = self.shard_identity.get_shard_number(&key);
@@ -2228,10 +2217,7 @@ impl Timeline {
                 let (desc, path) = writer
                    .finish(prev_key.unwrap().next(), ctx)
                    .await
-                    .map_err(|e| match e {
-                        DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
-                        DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
-                    })?;
+                    .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
                 let new_delta = Layer::finish_creating(self.conf, self, desc, &path)
                     .map_err(CompactionError::Other)?;
                 new_layers.push(new_delta);
@@ -3696,10 +3682,7 @@ impl Timeline {
         let (desc, path) = delta_writer_before
             .finish(job_desc.compaction_key_range.start, ctx)
             .await
-            .map_err(|e| match e {
-                DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
-                DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
-            })?;
+            .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
         let layer = Layer::finish_creating(self.conf, self, desc, &path)
             .context("failed to finish creating delta layer")
             .map_err(CompactionError::Other)?;
@@ -3709,10 +3692,7 @@ impl Timeline {
         let (desc, path) = delta_writer_after
             .finish(key.key_range.end, ctx)
             .await
-            .map_err(|e| match e {
-                DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
-                DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
-            })?;
+            .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?;
         let layer = Layer::finish_creating(self.conf, self, desc, &path)
             .context("failed to finish creating delta layer")
             .map_err(CompactionError::Other)?;
@@ -3731,10 +3711,7 @@ impl Timeline {
             writer
                 .finish_with_discard_fn(self, ctx, end_key, discard)
                 .await
-                .map_err(|e| match e {
-                    ImageLayerWriterError::Cancelled => CompactionError::Cancelled,
-                    ImageLayerWriterError::Other(err) => CompactionError::Other(err),
-                })?
+                .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
         } else {
             drop(writer);
             Vec::new()
@@ -3747,10 +3724,7 @@ impl Timeline {
             delta_layer_writer
                 .finish_with_discard_fn(self, ctx, discard)
                 .await
-                .map_err(|e| match e {
-                    DeltaLayerWriterError::Cancelled => CompactionError::Cancelled,
-                    DeltaLayerWriterError::Other(err) => CompactionError::Other(err),
-                })?
+                .map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))?
         } else {
             drop(delta_layer_writer);
             Vec::new()
@@ -4278,12 +4252,7 @@ impl TimelineAdaptor {
         let (desc, path) = unfinished_image_layer
             .finish(ctx)
             .await
-            .map_err(|e| match e {
-                ImageLayerWriterError::Cancelled => {
-                    CreateImageLayersError::Other(anyhow::anyhow!("flush task cancelled"))
-                }
-                ImageLayerWriterError::Other(err) => CreateImageLayersError::Other(err),
-            })?;
+            .map_err(|e| CreateImageLayersError::Other(anyhow::anyhow!(e)))?;
         let image_layer =
             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
         self.new_images.push(image_layer);
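
For reference, a condensed, self-contained sketch (not part of the patch) of the error plumbing introduced above. `BatchSplitWriterError` is copied from the `batch_split_writer.rs` hunk; `CreateImageLayersError` is a simplified stand-in reduced to the two variants this patch touches (the real enum in `timeline.rs` has more variants and its display strings are assumed here), and the `From` impl mirrors the conversion added in `timeline.rs`.

```rust
use anyhow::anyhow;

// Copied from the batch_split_writer.rs hunk above.
#[derive(Debug, thiserror::Error)]
pub enum BatchSplitWriterError {
    #[error("cancelled")]
    Cancelled,
    #[error(transparent)]
    Other(anyhow::Error),
}

// Simplified stand-in for the pageserver's CreateImageLayersError, reduced to
// the two variants exercised by this patch (the real enum has more variants).
#[derive(Debug, thiserror::Error)]
pub enum CreateImageLayersError {
    #[error("timeline shutting down")]
    Cancelled,
    #[error(transparent)]
    Other(anyhow::Error),
}

// Mirrors the From impl added in timeline.rs: cancellation is preserved as a
// distinct variant, everything else stays an opaque error.
impl From<BatchSplitWriterError> for CreateImageLayersError {
    fn from(err: BatchSplitWriterError) -> Self {
        match err {
            BatchSplitWriterError::Cancelled => Self::Cancelled,
            BatchSplitWriterError::Other(err) => Self::Other(err),
        }
    }
}

fn main() {
    // The image-layer creation path keeps the Cancelled variant intact:
    let converted: CreateImageLayersError = BatchSplitWriterError::Cancelled.into();
    println!("{converted:?}");

    // The compaction call sites in this patch instead fold writer errors into an
    // opaque error, as in `.map_err(|e| CompactionError::Other(anyhow::anyhow!(e)))`:
    let folded: anyhow::Error = anyhow!(BatchSplitWriterError::Cancelled);
    println!("{folded:#}");
}
```

Note that only the `TimelineAdaptor` image-layer path goes through the `From` impl and preserves `Cancelled`; the other call sites changed in `compaction.rs` collapse both variants into `CompactionError::Other`.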