mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
impr(compaction): unify checking of CompactionError for cancellation reason (#12392)
There are a couple of places that call `CompactionError::is_cancel` but don't check the `::Other` variant via downcasting for root cause being cancellation. The only place that does it is `log_compaction_error`. It's sad we have to do it, but, until we get around cleaning up all the culprits, a step forward is to unify the behavior so that all places that inspect a `CompactionError` for cancellation reason follow the same behavior. Thus, this PR ... - moves the downcasting checks against the `::Other` variant from `log_compaction_error` into `is_cancel()` and - enforces via type system that `.is_cancel()` is used to check whether a CompactionError is due to cancellation (matching on the `CompactionError::ShuttingDown` will cause a compile-time error). I don't think there's a _serious_ case right now where matching instead of using `is_cancel` causes problems. The worst case I could find is the circuit breaker and `compaction_failed`, which don't really matter if we're shutting down the timeline anyway. But it's unaesthetic and might cause log/alert noise down the line, so, this PR fixes that at least. Refs - https://databricks.atlassian.net/browse/LKB-182 - slack conversation about this PR: https://databricks.slack.com/archives/C09254R641L/p1751284317955159
This commit is contained in:
committed by
GitHub
parent
0b639ba608
commit
2edd59aefb
@@ -79,8 +79,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa
|
||||
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
|
||||
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
|
||||
use crate::tenant::timeline::{
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
|
||||
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
|
||||
CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout,
|
||||
WaitLsnWaiter, import_pgdata,
|
||||
};
|
||||
use crate::tenant::{
|
||||
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
|
||||
@@ -2500,9 +2500,10 @@ async fn timeline_checkpoint_handler(
|
||||
.compact(&cancel, flags, &ctx)
|
||||
.await
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
if e.is_cancel() {
|
||||
ApiError::ShuttingDown
|
||||
} else {
|
||||
ApiError::InternalServerError(e.into_anyhow())
|
||||
}
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -3291,7 +3291,7 @@ impl TenantShard {
|
||||
// Ignore this, we likely raced with unarchival.
|
||||
OffloadError::NotArchived => Ok(()),
|
||||
OffloadError::AlreadyInProgress => Ok(()),
|
||||
OffloadError::Cancelled => Err(CompactionError::ShuttingDown),
|
||||
OffloadError::Cancelled => Err(CompactionError::new_cancelled()),
|
||||
// don't break the anyhow chain
|
||||
OffloadError::Other(err) => Err(CompactionError::Other(err)),
|
||||
})?;
|
||||
@@ -3321,16 +3321,13 @@ impl TenantShard {
|
||||
|
||||
/// Trips the compaction circuit breaker if appropriate.
|
||||
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
if err.is_cancel() {
|
||||
return;
|
||||
}
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
|
||||
/// Cancel scheduled compaction tasks
|
||||
|
||||
@@ -17,17 +17,14 @@ use tracing::*;
|
||||
use utils::backoff::exponential_backoff_duration;
|
||||
use utils::completion::Barrier;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::sync::gate::GateError;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::compaction::CompactionOutcome;
|
||||
use crate::tenant::{TenantShard, TenantState};
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
|
||||
/// Semaphore limiting concurrent background tasks (across all tenants).
|
||||
///
|
||||
@@ -310,45 +307,12 @@ pub(crate) fn log_compaction_error(
|
||||
task_cancelled: bool,
|
||||
degrade_to_warning: bool,
|
||||
) {
|
||||
use CompactionError::*;
|
||||
let is_cancel = err.is_cancel();
|
||||
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let is_stopping = upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed;
|
||||
|
||||
if is_stopping {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
}
|
||||
}
|
||||
let level = if is_cancel || task_cancelled {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
};
|
||||
|
||||
if let Some((error_count, sleep_duration)) = retry_info {
|
||||
|
||||
@@ -1002,7 +1002,7 @@ impl From<WaitLsnError> for tonic::Status {
|
||||
impl From<CreateImageLayersError> for CompactionError {
|
||||
fn from(e: CreateImageLayersError) -> Self {
|
||||
match e {
|
||||
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
|
||||
CreateImageLayersError::Cancelled => CompactionError::new_cancelled(),
|
||||
CreateImageLayersError::Other(e) => {
|
||||
CompactionError::Other(e.context("create image layers"))
|
||||
}
|
||||
@@ -2117,12 +2117,7 @@ impl Timeline {
|
||||
match &result {
|
||||
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
|
||||
Err(e) if e.is_cancel() => {}
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::Other(_)) => {
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed),
|
||||
};
|
||||
|
||||
result
|
||||
@@ -6057,26 +6052,88 @@ impl Drop for Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum CompactionError {
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
ShuttingDown,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
pub(crate) use compaction_error::CompactionError;
|
||||
/// In a private mod to enforce that [`CompactionError::is_cancel`] is used
|
||||
/// instead of `match`ing on [`CompactionError::ShuttingDown`].
|
||||
mod compaction_error {
|
||||
use utils::sync::gate::GateError;
|
||||
|
||||
impl CompactionError {
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(self, Self::ShuttingDown)
|
||||
use crate::{
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized},
|
||||
virtual_file::owned_buffers_io::write::FlushTaskError,
|
||||
};
|
||||
|
||||
/// Top-level failure to compact. Use [`Self::is_cancel`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum CompactionError {
|
||||
/// Use [`Self::is_cancel`] instead of checking for this variant.
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
#[allow(private_interfaces)]
|
||||
ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`].
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
|
||||
if err.is_cancel() {
|
||||
Self::ShuttingDown
|
||||
} else {
|
||||
Self::Other(err.into_anyhow())
|
||||
#[derive(Debug)]
|
||||
struct ForbidMatching;
|
||||
|
||||
impl CompactionError {
|
||||
pub fn new_cancelled() -> Self {
|
||||
Self::ShuttingDown(ForbidMatching)
|
||||
}
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
let other = match self {
|
||||
CompactionError::ShuttingDown(_) => return true,
|
||||
CompactionError::Other(other) => other,
|
||||
};
|
||||
|
||||
// The write path of compaction in particular often lacks differentiated
|
||||
// handling errors stemming from cancellation from other errors.
|
||||
// So, if requested, we also check the ::Other variant by downcasting.
|
||||
// The list below has been found empirically from flaky tests and production logs.
|
||||
// The process is simple: on ::Other(), compaction will print the enclosed
|
||||
// anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the
|
||||
// line where the write path / compaction code does undifferentiated error handling
|
||||
// from a non-anyhow type to an anyhow type. Add the type to the list of downcasts
|
||||
// below, following the same is_cancel() pattern.
|
||||
|
||||
let root_cause = other.root_cause();
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self),
|
||||
CompactionError::Other(e) => e,
|
||||
}
|
||||
}
|
||||
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
|
||||
if err.is_cancel() {
|
||||
Self::new_cancelled()
|
||||
} else {
|
||||
Self::Other(err.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6088,7 +6145,7 @@ impl From<super::upload_queue::NotInitialized> for CompactionError {
|
||||
CompactionError::Other(anyhow::anyhow!(value))
|
||||
}
|
||||
super::upload_queue::NotInitialized::ShuttingDown
|
||||
| super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown,
|
||||
| super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6098,7 +6155,7 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
|
||||
match e {
|
||||
super::storage_layer::layer::DownloadError::TimelineShutdown
|
||||
| super::storage_layer::layer::DownloadError::DownloadCancelled => {
|
||||
CompactionError::ShuttingDown
|
||||
CompactionError::new_cancelled()
|
||||
}
|
||||
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
|
||||
| super::storage_layer::layer::DownloadError::DownloadRequired
|
||||
@@ -6117,14 +6174,14 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
|
||||
|
||||
impl From<layer_manager::Shutdown> for CompactionError {
|
||||
fn from(_: layer_manager::Shutdown) -> Self {
|
||||
CompactionError::ShuttingDown
|
||||
CompactionError::new_cancelled()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<super::storage_layer::errors::PutError> for CompactionError {
|
||||
fn from(e: super::storage_layer::errors::PutError) -> Self {
|
||||
if e.is_cancel() {
|
||||
CompactionError::ShuttingDown
|
||||
CompactionError::new_cancelled()
|
||||
} else {
|
||||
CompactionError::Other(e.into_anyhow())
|
||||
}
|
||||
@@ -6223,7 +6280,7 @@ impl Timeline {
|
||||
let mut guard = tokio::select! {
|
||||
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -572,8 +572,8 @@ impl GcCompactionQueue {
|
||||
}
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(CompactionError::Other(_)) => {
|
||||
Err(e) if e.is_cancel() => Err(e),
|
||||
Err(_) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
@@ -1260,7 +1260,7 @@ impl Timeline {
|
||||
// Is the timeline being deleted?
|
||||
if self.is_stopping() {
|
||||
trace!("Dropping out of compaction on timeline shutdown");
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
@@ -1624,7 +1624,7 @@ impl Timeline {
|
||||
|
||||
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
|
||||
@@ -1722,7 +1722,7 @@ impl Timeline {
|
||||
Ok(()) => {},
|
||||
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
|
||||
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
},
|
||||
// Don't wait if there's L0 compaction to do. We don't need to update the outcome
|
||||
@@ -1985,7 +1985,7 @@ impl Timeline {
|
||||
let mut all_keys = Vec::new();
|
||||
for l in deltas_to_compact.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
let keys = delta
|
||||
@@ -2078,7 +2078,7 @@ impl Timeline {
|
||||
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
|
||||
@@ -2186,7 +2186,7 @@ impl Timeline {
|
||||
// avoid hitting the cancellation token on every key. in benches, we end up
|
||||
// shuffling an order of million keys per layer, this means we'll check it
|
||||
// around tens of times per layer.
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let same_key = prev_key == Some(key);
|
||||
@@ -2271,7 +2271,7 @@ impl Timeline {
|
||||
if writer.is_none() {
|
||||
if self.cancel.is_cancelled() {
|
||||
// to be somewhat responsive to cancellation, check for each new layer
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
// Create writer if not initiaized yet
|
||||
writer = Some(
|
||||
@@ -2527,7 +2527,7 @@ impl Timeline {
|
||||
// Is the timeline being deleted?
|
||||
if self.is_stopping() {
|
||||
trace!("Dropping out of compaction on timeline shutdown");
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let (dense_ks, _sparse_ks) = self
|
||||
@@ -3189,7 +3189,7 @@ impl Timeline {
|
||||
let gc_lock = async {
|
||||
tokio::select! {
|
||||
guard = self.gc_lock.lock() => Ok(guard),
|
||||
_ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
|
||||
_ = cancel.cancelled() => Err(CompactionError::new_cancelled()),
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3462,7 +3462,7 @@ impl Timeline {
|
||||
}
|
||||
total_layer_size += layer.layer_desc().file_size;
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
let should_yield = yield_for_l0
|
||||
&& self
|
||||
@@ -3609,7 +3609,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let should_yield = yield_for_l0
|
||||
|
||||
Reference in New Issue
Block a user