Compare commits

...

3 Commits

Author SHA1 Message Date
Christian Schwarz
318700600d refactor: lift inspection of CompactionError::Other(x) => x.root_cause() into CompactionError::is_cancel
There are a couple of places that call CompactionError::is_cancel but
don't check the Other variant for root cause. But they should, because
some cancellations are observed by code that results in ::Other errors.

I don't think there's a _serious_ case where this causes problems.
The worst case one is the circuit breaker which we do currently trip
on ::Other errors that are due to cancellation. Tripped circuit breaker
on shutting down timelines doesn't really matter practically, but
it's unaesthetic and might cause noise down the line, so, this
PR fixes that at least.

In any way, this PR forces future callers of is_cancel() to explicitly
recognize the suboptimal state of affairs wrt error handling in compaction,
thereby hopefully preventing errors of this kind from creeping in.

(The _right_ solution for the compaction code probably is the approach
I took in #11853: keep using anyhow but have a unified way / pattern
of bubbling up cancellation, so that we don't need to perform the downcast
trickery).
2025-06-27 14:10:24 +02:00
Christian Schwarz
75ef17e2f7 refactor: force explicit mapping to CreateImageLayersError::Other 2025-06-27 13:31:49 +02:00
Christian Schwarz
93026b7899 address most types of error that happen during creation of delta layers 2025-06-27 13:17:19 +02:00
6 changed files with 110 additions and 77 deletions

View File

@@ -86,6 +86,14 @@ pub enum GateError {
GateClosed,
}
impl GateError {
pub fn is_cancel(&self) -> bool {
match self {
GateError::GateClosed => true,
}
}
}
impl Default for Gate {
fn default() -> Self {
Self {

View File

@@ -99,6 +99,7 @@ use crate::tenant::remote_timeline_client::{
INITDB_PATH, MaybeDeletedIndexPart, remote_initdb_archive_path,
};
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer};
use crate::tenant::timeline::CheckOtherForCancel;
use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
@@ -3261,11 +3262,11 @@ impl TenantShard {
/// Trips the compaction circuit breaker if appropriate.
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
if err.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) {
return;
}
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::ShuttingDown => unreachable!("is_cancel"),
CompactionError::Offload(_) => {}
CompactionError::CollectKeySpaceError(err) => {
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
@@ -3280,7 +3281,7 @@ impl TenantShard {
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::AlreadyRunning(_) => {}
CompactionError::AlreadyRunning(_) => unreachable!("is_cancel, but XXX why?"),
}
}

View File

@@ -55,11 +55,11 @@ pub struct BatchLayerWriter {
}
impl BatchLayerWriter {
pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
Ok(Self {
pub fn new(conf: &'static PageServerConf) -> Self {
Self {
generated_layer_writers: Vec::new(),
conf,
})
}
}
pub fn add_unfinished_image_writer(
@@ -209,6 +209,7 @@ impl<'a> SplitImageLayerWriter<'a> {
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
// XXX make this lazy like in SplitDeltaLayerWriter?
inner: ImageLayerWriter::new(
conf,
timeline_id,
@@ -223,7 +224,7 @@ impl<'a> SplitImageLayerWriter<'a> {
conf,
timeline_id,
tenant_shard_id,
batches: BatchLayerWriter::new(conf).await?,
batches: BatchLayerWriter::new(conf),
lsn,
start_key,
gate,
@@ -319,7 +320,7 @@ pub struct SplitDeltaLayerWriter<'a> {
}
impl<'a> SplitDeltaLayerWriter<'a> {
pub async fn new(
pub fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
@@ -327,8 +328,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
Ok(Self {
) -> Self {
Self {
target_layer_size,
inner: None,
conf,
@@ -336,10 +337,10 @@ impl<'a> SplitDeltaLayerWriter<'a> {
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
batches: BatchLayerWriter::new(conf),
gate,
cancel,
})
}
}
pub async fn put_value(
@@ -510,9 +511,7 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
);
image_writer
.put_image(get_key(0), get_img(0), &ctx)
@@ -590,9 +589,7 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
);
const N: usize = 2000;
for i in 0..N {
let i = i as u32;
@@ -692,9 +689,7 @@ mod tests {
4 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
);
image_writer
.put_image(get_key(0), get_img(0), &ctx)
@@ -770,9 +765,7 @@ mod tests {
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
);
for i in 0..N {
let i = i as u32;

View File

@@ -22,8 +22,8 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::{CheckOtherForCancel, CompactionError};
use crate::tenant::{TenantShard, TenantState};
/// Semaphore limiting concurrent background tasks (across all tenants).
@@ -292,35 +292,12 @@ pub(crate) fn log_compaction_error(
task_cancelled: bool,
degrade_to_warning: bool,
) {
use CompactionError::*;
let is_cancel = err.is_cancel(CheckOtherForCancel::Yes);
use crate::tenant::PageReconstructError;
use crate::tenant::upload_queue::NotInitialized;
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let is_stopping = upload_queue || timeline;
if is_stopping {
Level::INFO
} else {
Level::ERROR
}
}
let level = if is_cancel || task_cancelled {
Level::INFO
} else {
Level::ERROR
};
if let Some((error_count, sleep_duration)) = retry_info {

View File

@@ -75,7 +75,7 @@ use utils::postgres_client::PostgresClientProtocol;
use utils::rate_limit::RateLimit;
use utils::seqwait::SeqWait;
use utils::simple_rcu::{Rcu, RcuReadGuard};
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::gate::{Gate, GateError, GateGuard};
use utils::{completion, critical, fs_ext, pausable_failpoint};
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
@@ -116,6 +116,7 @@ use crate::pgdatadir_mapping::{
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
};
use crate::task_mgr::TaskKind;
use crate::tenant::blob_io::WriteBlobError;
use crate::tenant::config::AttachmentMode;
use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
@@ -130,6 +131,7 @@ use crate::tenant::storage_layer::{
};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use crate::walingest::WalLagCooldown;
use crate::walredo::RedoAttemptType;
@@ -760,7 +762,7 @@ pub(crate) enum CreateImageLayersError {
PageReconstructError(#[source] PageReconstructError),
#[error(transparent)]
Other(#[from] anyhow::Error),
Other(anyhow::Error),
}
impl From<layer_manager::Shutdown> for CreateImageLayersError {
@@ -2061,9 +2063,10 @@ impl Timeline {
};
// Signal compaction failure to avoid L0 flush stalls when it's broken.
// XXX this looks an awful lot like the circuit breaker code? Can we dedupe classification?
match &result {
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
Err(e) if e.is_cancel() => {}
Err(e) if e.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) => {}
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
@@ -5530,7 +5533,7 @@ impl Timeline {
self.should_check_if_image_layers_required(lsn)
};
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
let mut batch_image_writer = BatchLayerWriter::new(self.conf);
let mut all_generated = true;
@@ -5634,7 +5637,8 @@ impl Timeline {
self.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(CreateImageLayersError::Other)?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
Err(CreateImageLayersError::Other(anyhow::anyhow!(
@@ -5729,7 +5733,10 @@ impl Timeline {
}
}
let image_layers = batch_image_writer.finish(self, ctx).await?;
let image_layers = batch_image_writer
.finish(self, ctx)
.await
.map_err(CreateImageLayersError::Other)?;
let mut guard = self.layers.write().await;
@@ -5931,19 +5938,61 @@ pub(crate) enum CompactionError {
AlreadyRunning(&'static str),
}
/// Whether [`CompactionError::is_cancel`] should inspect the
/// [`CompactionError::Other`] anyhow Error's root cause for
/// typical causes of cancellation.
pub(crate) enum CheckOtherForCancel {
No,
Yes,
}
impl CompactionError {
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self) -> bool {
matches!(
pub fn is_cancel(&self, check_other: CheckOtherForCancel) -> bool {
if matches!(
self,
Self::ShuttingDown
| Self::AlreadyRunning(_)
| Self::AlreadyRunning(_) // XXX why do we treat AlreadyRunning as cancel?
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
)
) {
return true;
}
let root_cause = match &check_other {
CheckOtherForCancel::No => return false,
CheckOtherForCancel::Yes => {
if let Self::Other(other) = self {
other.root_cause()
} else {
return false;
}
}
};
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed
}
/// Critical errors that indicate data corruption.

View File

@@ -11,9 +11,9 @@ use std::time::{Duration, Instant};
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
Timeline,
CheckOtherForCancel, CompactFlags, CompactOptions, CompactionError, CreateImageLayersError,
DurationRecorder, GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus,
RecordedDuration, Timeline,
};
use crate::tenant::timeline::DeltaEntry;
@@ -1396,7 +1396,7 @@ impl Timeline {
// Suppress errors when cancelled.
Err(_) if self.cancel.is_cancelled() => {}
Err(err) if err.is_cancel() => {}
Err(err) if err.is_cancel(CheckOtherForCancel::No) => {}
// Alert on critical errors that indicate data corruption.
Err(err) if err.is_critical() => {
@@ -3516,10 +3516,7 @@ impl Timeline {
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
)
.await
.context("failed to create delta layer writer")
.map_err(CompactionError::Other)?;
);
#[derive(Default)]
struct RewritingLayers {
@@ -4297,7 +4294,8 @@ impl TimelineAdaptor {
self.timeline.cancel.clone(),
ctx,
)
.await?;
.await
.map_err(CreateImageLayersError::Other)?;
fail_point!("image-layer-writer-fail-before-finish", |_| {
Err(CreateImageLayersError::Other(anyhow::anyhow!(
@@ -4306,7 +4304,10 @@ impl TimelineAdaptor {
});
let keyspace = KeySpace {
ranges: self.get_keyspace(key_range, lsn, ctx).await?,
ranges: self
.get_keyspace(key_range, lsn, ctx)
.await
.map_err(CreateImageLayersError::Other)?,
};
// TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
let outcome = self
@@ -4325,9 +4326,13 @@ impl TimelineAdaptor {
unfinished_image_layer,
} = outcome
{
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
let (desc, path) = unfinished_image_layer
.finish(ctx)
.await
.map_err(CreateImageLayersError::Other)?;
let image_layer =
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)
.map_err(CreateImageLayersError::Other)?;
self.new_images.push(image_layer);
}