From aac1f8efb1086b6db7c599c26912920f08d479b3 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 9 Jul 2025 10:41:36 +0200 Subject: [PATCH] refactor(compaction): eliminate `CompactionError::CollectKeyspaceError` variant (#12517) The only differentiated handling of it is for `is_critical`, which in turn is a `matches!()` on several variants of the `enum CollectKeyspaceError` which is the value contained insided `CompactionError::CollectKeyspaceError`. This PR introduces a new error for `repartition()`, allowing its immediate callers to inspect it like `is_critical` did. A drive-by fix is more precise classification of WaitLsnError::BadState when mapping to `tonic::Status`. refs - https://databricks.atlassian.net/browse/LKB-182 --- pageserver/src/http/routes.rs | 1 - pageserver/src/pgdatadir_mapping.rs | 17 +++ pageserver/src/tenant.rs | 7 - pageserver/src/tenant/tasks.rs | 3 +- pageserver/src/tenant/timeline.rs | 129 ++++++++++++------- pageserver/src/tenant/timeline/compaction.rs | 43 +++++-- 6 files changed, 130 insertions(+), 70 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 0e40dbcd15..2995a37089 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2502,7 +2502,6 @@ async fn timeline_checkpoint_handler( .map_err(|e| match e { CompactionError::ShuttingDown => ApiError::ShuttingDown, - CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)), CompactionError::Other(e) => ApiError::InternalServerError(e), } )?; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 31f38d485f..8532a6938f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -141,6 +141,23 @@ pub(crate) enum CollectKeySpaceError { Cancelled, } +impl CollectKeySpaceError { + pub(crate) fn is_cancel(&self) -> bool { + match self { + CollectKeySpaceError::Decode(_) => false, + CollectKeySpaceError::PageRead(e) => e.is_cancel(), + CollectKeySpaceError::Cancelled => true, + } + } + pub(crate) fn into_anyhow(self) -> anyhow::Error { + match self { + CollectKeySpaceError::Decode(e) => anyhow::Error::new(e), + CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e), + CollectKeySpaceError::Cancelled => anyhow::Error::new(self), + } + } +} + impl From for CollectKeySpaceError { fn from(err: PageReconstructError) -> Self { match err { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b0969a96c1..f576119db8 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3324,13 +3324,6 @@ impl TenantShard { match err { err if err.is_cancel() => {} CompactionError::ShuttingDown => (), - CompactionError::CollectKeySpaceError(err) => { - // CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch. - self.compaction_circuit_breaker - .lock() - .unwrap() - .fail(&CIRCUIT_BREAKERS_BROKEN, err); - } CompactionError::Other(err) => { self.compaction_circuit_breaker .lock() diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 2ae6b7ff3d..bcece5589a 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -318,7 +318,6 @@ pub(crate) fn log_compaction_error( let level = match err { e if e.is_cancel() => return, ShuttingDown => return, - CollectKeySpaceError(_) => Level::ERROR, _ if task_cancelled => Level::INFO, Other(err) => { let root_cause = err.root_cause(); @@ -328,7 +327,7 @@ pub(crate) fn log_compaction_error( .is_some_and(|e| e.is_stopping()); let timeline = root_cause .downcast_ref::() - .is_some_and(|e| e.is_stopping()); + .is_some_and(|e| e.is_cancel()); let buffered_writer_flush_task_canelled = root_cause .downcast_ref::() .is_some_and(|e| e.is_cancel()); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4a08172337..6088f40669 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -585,6 +585,28 @@ pub(crate) enum PageReconstructError { MissingKey(Box), } +impl PageReconstructError { + pub(crate) fn is_cancel(&self) -> bool { + match self { + PageReconstructError::Other(_) => false, + PageReconstructError::AncestorLsnTimeout(e) => e.is_cancel(), + PageReconstructError::Cancelled => true, + PageReconstructError::WalRedo(_) => false, + PageReconstructError::MissingKey(_) => false, + } + } + #[allow(dead_code)] // we use the is_cancel + into_anyhow pattern in quite a few places, this one will follow soon enough + pub(crate) fn into_anyhow(self) -> anyhow::Error { + match self { + PageReconstructError::Other(e) => e, + PageReconstructError::AncestorLsnTimeout(e) => e.into_anyhow(), + PageReconstructError::Cancelled => anyhow::Error::new(self), + PageReconstructError::WalRedo(e) => e, + PageReconstructError::MissingKey(_) => anyhow::Error::new(self), + } + } +} + impl From for PageReconstructError { fn from(value: anyhow::Error) -> Self { // with walingest.rs many PageReconstructError are wrapped in as anyhow::Error @@ -738,17 +760,6 @@ impl std::fmt::Display for MissingKeyError { } } -impl PageReconstructError { - /// Returns true if this error indicates a tenant/timeline shutdown alike situation - pub(crate) fn is_stopping(&self) -> bool { - use PageReconstructError::*; - match self { - Cancelled => true, - Other(_) | AncestorLsnTimeout(_) | WalRedo(_) | MissingKey(_) => false, - } - } -} - #[derive(thiserror::Error, Debug)] pub(crate) enum CreateImageLayersError { #[error("timeline shutting down")] @@ -951,13 +962,35 @@ pub enum WaitLsnError { Timeout(String), } +impl WaitLsnError { + pub(crate) fn is_cancel(&self) -> bool { + match self { + WaitLsnError::Shutdown => true, + WaitLsnError::BadState(timeline_state) => match timeline_state { + TimelineState::Loading => false, + TimelineState::Active => false, + TimelineState::Stopping => true, + TimelineState::Broken { .. } => false, + }, + WaitLsnError::Timeout(_) => false, + } + } + pub(crate) fn into_anyhow(self) -> anyhow::Error { + match self { + WaitLsnError::Shutdown => anyhow::Error::new(self), + WaitLsnError::BadState(_) => anyhow::Error::new(self), + WaitLsnError::Timeout(_) => anyhow::Error::new(self), + } + } +} + impl From for tonic::Status { fn from(err: WaitLsnError) -> Self { use tonic::Code; - let code = match &err { - WaitLsnError::Timeout(_) => Code::Internal, - WaitLsnError::BadState(_) => Code::Internal, - WaitLsnError::Shutdown => Code::Unavailable, + let code = if err.is_cancel() { + Code::Unavailable + } else { + Code::Internal }; tonic::Status::new(code, err.to_string()) } @@ -1084,6 +1117,26 @@ enum ImageLayerCreationOutcome { Skip, } +enum RepartitionError { + Other(anyhow::Error), + CollectKeyspace(CollectKeySpaceError), +} + +impl RepartitionError { + fn is_cancel(&self) -> bool { + match self { + RepartitionError::Other(_) => false, + RepartitionError::CollectKeyspace(e) => e.is_cancel(), + } + } + fn into_anyhow(self) -> anyhow::Error { + match self { + RepartitionError::Other(e) => e, + RepartitionError::CollectKeyspace(e) => e.into_anyhow(), + } + } +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -2070,10 +2123,6 @@ impl Timeline { Err(CompactionError::Other(_)) => { self.compaction_failed.store(true, AtomicOrdering::Relaxed) } - Err(CompactionError::CollectKeySpaceError(_)) => { - // Cancelled errors are covered by the `Err(e) if e.is_cancel()` branch. - self.compaction_failed.store(true, AtomicOrdering::Relaxed) - } }; result @@ -4963,7 +5012,7 @@ impl Timeline { ctx, ) .await - .map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?; + .map_err(|e| FlushLayerError::from_anyhow(self, e.into_anyhow()))?; if self.cancel.is_cancelled() { return Err(FlushLayerError::Cancelled); @@ -5213,18 +5262,18 @@ impl Timeline { partition_size: u64, flags: EnumSet, ctx: &RequestContext, - ) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> { + ) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), RepartitionError> { let Ok(mut guard) = self.partitioning.try_write_guard() else { // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline. // The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()` // and hence before the compaction task starts. - return Err(CompactionError::Other(anyhow!( + return Err(RepartitionError::Other(anyhow!( "repartition() called concurrently" ))); }; let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read(); if lsn < *partition_lsn { - return Err(CompactionError::Other(anyhow!( + return Err(RepartitionError::Other(anyhow!( "repartition() called with LSN going backwards, this should not happen" ))); } @@ -5245,7 +5294,10 @@ impl Timeline { )); } - let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?; + let (dense_ks, sparse_ks) = self + .collect_keyspace(lsn, ctx) + .await + .map_err(RepartitionError::CollectKeyspace)?; let dense_partitioning = dense_ks.partition( &self.shard_identity, partition_size, @@ -6010,9 +6062,6 @@ impl Drop for Timeline { pub(crate) enum CompactionError { #[error("The timeline or pageserver is shutting down")] ShuttingDown, - /// Compaction cannot be done right now; page reconstruction and so on. - #[error("Failed to collect keyspace: {0}")] - CollectKeySpaceError(#[from] CollectKeySpaceError), #[error(transparent)] Other(anyhow::Error), } @@ -6020,27 +6069,15 @@ pub(crate) enum CompactionError { impl CompactionError { /// Errors that can be ignored, i.e., cancel and shutdown. pub fn is_cancel(&self) -> bool { - matches!( - self, - Self::ShuttingDown - | Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled) - | Self::CollectKeySpaceError(CollectKeySpaceError::PageRead( - PageReconstructError::Cancelled - )) - ) + matches!(self, Self::ShuttingDown) } - /// Critical errors that indicate data corruption. - pub fn is_critical(&self) -> bool { - matches!( - self, - Self::CollectKeySpaceError( - CollectKeySpaceError::Decode(_) - | CollectKeySpaceError::PageRead( - PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_), - ) - ) - ) + pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self { + if err.is_cancel() { + Self::ShuttingDown + } else { + Self::Other(err.into_anyhow()) + } } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 2c0b98c1e2..c263df1eb2 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -16,7 +16,8 @@ use super::{ Timeline, }; -use crate::tenant::timeline::DeltaEntry; +use crate::pgdatadir_mapping::CollectKeySpaceError; +use crate::tenant::timeline::{DeltaEntry, RepartitionError}; use crate::walredo::RedoAttemptType; use anyhow::{Context, anyhow}; use bytes::Bytes; @@ -64,7 +65,7 @@ use crate::tenant::timeline::{ DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer, ResidentLayer, drop_layer_manager_rlock, }; -use crate::tenant::{DeltaLayer, MaybeOffloaded}; +use crate::tenant::{DeltaLayer, MaybeOffloaded, PageReconstructError}; use crate::virtual_file::{MaybeFatalIo, VirtualFile}; /// Maximum number of deltas before generating an image layer in bottom-most compaction. @@ -572,7 +573,7 @@ impl GcCompactionQueue { match res { Ok(res) => Ok(res), Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown), - Err(CompactionError::CollectKeySpaceError(_) | CompactionError::Other(_)) => { + Err(CompactionError::Other(_)) => { // There are some cases where traditional gc might collect some layer // files causing gc-compaction cannot read the full history of the key. // This needs to be resolved in the long-term by improving the compaction @@ -1417,22 +1418,33 @@ impl Timeline { } // Suppress errors when cancelled. - Err(_) if self.cancel.is_cancelled() => {} + // + // Log other errors but continue. Failure to repartition is normal, if the timeline was just created + // as an empty timeline. Also in unit tests, when we use the timeline as a simple + // key-value store, ignoring the datadir layout. Log the error but continue. + // + // TODO: + // 1. shouldn't we return early here if we observe cancellation + // 2. Experiment: can we stop checking self.cancel here? + Err(_) if self.cancel.is_cancelled() => {} // TODO: try how we fare removing this branch Err(err) if err.is_cancel() => {} - - // Alert on critical errors that indicate data corruption. - Err(err) if err.is_critical() => { + Err(RepartitionError::CollectKeyspace( + e @ CollectKeySpaceError::Decode(_) + | e @ CollectKeySpaceError::PageRead( + PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_), + ), + )) => { + // Alert on critical errors that indicate data corruption. critical_timeline!( self.tenant_shard_id, self.timeline_id, - "could not compact, repartitioning keyspace failed: {err:?}" + "could not compact, repartitioning keyspace failed: {e:?}" ); } - - // Log other errors. No partitioning? This is normal, if the timeline was just created - // as an empty timeline. Also in unit tests, when we use the timeline as a simple - // key-value store, ignoring the datadir layout. Log the error but continue. - Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"), + Err(e) => error!( + "could not compact, repartitioning keyspace failed: {:?}", + e.into_anyhow() + ), }; let partition_count = self.partitioning.read().0.0.parts.len(); @@ -2518,7 +2530,10 @@ impl Timeline { return Err(CompactionError::ShuttingDown); } - let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?; + let (dense_ks, _sparse_ks) = self + .collect_keyspace(end_lsn, ctx) + .await + .map_err(CompactionError::from_collect_keyspace)?; // TODO(chi): ignore sparse_keyspace for now, compact it in the future. let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));