fix(pageserver): correctly handle collect_keyspace errors (#10976)

## Problem

ref https://github.com/neondatabase/neon/issues/10927

## Summary of changes

* Implement `is_critical` and `is_cancel` over `CompactionError`.
* Revisit all places that uses `CollectKeyspaceError` to ensure they are
handled correctly.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-02-26 11:09:50 -06:00
committed by GitHub
parent 14347630a4
commit a138a6de9b
4 changed files with 58 additions and 40 deletions

View File

@@ -3141,11 +3141,13 @@ impl Tenant {
/// Trips the compaction circuit breaker if appropriate.
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}
CompactionError::CollectKeySpaceError(err) => {
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
self.compaction_circuit_breaker
.lock()
.unwrap()

View File

@@ -289,15 +289,14 @@ fn log_compaction_error(
) {
use CompactionError::*;
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::tenant::PageReconstructError;
use crate::tenant::upload_queue::NotInitialized;
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {

View File

@@ -1864,16 +1864,25 @@ impl Timeline {
};
// Signal compaction failure to avoid L0 flush stalls when it's broken.
match result {
match &result {
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
Err(CompactionError::Other(_)) | Err(CompactionError::CollectKeySpaceError(_)) => {
Err(e) if e.is_cancel() => {}
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Other(_)) => {
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
Err(CompactionError::CollectKeySpaceError(_)) => {
// Cancelled errors are covered by the `Err(e) if e.is_cancel()` branch.
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
// Don't change the current value on offload failure or shutdown. We don't want to
// abruptly stall nor resume L0 flushes in these cases.
Err(CompactionError::Offload(_)) => {}
Err(CompactionError::ShuttingDown) => {}
Err(CompactionError::AlreadyRunning(_)) => {}
};
result
@@ -4688,10 +4697,7 @@ impl Timeline {
));
}
let (dense_ks, sparse_ks) = self
.collect_keyspace(lsn, ctx)
.await
.map_err(CompactionError::CollectKeySpaceError)?;
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(&self.shard_identity, partition_size);
let sparse_partitioning = SparseKeyPartitioning {
parts: vec![sparse_ks],
@@ -5417,13 +5423,42 @@ pub(crate) enum CompactionError {
Offload(OffloadError),
/// Compaction cannot be done right now; page reconstruction and so on.
#[error("Failed to collect keyspace: {0}")]
CollectKeySpaceError(CollectKeySpaceError),
CollectKeySpaceError(#[from] CollectKeySpaceError),
#[error(transparent)]
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
}
impl CompactionError {
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self) -> bool {
matches!(
self,
Self::ShuttingDown
| Self::AlreadyRunning(_)
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
)
}
/// Critical errors that indicate data corruption.
pub fn is_critical(&self) -> bool {
matches!(
self,
Self::CollectKeySpaceError(
CollectKeySpaceError::Decode(_)
| CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
)
)
)
}
}
impl From<OffloadError> for CompactionError {
fn from(e: OffloadError) -> Self {
match e {
@@ -5433,18 +5468,6 @@ impl From<OffloadError> for CompactionError {
}
}
impl From<CollectKeySpaceError> for CompactionError {
fn from(err: CollectKeySpaceError) -> Self {
match err {
CollectKeySpaceError::Cancelled
| CollectKeySpaceError::PageRead(PageReconstructError::Cancelled) => {
CompactionError::ShuttingDown
}
e => CompactionError::Other(e.into()),
}
}
}
impl From<super::upload_queue::NotInitialized> for CompactionError {
fn from(value: super::upload_queue::NotInitialized) -> Self {
match value {

View File

@@ -8,6 +8,13 @@ use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
Timeline,
};
use anyhow::{Context, anyhow, bail};
use bytes::Bytes;
use enumset::EnumSet;
@@ -31,15 +38,8 @@ use utils::critical;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, PageReconstructError,
RecordedDuration, Timeline,
};
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
@@ -975,18 +975,12 @@ impl Timeline {
// Suppress errors when cancelled.
Err(_) if self.cancel.is_cancelled() => {}
Err(CompactionError::ShuttingDown) => {}
Err(CompactionError::CollectKeySpaceError(CollectKeySpaceError::Cancelled)) => {}
Err(err) if err.is_cancel() => {}
// Alert on critical errors that indicate data corruption.
Err(
err @ CompactionError::CollectKeySpaceError(
CollectKeySpaceError::Decode(_)
| CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
),
),
) => critical!("could not compact, repartitioning keyspace failed: {err:?}"),
Err(err) if err.is_critical() => {
critical!("could not compact, repartitioning keyspace failed: {err:?}");
}
// Log other errors. No partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline as a simple