diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 3612686b5d..767bba49e2 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -79,8 +79,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa
 use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
 use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
 use crate::tenant::timeline::{
-    CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
-    WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
+    CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout,
+    WaitLsnWaiter, import_pgdata,
 };
 use crate::tenant::{
     GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
@@ -2500,9 +2500,10 @@ async fn timeline_checkpoint_handler(
             .compact(&cancel, flags, &ctx)
             .await
             .map_err(|e|
-                match e {
-                    CompactionError::ShuttingDown => ApiError::ShuttingDown,
-                    CompactionError::Other(e) => ApiError::InternalServerError(e),
+                if e.is_cancel() {
+                    ApiError::ShuttingDown
+                } else {
+                    ApiError::InternalServerError(e.into_anyhow())
                 }
             )?;
     }
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index f576119db8..240ba36236 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3291,7 +3291,7 @@ impl TenantShard {
                     // Ignore this, we likely raced with unarchival.
                     OffloadError::NotArchived => Ok(()),
                     OffloadError::AlreadyInProgress => Ok(()),
-                    OffloadError::Cancelled => Err(CompactionError::ShuttingDown),
+                    OffloadError::Cancelled => Err(CompactionError::new_cancelled()),
                     // don't break the anyhow chain
                     OffloadError::Other(err) => Err(CompactionError::Other(err)),
                 })?;
@@ -3321,16 +3321,13 @@ impl TenantShard {
 
     /// Trips the compaction circuit breaker if appropriate.
    pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
-        match err {
-            err if err.is_cancel() => {}
-            CompactionError::ShuttingDown => (),
-            CompactionError::Other(err) => {
-                self.compaction_circuit_breaker
-                    .lock()
-                    .unwrap()
-                    .fail(&CIRCUIT_BREAKERS_BROKEN, err);
-            }
+        if err.is_cancel() {
+            return;
         }
+        self.compaction_circuit_breaker
+            .lock()
+            .unwrap()
+            .fail(&CIRCUIT_BREAKERS_BROKEN, err);
     }
 
     /// Cancel scheduled compaction tasks
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index bcece5589a..08fc7d61a5 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -17,17 +17,14 @@ use tracing::*;
 use utils::backoff::exponential_backoff_duration;
 use utils::completion::Barrier;
 use utils::pausable_failpoint;
-use utils::sync::gate::GateError;
 
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
 use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
-use crate::tenant::blob_io::WriteBlobError;
 use crate::tenant::throttle::Stats;
 use crate::tenant::timeline::CompactionError;
 use crate::tenant::timeline::compaction::CompactionOutcome;
 use crate::tenant::{TenantShard, TenantState};
-use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
 
 /// Semaphore limiting concurrent background tasks (across all tenants).
 ///
@@ -310,45 +307,12 @@ pub(crate) fn log_compaction_error(
     task_cancelled: bool,
     degrade_to_warning: bool,
 ) {
-    use CompactionError::*;
+    let is_cancel = err.is_cancel();
 
-    use crate::tenant::PageReconstructError;
-    use crate::tenant::upload_queue::NotInitialized;
-
-    let level = match err {
-        e if e.is_cancel() => return,
-        ShuttingDown => return,
-        _ if task_cancelled => Level::INFO,
-        Other(err) => {
-            let root_cause = err.root_cause();
-
-            let upload_queue = root_cause
-                .downcast_ref::<NotInitialized>()
-                .is_some_and(|e| e.is_stopping());
-            let timeline = root_cause
-                .downcast_ref::<PageReconstructError>()
-                .is_some_and(|e| e.is_cancel());
-            let buffered_writer_flush_task_canelled = root_cause
-                .downcast_ref::<FlushTaskError>()
-                .is_some_and(|e| e.is_cancel());
-            let write_blob_cancelled = root_cause
-                .downcast_ref::<WriteBlobError>()
-                .is_some_and(|e| e.is_cancel());
-            let gate_closed = root_cause
-                .downcast_ref::<GateError>()
-                .is_some_and(|e| e.is_cancel());
-            let is_stopping = upload_queue
-                || timeline
-                || buffered_writer_flush_task_canelled
-                || write_blob_cancelled
-                || gate_closed;
-
-            if is_stopping {
-                Level::INFO
-            } else {
-                Level::ERROR
-            }
-        }
+    let level = if is_cancel || task_cancelled {
+        Level::INFO
+    } else {
+        Level::ERROR
     };
 
     if let Some((error_count, sleep_duration)) = retry_info {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 6088f40669..0a026d288e 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1002,7 +1002,7 @@ impl From for tonic::Status {
 impl From<CreateImageLayersError> for CompactionError {
     fn from(e: CreateImageLayersError) -> Self {
         match e {
-            CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
+            CreateImageLayersError::Cancelled => CompactionError::new_cancelled(),
             CreateImageLayersError::Other(e) => {
                 CompactionError::Other(e.context("create image layers"))
             }
@@ -2117,12 +2117,7 @@ impl Timeline {
         match &result {
             Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
             Err(e) if e.is_cancel() => {}
-            Err(CompactionError::ShuttingDown) => {
-                // Covered by the `Err(e) if e.is_cancel()` branch.
-            }
-            Err(CompactionError::Other(_)) => {
-                self.compaction_failed.store(true, AtomicOrdering::Relaxed)
-            }
+            Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed),
         };
 
         result
@@ -6057,26 +6052,88 @@ impl Drop for Timeline {
     }
 }
 
-/// Top-level failure to compact.
-#[derive(Debug, thiserror::Error)]
-pub(crate) enum CompactionError {
-    #[error("The timeline or pageserver is shutting down")]
-    ShuttingDown,
-    #[error(transparent)]
-    Other(anyhow::Error),
-}
+pub(crate) use compaction_error::CompactionError;
+/// In a private mod to enforce that [`CompactionError::is_cancel`] is used
+/// instead of `match`ing on [`CompactionError::ShuttingDown`].
+mod compaction_error {
+    use utils::sync::gate::GateError;
 
-impl CompactionError {
-    /// Errors that can be ignored, i.e., cancel and shutdown.
-    pub fn is_cancel(&self) -> bool {
-        matches!(self, Self::ShuttingDown)
+    use crate::{
+        pgdatadir_mapping::CollectKeySpaceError,
+        tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized},
+        virtual_file::owned_buffers_io::write::FlushTaskError,
+    };
+
+    /// Top-level failure to compact. Use [`Self::is_cancel`].
+    #[derive(Debug, thiserror::Error)]
+    pub(crate) enum CompactionError {
+        /// Use [`Self::is_cancel`] instead of checking for this variant.
+        #[error("The timeline or pageserver is shutting down")]
+        #[allow(private_interfaces)]
+        ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`].
+        #[error(transparent)]
+        Other(anyhow::Error),
     }
 
-    pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
-        if err.is_cancel() {
-            Self::ShuttingDown
-        } else {
-            Self::Other(err.into_anyhow())
+    #[derive(Debug)]
+    struct ForbidMatching;
+
+    impl CompactionError {
+        pub fn new_cancelled() -> Self {
+            Self::ShuttingDown(ForbidMatching)
+        }
+        /// Errors that can be ignored, i.e., cancel and shutdown.
+        pub fn is_cancel(&self) -> bool {
+            let other = match self {
+                CompactionError::ShuttingDown(_) => return true,
+                CompactionError::Other(other) => other,
+            };
+
+            // The write path of compaction in particular often lacks differentiated
+            // handling of errors stemming from cancellation vs. other errors.
+            // So we also check the ::Other variant by downcasting.
+            // The list below has been found empirically from flaky tests and production logs.
+            // The process is simple: on ::Other(), compaction will print the enclosed
+            // anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the
+            // line where the write path / compaction code does undifferentiated error handling
+            // from a non-anyhow type to an anyhow type. Add the type to the list of downcasts
+            // below, following the same is_cancel() pattern.
+
+            let root_cause = other.root_cause();
+
+            let upload_queue = root_cause
+                .downcast_ref::<NotInitialized>()
+                .is_some_and(|e| e.is_stopping());
+            let timeline = root_cause
+                .downcast_ref::<PageReconstructError>()
+                .is_some_and(|e| e.is_cancel());
+            let buffered_writer_flush_task_canelled = root_cause
+                .downcast_ref::<FlushTaskError>()
+                .is_some_and(|e| e.is_cancel());
+            let write_blob_cancelled = root_cause
+                .downcast_ref::<WriteBlobError>()
+                .is_some_and(|e| e.is_cancel());
+            let gate_closed = root_cause
+                .downcast_ref::<GateError>()
+                .is_some_and(|e| e.is_cancel());
+            upload_queue
+                || timeline
+                || buffered_writer_flush_task_canelled
+                || write_blob_cancelled
+                || gate_closed
+        }
+        pub fn into_anyhow(self) -> anyhow::Error {
+            match self {
+                CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self),
+                CompactionError::Other(e) => e,
+            }
+        }
+        pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
+            if err.is_cancel() {
+                Self::new_cancelled()
+            } else {
+                Self::Other(err.into_anyhow())
+            }
         }
     }
 }
@@ -6088,7 +6145,7 @@ impl From<super::upload_queue::NotInitialized> for CompactionError {
                 CompactionError::Other(anyhow::anyhow!(value))
             }
             super::upload_queue::NotInitialized::ShuttingDown
-            | super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown,
+            | super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(),
         }
     }
 }
@@ -6098,7 +6155,7 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
         match e {
             super::storage_layer::layer::DownloadError::TimelineShutdown
             | super::storage_layer::layer::DownloadError::DownloadCancelled => {
-                CompactionError::ShuttingDown
+                CompactionError::new_cancelled()
             }
             super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
             | super::storage_layer::layer::DownloadError::DownloadRequired
@@ -6117,14 +6174,14 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
 
 impl From<layer_manager::Shutdown> for CompactionError {
     fn from(_: layer_manager::Shutdown) -> Self {
-        CompactionError::ShuttingDown
+        CompactionError::new_cancelled()
     }
 }
 
 impl From<super::storage_layer::errors::PutError> for CompactionError {
     fn from(e: super::storage_layer::errors::PutError) -> Self {
         if e.is_cancel() {
-            CompactionError::ShuttingDown
+            CompactionError::new_cancelled()
         } else {
             CompactionError::Other(e.into_anyhow())
         }
@@ -6223,7 +6280,7 @@ impl Timeline {
         let mut guard = tokio::select! {
             guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
             _ = self.cancel.cancelled() => {
-                return Err(CompactionError::ShuttingDown);
+                return Err(CompactionError::new_cancelled());
             }
         };
 
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index c263df1eb2..18a0ca852d 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -572,8 +572,8 @@ impl GcCompactionQueue {
         }
         match res {
             Ok(res) => Ok(res),
-            Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
-            Err(CompactionError::Other(_)) => {
+            Err(e) if e.is_cancel() => Err(e),
+            Err(_) => {
                 // There are some cases where traditional gc might collect some layer
                 // files causing gc-compaction cannot read the full history of the key.
                 // This needs to be resolved in the long-term by improving the compaction
@@ -1260,7 +1260,7 @@ impl Timeline {
         // Is the timeline being deleted?
         if self.is_stopping() {
             trace!("Dropping out of compaction on timeline shutdown");
-            return Err(CompactionError::ShuttingDown);
+            return Err(CompactionError::new_cancelled());
         }
 
         let target_file_size = self.get_checkpoint_distance();
@@ -1624,7 +1624,7 @@ impl Timeline {
 
         for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
             if self.cancel.is_cancelled() {
-                return Err(CompactionError::ShuttingDown);
+                return Err(CompactionError::new_cancelled());
             }
 
             info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
@@ -1722,7 +1722,7 @@ impl Timeline {
                 Ok(()) => {},
                 Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
                 Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
-                    return Err(CompactionError::ShuttingDown);
+                    return Err(CompactionError::new_cancelled());
                 }
             },
             // Don't wait if there's L0 compaction to do. We don't need to update the outcome
@@ -1985,7 +1985,7 @@ impl Timeline {
         let mut all_keys = Vec::new();
         for l in deltas_to_compact.iter() {
             if self.cancel.is_cancelled() {
-                return Err(CompactionError::ShuttingDown);
+                return Err(CompactionError::new_cancelled());
             }
             let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
             let keys = delta
@@ -2078,7 +2078,7 @@ impl Timeline {
         stats.read_lock_held_compute_holes_micros =
             stats.read_lock_held_key_sort_micros.till_now();
         if self.cancel.is_cancelled() {
-            return Err(CompactionError::ShuttingDown);
+            return Err(CompactionError::new_cancelled());
         }
         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
 
@@ -2186,7 +2186,7 @@ impl Timeline {
                     // avoid hitting the cancellation token on every key. in benches, we end up
                     // shuffling an order of million keys per layer, this means we'll check it
                     // around tens of times per layer.
-                    return Err(CompactionError::ShuttingDown);
+                    return Err(CompactionError::new_cancelled());
                 }
 
                 let same_key = prev_key == Some(key);
@@ -2271,7 +2271,7 @@ impl Timeline {
                 if writer.is_none() {
                     if self.cancel.is_cancelled() {
                         // to be somewhat responsive to cancellation, check for each new layer
-                        return Err(CompactionError::ShuttingDown);
+                        return Err(CompactionError::new_cancelled());
                     }
                     // Create writer if not initiaized yet
                     writer = Some(
@@ -2527,7 +2527,7 @@ impl Timeline {
         // Is the timeline being deleted?
         if self.is_stopping() {
             trace!("Dropping out of compaction on timeline shutdown");
-            return Err(CompactionError::ShuttingDown);
+            return Err(CompactionError::new_cancelled());
         }
 
         let (dense_ks, _sparse_ks) = self
@@ -3189,7 +3189,7 @@ impl Timeline {
         let gc_lock = async {
             tokio::select! {
                 guard = self.gc_lock.lock() => Ok(guard),
-                _ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
+                _ = cancel.cancelled() => Err(CompactionError::new_cancelled()),
             }
         };
 
@@ -3462,7 +3462,7 @@ impl Timeline {
             }
             total_layer_size += layer.layer_desc().file_size;
             if cancel.is_cancelled() {
-                return Err(CompactionError::ShuttingDown);
+                return Err(CompactionError::new_cancelled());
            }
             let should_yield = yield_for_l0
                 && self
@@ -3609,7 +3609,7 @@ impl Timeline {
             }
 
             if cancel.is_cancelled() {
-                return Err(CompactionError::ShuttingDown);
+                return Err(CompactionError::new_cancelled());
             }
 
             let should_yield = yield_for_l0
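
A minimal, self-contained sketch of the ForbidMatching pattern introduced in `mod compaction_error` above, using made-up names (`TaskError`, `mod error`); it assumes a Rust toolchain recent enough to know the `private_interfaces` lint, which the patch itself also allows on the variant. The private zero-sized field keeps code outside the module from constructing the cancel variant directly, steering callers toward `new_cancelled()` and `is_cancel()`:

// Sketch: a variant whose only field is a private zero-sized type. Outside
// `mod error`, `ForbidMatching` is not nameable, so the variant cannot be
// constructed there; callers go through new_cancelled() / is_cancel().
mod error {
    #[derive(Debug)]
    struct ForbidMatching; // private to this module on purpose

    #[derive(Debug)]
    pub enum TaskError {
        // The enum is more visible than the field's type; allow the lint here,
        // as the patch above does for CompactionError::ShuttingDown.
        #[allow(private_interfaces)]
        Cancelled(ForbidMatching),
        Other(String),
    }

    impl TaskError {
        pub fn new_cancelled() -> Self {
            Self::Cancelled(ForbidMatching)
        }

        pub fn is_cancel(&self) -> bool {
            matches!(self, Self::Cancelled(_))
        }
    }
}

fn main() {
    // error::TaskError::Cancelled(error::ForbidMatching) would not compile here:
    // ForbidMatching is private, so the constructor is the only way in.
    let cancelled = error::TaskError::new_cancelled();
    println!("is_cancel = {}", cancelled.is_cancel());

    let other = error::TaskError::Other("disk full".to_string());
    println!("is_cancel = {}", other.is_cancel());
}

Note that a wildcard pattern such as `TaskError::Cancelled(_)` still compiles outside the module; the private field blocks construction and signals intent rather than making matching impossible, which is why the variant's doc comment in the patch also points readers at `is_cancel` instead of relying on the compiler alone.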