mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
refactor(compaction): eliminate CompactionError::CollectKeyspaceError variant (#12517)
The only differentiated handling of it is for `is_critical`, which in turn is a `matches!()` on several variants of the `enum CollectKeyspaceError` which is the value contained insided `CompactionError::CollectKeyspaceError`. This PR introduces a new error for `repartition()`, allowing its immediate callers to inspect it like `is_critical` did. A drive-by fix is more precise classification of WaitLsnError::BadState when mapping to `tonic::Status`. refs - https://databricks.atlassian.net/browse/LKB-182
This commit is contained in:
committed by
GitHub
parent
43dbded8c8
commit
aac1f8efb1
@@ -2502,7 +2502,6 @@ async fn timeline_checkpoint_handler(
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
)?;
|
||||
|
||||
@@ -141,6 +141,23 @@ pub(crate) enum CollectKeySpaceError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl CollectKeySpaceError {
|
||||
pub(crate) fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
CollectKeySpaceError::Decode(_) => false,
|
||||
CollectKeySpaceError::PageRead(e) => e.is_cancel(),
|
||||
CollectKeySpaceError::Cancelled => true,
|
||||
}
|
||||
}
|
||||
pub(crate) fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
|
||||
CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
|
||||
CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CollectKeySpaceError {
|
||||
fn from(err: PageReconstructError) -> Self {
|
||||
match err {
|
||||
|
||||
@@ -3324,13 +3324,6 @@ impl TenantShard {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::CollectKeySpaceError(err) => {
|
||||
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
|
||||
@@ -318,7 +318,6 @@ pub(crate) fn log_compaction_error(
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
@@ -328,7 +327,7 @@ pub(crate) fn log_compaction_error(
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
|
||||
@@ -585,6 +585,28 @@ pub(crate) enum PageReconstructError {
|
||||
MissingKey(Box<MissingKeyError>),
|
||||
}
|
||||
|
||||
impl PageReconstructError {
|
||||
pub(crate) fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
PageReconstructError::Other(_) => false,
|
||||
PageReconstructError::AncestorLsnTimeout(e) => e.is_cancel(),
|
||||
PageReconstructError::Cancelled => true,
|
||||
PageReconstructError::WalRedo(_) => false,
|
||||
PageReconstructError::MissingKey(_) => false,
|
||||
}
|
||||
}
|
||||
#[allow(dead_code)] // we use the is_cancel + into_anyhow pattern in quite a few places, this one will follow soon enough
|
||||
pub(crate) fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
PageReconstructError::Other(e) => e,
|
||||
PageReconstructError::AncestorLsnTimeout(e) => e.into_anyhow(),
|
||||
PageReconstructError::Cancelled => anyhow::Error::new(self),
|
||||
PageReconstructError::WalRedo(e) => e,
|
||||
PageReconstructError::MissingKey(_) => anyhow::Error::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for PageReconstructError {
|
||||
fn from(value: anyhow::Error) -> Self {
|
||||
// with walingest.rs many PageReconstructError are wrapped in as anyhow::Error
|
||||
@@ -738,17 +760,6 @@ impl std::fmt::Display for MissingKeyError {
|
||||
}
|
||||
}
|
||||
|
||||
impl PageReconstructError {
|
||||
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use PageReconstructError::*;
|
||||
match self {
|
||||
Cancelled => true,
|
||||
Other(_) | AncestorLsnTimeout(_) | WalRedo(_) | MissingKey(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum CreateImageLayersError {
|
||||
#[error("timeline shutting down")]
|
||||
@@ -951,13 +962,35 @@ pub enum WaitLsnError {
|
||||
Timeout(String),
|
||||
}
|
||||
|
||||
impl WaitLsnError {
|
||||
pub(crate) fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
WaitLsnError::Shutdown => true,
|
||||
WaitLsnError::BadState(timeline_state) => match timeline_state {
|
||||
TimelineState::Loading => false,
|
||||
TimelineState::Active => false,
|
||||
TimelineState::Stopping => true,
|
||||
TimelineState::Broken { .. } => false,
|
||||
},
|
||||
WaitLsnError::Timeout(_) => false,
|
||||
}
|
||||
}
|
||||
pub(crate) fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
WaitLsnError::Shutdown => anyhow::Error::new(self),
|
||||
WaitLsnError::BadState(_) => anyhow::Error::new(self),
|
||||
WaitLsnError::Timeout(_) => anyhow::Error::new(self),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WaitLsnError> for tonic::Status {
|
||||
fn from(err: WaitLsnError) -> Self {
|
||||
use tonic::Code;
|
||||
let code = match &err {
|
||||
WaitLsnError::Timeout(_) => Code::Internal,
|
||||
WaitLsnError::BadState(_) => Code::Internal,
|
||||
WaitLsnError::Shutdown => Code::Unavailable,
|
||||
let code = if err.is_cancel() {
|
||||
Code::Unavailable
|
||||
} else {
|
||||
Code::Internal
|
||||
};
|
||||
tonic::Status::new(code, err.to_string())
|
||||
}
|
||||
@@ -1084,6 +1117,26 @@ enum ImageLayerCreationOutcome {
|
||||
Skip,
|
||||
}
|
||||
|
||||
enum RepartitionError {
|
||||
Other(anyhow::Error),
|
||||
CollectKeyspace(CollectKeySpaceError),
|
||||
}
|
||||
|
||||
impl RepartitionError {
|
||||
fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
RepartitionError::Other(_) => false,
|
||||
RepartitionError::CollectKeyspace(e) => e.is_cancel(),
|
||||
}
|
||||
}
|
||||
fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
RepartitionError::Other(e) => e,
|
||||
RepartitionError::CollectKeyspace(e) => e.into_anyhow(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -2070,10 +2123,6 @@ impl Timeline {
|
||||
Err(CompactionError::Other(_)) => {
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
Err(CompactionError::CollectKeySpaceError(_)) => {
|
||||
// Cancelled errors are covered by the `Err(e) if e.is_cancel()` branch.
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
};
|
||||
|
||||
result
|
||||
@@ -4963,7 +5012,7 @@ impl Timeline {
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
|
||||
.map_err(|e| FlushLayerError::from_anyhow(self, e.into_anyhow()))?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
@@ -5213,18 +5262,18 @@ impl Timeline {
|
||||
partition_size: u64,
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
|
||||
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), RepartitionError> {
|
||||
let Ok(mut guard) = self.partitioning.try_write_guard() else {
|
||||
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
|
||||
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
||||
// and hence before the compaction task starts.
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
return Err(RepartitionError::Other(anyhow!(
|
||||
"repartition() called concurrently"
|
||||
)));
|
||||
};
|
||||
let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
|
||||
if lsn < *partition_lsn {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
return Err(RepartitionError::Other(anyhow!(
|
||||
"repartition() called with LSN going backwards, this should not happen"
|
||||
)));
|
||||
}
|
||||
@@ -5245,7 +5294,10 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
|
||||
let (dense_ks, sparse_ks) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.await
|
||||
.map_err(RepartitionError::CollectKeyspace)?;
|
||||
let dense_partitioning = dense_ks.partition(
|
||||
&self.shard_identity,
|
||||
partition_size,
|
||||
@@ -6010,9 +6062,6 @@ impl Drop for Timeline {
|
||||
pub(crate) enum CompactionError {
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
ShuttingDown,
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
#[error("Failed to collect keyspace: {0}")]
|
||||
CollectKeySpaceError(#[from] CollectKeySpaceError),
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -6020,27 +6069,15 @@ pub(crate) enum CompactionError {
|
||||
impl CompactionError {
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::ShuttingDown
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
)
|
||||
matches!(self, Self::ShuttingDown)
|
||||
}
|
||||
|
||||
/// Critical errors that indicate data corruption.
|
||||
pub fn is_critical(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::CollectKeySpaceError(
|
||||
CollectKeySpaceError::Decode(_)
|
||||
| CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
|
||||
)
|
||||
)
|
||||
)
|
||||
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
|
||||
if err.is_cancel() {
|
||||
Self::ShuttingDown
|
||||
} else {
|
||||
Self::Other(err.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,7 +16,8 @@ use super::{
|
||||
Timeline,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::DeltaEntry;
|
||||
use crate::pgdatadir_mapping::CollectKeySpaceError;
|
||||
use crate::tenant::timeline::{DeltaEntry, RepartitionError};
|
||||
use crate::walredo::RedoAttemptType;
|
||||
use anyhow::{Context, anyhow};
|
||||
use bytes::Bytes;
|
||||
@@ -64,7 +65,7 @@ use crate::tenant::timeline::{
|
||||
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
|
||||
ResidentLayer, drop_layer_manager_rlock,
|
||||
};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded};
|
||||
use crate::tenant::{DeltaLayer, MaybeOffloaded, PageReconstructError};
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
|
||||
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
|
||||
@@ -572,7 +573,7 @@ impl GcCompactionQueue {
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(CompactionError::CollectKeySpaceError(_) | CompactionError::Other(_)) => {
|
||||
Err(CompactionError::Other(_)) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
@@ -1417,22 +1418,33 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
//
|
||||
// Log other errors but continue. Failure to repartition is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
|
||||
// key-value store, ignoring the datadir layout. Log the error but continue.
|
||||
//
|
||||
// TODO:
|
||||
// 1. shouldn't we return early here if we observe cancellation
|
||||
// 2. Experiment: can we stop checking self.cancel here?
|
||||
Err(_) if self.cancel.is_cancelled() => {} // TODO: try how we fare removing this branch
|
||||
Err(err) if err.is_cancel() => {}
|
||||
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
Err(err) if err.is_critical() => {
|
||||
Err(RepartitionError::CollectKeyspace(
|
||||
e @ CollectKeySpaceError::Decode(_)
|
||||
| e @ CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
|
||||
),
|
||||
)) => {
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"could not compact, repartitioning keyspace failed: {err:?}"
|
||||
"could not compact, repartitioning keyspace failed: {e:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Log other errors. No partitioning? This is normal, if the timeline was just created
|
||||
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
|
||||
// key-value store, ignoring the datadir layout. Log the error but continue.
|
||||
Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
|
||||
Err(e) => error!(
|
||||
"could not compact, repartitioning keyspace failed: {:?}",
|
||||
e.into_anyhow()
|
||||
),
|
||||
};
|
||||
|
||||
let partition_count = self.partitioning.read().0.0.parts.len();
|
||||
@@ -2518,7 +2530,10 @@ impl Timeline {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
|
||||
let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
|
||||
let (dense_ks, _sparse_ks) = self
|
||||
.collect_keyspace(end_lsn, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::from_collect_keyspace)?;
|
||||
// TODO(chi): ignore sparse_keyspace for now, compact it in the future.
|
||||
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user