mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-03 02:30:37 +00:00
Compare commits
3 Commits
quantumish
...
problame/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
318700600d | ||
|
|
75ef17e2f7 | ||
|
|
93026b7899 |
@@ -86,6 +86,14 @@ pub enum GateError {
|
||||
GateClosed,
|
||||
}
|
||||
|
||||
impl GateError {
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
match self {
|
||||
GateError::GateClosed => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Gate {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
||||
@@ -99,6 +99,7 @@ use crate::tenant::remote_timeline_client::{
|
||||
INITDB_PATH, MaybeDeletedIndexPart, remote_initdb_archive_path,
|
||||
};
|
||||
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use crate::tenant::timeline::CheckOtherForCancel;
|
||||
use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
@@ -3261,11 +3262,11 @@ impl TenantShard {
|
||||
|
||||
/// Trips the compaction circuit breaker if appropriate.
|
||||
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
|
||||
if err.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) {
|
||||
return;
|
||||
}
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::ShuttingDown => unreachable!("is_cancel"),
|
||||
CompactionError::Offload(_) => {}
|
||||
CompactionError::CollectKeySpaceError(err) => {
|
||||
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
|
||||
@@ -3280,7 +3281,7 @@ impl TenantShard {
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::AlreadyRunning(_) => {}
|
||||
CompactionError::AlreadyRunning(_) => unreachable!("is_cancel, but XXX why?"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -55,11 +55,11 @@ pub struct BatchLayerWriter {
|
||||
}
|
||||
|
||||
impl BatchLayerWriter {
|
||||
pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
pub fn new(conf: &'static PageServerConf) -> Self {
|
||||
Self {
|
||||
generated_layer_writers: Vec::new(),
|
||||
conf,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_unfinished_image_writer(
|
||||
@@ -209,6 +209,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
// XXX make this lazy like in SplitDeltaLayerWriter?
|
||||
inner: ImageLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
@@ -223,7 +224,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
batches: BatchLayerWriter::new(conf),
|
||||
lsn,
|
||||
start_key,
|
||||
gate,
|
||||
@@ -319,7 +320,7 @@ pub struct SplitDeltaLayerWriter<'a> {
|
||||
}
|
||||
|
||||
impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
pub async fn new(
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -327,8 +328,8 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
) -> Self {
|
||||
Self {
|
||||
target_layer_size,
|
||||
inner: None,
|
||||
conf,
|
||||
@@ -336,10 +337,10 @@ impl<'a> SplitDeltaLayerWriter<'a> {
|
||||
tenant_shard_id,
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
batches: BatchLayerWriter::new(conf).await?,
|
||||
batches: BatchLayerWriter::new(conf),
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
@@ -510,9 +511,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &ctx)
|
||||
@@ -590,9 +589,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
const N: usize = 2000;
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
@@ -692,9 +689,7 @@ mod tests {
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
image_writer
|
||||
.put_image(get_key(0), get_img(0), &ctx)
|
||||
@@ -770,9 +765,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
for i in 0..N {
|
||||
let i = i as u32;
|
||||
|
||||
@@ -22,8 +22,8 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::compaction::CompactionOutcome;
|
||||
use crate::tenant::timeline::{CheckOtherForCancel, CompactionError};
|
||||
use crate::tenant::{TenantShard, TenantState};
|
||||
|
||||
/// Semaphore limiting concurrent background tasks (across all tenants).
|
||||
@@ -292,35 +292,12 @@ pub(crate) fn log_compaction_error(
|
||||
task_cancelled: bool,
|
||||
degrade_to_warning: bool,
|
||||
) {
|
||||
use CompactionError::*;
|
||||
let is_cancel = err.is_cancel(CheckOtherForCancel::Yes);
|
||||
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
AlreadyRunning(_) => Level::ERROR,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let is_stopping = upload_queue || timeline;
|
||||
|
||||
if is_stopping {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
}
|
||||
}
|
||||
let level = if is_cancel || task_cancelled {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
};
|
||||
|
||||
if let Some((error_count, sleep_duration)) = retry_info {
|
||||
|
||||
@@ -75,7 +75,7 @@ use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::seqwait::SeqWait;
|
||||
use utils::simple_rcu::{Rcu, RcuReadGuard};
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::gate::{Gate, GateError, GateGuard};
|
||||
use utils::{completion, critical, fs_ext, pausable_failpoint};
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
@@ -116,6 +116,7 @@ use crate::pgdatadir_mapping::{
|
||||
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
use crate::tenant::config::AttachmentMode;
|
||||
use crate::tenant::gc_result::GcResult;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
@@ -130,6 +131,7 @@ use crate::tenant::storage_layer::{
|
||||
};
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
use crate::walingest::WalLagCooldown;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
@@ -760,7 +762,7 @@ pub(crate) enum CreateImageLayersError {
|
||||
PageReconstructError(#[source] PageReconstructError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<layer_manager::Shutdown> for CreateImageLayersError {
|
||||
@@ -2061,9 +2063,10 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Signal compaction failure to avoid L0 flush stalls when it's broken.
|
||||
// XXX this looks an awful lot like the circuit breaker code? Can we dedupe classification?
|
||||
match &result {
|
||||
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
|
||||
Err(e) if e.is_cancel() => {}
|
||||
Err(e) if e.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) => {}
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
@@ -5530,7 +5533,7 @@ impl Timeline {
|
||||
self.should_check_if_image_layers_required(lsn)
|
||||
};
|
||||
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf).await?;
|
||||
let mut batch_image_writer = BatchLayerWriter::new(self.conf);
|
||||
|
||||
let mut all_generated = true;
|
||||
|
||||
@@ -5634,7 +5637,8 @@ impl Timeline {
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(CreateImageLayersError::Other)?;
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(CreateImageLayersError::Other(anyhow::anyhow!(
|
||||
@@ -5729,7 +5733,10 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let image_layers = batch_image_writer.finish(self, ctx).await?;
|
||||
let image_layers = batch_image_writer
|
||||
.finish(self, ctx)
|
||||
.await
|
||||
.map_err(CreateImageLayersError::Other)?;
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
@@ -5931,19 +5938,61 @@ pub(crate) enum CompactionError {
|
||||
AlreadyRunning(&'static str),
|
||||
}
|
||||
|
||||
/// Whether [`CompactionError::is_cancel`] should inspect the
/// [`CompactionError::Other`] anyhow Error's root cause for
/// typical causes of cancellation.
///
/// This is a plain two-state flag, so it derives the usual value-type traits
/// and can be copied, compared and debug-printed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CheckOtherForCancel {
    /// Only the explicitly typed cancellation variants count;
    /// a [`CompactionError::Other`] is never reported as cancel.
    No,
    /// Additionally look at the root cause of a [`CompactionError::Other`]
    /// and report cancel if it downcasts to one of the known
    /// stopping/cancelled error types.
    Yes,
}
|
||||
|
||||
impl CompactionError {
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(
|
||||
pub fn is_cancel(&self, check_other: CheckOtherForCancel) -> bool {
|
||||
if matches!(
|
||||
self,
|
||||
Self::ShuttingDown
|
||||
| Self::AlreadyRunning(_)
|
||||
| Self::AlreadyRunning(_) // XXX why do we treat AlreadyRunning as cancel?
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
| Self::Offload(OffloadError::Cancelled)
|
||||
)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
let root_cause = match &check_other {
|
||||
CheckOtherForCancel::No => return false,
|
||||
CheckOtherForCancel::Yes => {
|
||||
if let Self::Other(other) = self {
|
||||
other.root_cause()
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed
|
||||
}
|
||||
|
||||
/// Critical errors that indicate data corruption.
|
||||
|
||||
@@ -11,9 +11,9 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use super::layer_manager::LayerManager;
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
|
||||
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
|
||||
Timeline,
|
||||
CheckOtherForCancel, CompactFlags, CompactOptions, CompactionError, CreateImageLayersError,
|
||||
DurationRecorder, GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus,
|
||||
RecordedDuration, Timeline,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::DeltaEntry;
|
||||
@@ -1396,7 +1396,7 @@ impl Timeline {
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(err) if err.is_cancel() => {}
|
||||
Err(err) if err.is_cancel(CheckOtherForCancel::No) => {}
|
||||
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
Err(err) if err.is_critical() => {
|
||||
@@ -3516,10 +3516,7 @@ impl Timeline {
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
)
|
||||
.await
|
||||
.context("failed to create delta layer writer")
|
||||
.map_err(CompactionError::Other)?;
|
||||
);
|
||||
|
||||
#[derive(Default)]
|
||||
struct RewritingLayers {
|
||||
@@ -4297,7 +4294,8 @@ impl TimelineAdaptor {
|
||||
self.timeline.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
.map_err(CreateImageLayersError::Other)?;
|
||||
|
||||
fail_point!("image-layer-writer-fail-before-finish", |_| {
|
||||
Err(CreateImageLayersError::Other(anyhow::anyhow!(
|
||||
@@ -4306,7 +4304,10 @@ impl TimelineAdaptor {
|
||||
});
|
||||
|
||||
let keyspace = KeySpace {
|
||||
ranges: self.get_keyspace(key_range, lsn, ctx).await?,
|
||||
ranges: self
|
||||
.get_keyspace(key_range, lsn, ctx)
|
||||
.await
|
||||
.map_err(CreateImageLayersError::Other)?,
|
||||
};
|
||||
// TODO set proper (stateful) start. The create_image_layer_for_rel_blocks function mostly
|
||||
let outcome = self
|
||||
@@ -4325,9 +4326,13 @@ impl TimelineAdaptor {
|
||||
unfinished_image_layer,
|
||||
} = outcome
|
||||
{
|
||||
let (desc, path) = unfinished_image_layer.finish(ctx).await?;
|
||||
let (desc, path) = unfinished_image_layer
|
||||
.finish(ctx)
|
||||
.await
|
||||
.map_err(CreateImageLayersError::Other)?;
|
||||
let image_layer =
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?;
|
||||
Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)
|
||||
.map_err(CreateImageLayersError::Other)?;
|
||||
self.new_images.push(image_layer);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user