mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 12:40:36 +00:00
Compare commits
3 Commits
iddm/commu
...
problame/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b245e74f1 | ||
|
|
a4b3e3f9a4 | ||
|
|
83b149f43c |
@@ -1280,7 +1280,7 @@ impl ComputeNode {
|
||||
let start_time = Utc::now();
|
||||
|
||||
let mut sync_handle = maybe_cgexec(&self.params.pgbin)
|
||||
.args(["--sync-safekeepers"])
|
||||
.args(["--sync-safekeepers"]) // CF walproposer.c:289
|
||||
.env("PGDATA", &self.params.pgdata) // we cannot use -D in this mode
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
||||
|
||||
@@ -162,9 +162,9 @@ impl WalStreamDecoderHandler for WalStreamDecoder {
|
||||
// Fast path for the common case that the whole record fits on the page.
|
||||
let pageleft = self.lsn.remaining_in_block() as u32;
|
||||
if self.inputbuf.remaining() >= xl_tot_len as usize && xl_tot_len <= pageleft {
|
||||
self.lsn += xl_tot_len as u64;
|
||||
self.lsn += xl_tot_len as u64; /* we set self.lsn to the exclusive end of the record */
|
||||
let recordbuf = self.inputbuf.copy_to_bytes(xl_tot_len as usize);
|
||||
return Ok(Some(self.complete_record(recordbuf)?));
|
||||
return Ok(Some(self.complete_record(recordbuf)?)); // this returns (start LSN of NEXT!? record, this record's Bytes), wtf
|
||||
} else {
|
||||
// Need to assemble the record from pieces. Remember the size of the
|
||||
// record, and loop back. On next iterations, we will reach the branch
|
||||
|
||||
@@ -80,8 +80,8 @@ pub struct InterpretedWalRecord {
|
||||
/// by the pageserver
|
||||
pub batch: SerializedValueBatch,
|
||||
/// Byte offset within WAL for the start of the next PG WAL record.
|
||||
/// Usually this is the end LSN of the current record, but in case of
|
||||
/// XLOG SWITCH records it will be within the next segment.
|
||||
/// Usually this is the byte following the last byte of this record here,
|
||||
/// but in case of XLOG SWITCH records it will be within the next segment.
|
||||
pub next_record_lsn: Lsn,
|
||||
/// Whether to flush all uncommitted modifications to the storage engine
|
||||
/// before ingesting this record. This is currently only used for legacy PG
|
||||
@@ -213,6 +213,7 @@ pub struct XactCommon {
|
||||
pub origin_id: u16,
|
||||
// Fields below are only used for logging
|
||||
pub xl_xid: TransactionId,
|
||||
// The `next_record_lsn` returned by wal decoder when we decoded this record.
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
@@ -255,6 +256,7 @@ pub enum XlogRecord {
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct RawXlogRecord {
|
||||
pub info: u8,
|
||||
// The `next_record_lsn` returned by wal decoder when we decoded this record.
|
||||
pub lsn: Lsn,
|
||||
pub buf: Bytes,
|
||||
}
|
||||
|
||||
@@ -80,6 +80,8 @@ impl Eq for OrderedValueMeta {}
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct SerializedValueMeta {
|
||||
pub key: CompactKey,
|
||||
// The `next_record_lsn` emitted by the wal_decoder for the WAL record
|
||||
// that corresponds to this value.
|
||||
pub lsn: Lsn,
|
||||
/// Starting offset of the value for the (key, LSN) tuple
|
||||
/// in [`SerializedValueBatch::raw`]
|
||||
@@ -92,6 +94,8 @@ pub struct SerializedValueMeta {
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct ObservedValueMeta {
|
||||
pub key: CompactKey,
|
||||
// The `next_record_lsn` emitted by the wal_decoder for the WAL record
|
||||
// that corresponds to this value.
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
@@ -109,7 +113,12 @@ pub struct SerializedValueBatch {
|
||||
/// by LSN. Note that entries for a key do not have to be contiguous.
|
||||
pub metadata: Vec<ValueMeta>,
|
||||
|
||||
/// The highest LSN of any value in the batch
|
||||
/// The highest LSN of any value in the batch.
|
||||
///
|
||||
/// The "LSN of a Value" is the `next_record_lsn` that the wal_decoder
|
||||
/// emitted for that value, i.e., the LSN of a Value is
|
||||
/// an LSN that is >= the next byte after the last byte of this value's
|
||||
/// WAL record.
|
||||
pub max_lsn: Lsn,
|
||||
|
||||
/// Number of values encoded by [`Self::raw`]
|
||||
|
||||
@@ -1494,7 +1494,7 @@ pub struct DatadirModification<'a> {
|
||||
/// in the state in 'tline' yet.
|
||||
pub tline: &'a Timeline,
|
||||
|
||||
/// Current LSN of the modification
|
||||
/// Current LSN of the modification.
|
||||
lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
|
||||
@@ -102,6 +102,7 @@ use crate::tenant::remote_timeline_client::{
|
||||
INITDB_PATH, MaybeDeletedIndexPart, remote_initdb_archive_path,
|
||||
};
|
||||
use crate::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use crate::tenant::timeline::CheckOtherForCancel;
|
||||
use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
@@ -3313,9 +3314,11 @@ impl TenantShard {
|
||||
|
||||
/// Trips the compaction circuit breaker if appropriate.
|
||||
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
|
||||
if err.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) {
|
||||
return;
|
||||
}
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::ShuttingDown => unreachable!("is_cancel"),
|
||||
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
|
||||
// shouldn't block compaction.
|
||||
CompactionError::Offload(_) => {}
|
||||
@@ -3332,7 +3335,7 @@ impl TenantShard {
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
CompactionError::AlreadyRunning(_) => {}
|
||||
CompactionError::AlreadyRunning(_) => unreachable!("is_cancel, but XXX why?"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,17 +17,14 @@ use tracing::*;
|
||||
use utils::backoff::exponential_backoff_duration;
|
||||
use utils::completion::Barrier;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::sync::gate::GateError;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::compaction::CompactionOutcome;
|
||||
use crate::tenant::timeline::{CheckOtherForCancel, CompactionError};
|
||||
use crate::tenant::{TenantShard, TenantState};
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
|
||||
/// Semaphore limiting concurrent background tasks (across all tenants).
|
||||
///
|
||||
@@ -295,48 +292,12 @@ pub(crate) fn log_compaction_error(
|
||||
task_cancelled: bool,
|
||||
degrade_to_warning: bool,
|
||||
) {
|
||||
use CompactionError::*;
|
||||
let is_cancel = err.is_cancel(CheckOtherForCancel::Yes);
|
||||
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
Offload(_) => Level::ERROR,
|
||||
AlreadyRunning(_) => Level::ERROR,
|
||||
CollectKeySpaceError(_) => Level::ERROR,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let is_stopping = upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed;
|
||||
|
||||
if is_stopping {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
}
|
||||
}
|
||||
let level = if is_cancel || task_cancelled {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
};
|
||||
|
||||
if let Some((error_count, sleep_duration)) = retry_info {
|
||||
|
||||
@@ -77,7 +77,7 @@ use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::seqwait::SeqWait;
|
||||
use utils::simple_rcu::{Rcu, RcuReadGuard};
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::gate::{Gate, GateError, GateGuard};
|
||||
use utils::{completion, critical, fs_ext, pausable_failpoint};
|
||||
#[cfg(test)]
|
||||
use wal_decoder::models::value::Value;
|
||||
@@ -119,6 +119,7 @@ use crate::pgdatadir_mapping::{
|
||||
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
use crate::tenant::config::AttachmentMode;
|
||||
use crate::tenant::gc_result::GcResult;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
@@ -133,6 +134,7 @@ use crate::tenant::storage_layer::{
|
||||
};
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
use crate::walingest::WalLagCooldown;
|
||||
use crate::walredo::RedoAttemptType;
|
||||
@@ -2062,9 +2064,10 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Signal compaction failure to avoid L0 flush stalls when it's broken.
|
||||
// XXX this looks an awful lot like the circuit breaker code? Can we dedupe classification?
|
||||
match &result {
|
||||
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
|
||||
Err(e) if e.is_cancel() => {}
|
||||
Err(e) if e.is_cancel(CheckOtherForCancel::No /* XXX flip this to Yes so that all the Other() errors that are cancel don't trip the circuit breaker? */) => {}
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
@@ -5995,19 +5998,61 @@ pub(crate) enum CompactionError {
|
||||
AlreadyRunning(&'static str),
|
||||
}
|
||||
|
||||
/// Whether [`CompactionError::is_cancel`] should inspect the
|
||||
/// [`CompactionError::Other`] anyhow Error's root cause for
|
||||
/// typical causes of cancellation.
|
||||
pub(crate) enum CheckOtherForCancel {
|
||||
No,
|
||||
Yes,
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(
|
||||
pub fn is_cancel(&self, check_other: CheckOtherForCancel) -> bool {
|
||||
if matches!(
|
||||
self,
|
||||
Self::ShuttingDown
|
||||
| Self::AlreadyRunning(_)
|
||||
| Self::AlreadyRunning(_) // XXX why do we treat AlreadyRunning as cancel?
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
|
||||
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
|
||||
PageReconstructError::Cancelled
|
||||
))
|
||||
| Self::Offload(OffloadError::Cancelled)
|
||||
)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
let root_cause = match &check_other {
|
||||
CheckOtherForCancel::No => return false,
|
||||
CheckOtherForCancel::Yes => {
|
||||
if let Self::Other(other) = self {
|
||||
other.root_cause()
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed
|
||||
}
|
||||
|
||||
/// Critical errors that indicate data corruption.
|
||||
|
||||
@@ -11,9 +11,9 @@ use std::time::{Duration, Instant};
|
||||
|
||||
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
|
||||
use super::{
|
||||
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
|
||||
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
|
||||
Timeline,
|
||||
CheckOtherForCancel, CompactFlags, CompactOptions, CompactionError, CreateImageLayersError,
|
||||
DurationRecorder, GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus,
|
||||
RecordedDuration, Timeline,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::DeltaEntry;
|
||||
@@ -1405,7 +1405,7 @@ impl Timeline {
|
||||
|
||||
// Suppress errors when cancelled.
|
||||
Err(_) if self.cancel.is_cancelled() => {}
|
||||
Err(err) if err.is_cancel() => {}
|
||||
Err(err) if err.is_cancel(CheckOtherForCancel::No) => {}
|
||||
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
Err(err) if err.is_critical() => {
|
||||
|
||||
Reference in New Issue
Block a user