mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
## TLDR This PR is a no-op. The changes are disabled by default. ## Problem I. Currently we don't have a way to detect disk I/O failures from WAL operations. II. We observe that the offloader fails to upload a segment due to race conditions on XLOG SWITCH and PG start streaming WALs. wal_backup task continously failing to upload a full segment while the segment remains partial on the disk. The consequence is that commit_lsn for all SKs move forward but backup_lsn stays the same. Then, all SKs run out of disk space. III. We have discovered SK bugs where the WAL offload owner cannot keep up with WAL backup/upload to S3, which results in an unbounded accumulation of WAL segment files on the Safekeeper's disk until the disk becomes full. This is a somewhat dangerous operation that is hard to recover from because the Safekeeper cannot write its control files when it is out of disk space. There are actually 2 problems here: 1. A single problematic timeline can take over the entire disk for the SK 2. Once out of disk, it's difficult to recover SK IV. Neon reports certain storage errors as "critical" errors using a marco, which will increment a counter/metric that can be used to raise alerts. However, this metric isn't sliced by tenant and/or timeline today. We need the tenant/timeline dimension to better respond to incidents and for blast radius analysis. ## Summary of changes I. The PR adds a `safekeeper_wal_disk_io_errors ` which is incremented when SK fails to create or flush WALs. II. To mitigate this issue, we will re-elect a new offloader if the current offloader is lagging behind too much. Each SK makes the decision locally but they are aware of each other's commit and backup lsns. The new algorithm is - determine_offloader will pick a SK. say SK-1. - Each SK checks -- if commit_lsn - back_lsn > threshold, -- -- remove SK-1 from the candidate and call determine_offloader again. SK-1 will step down and all SKs will elect the same leader again. After the backup is caught up, the leader will become SK-1 again. This also helps when SK-1 is slow to backup. I'll set the reelect backup lag to 4 GB later. Setting to 128 MB in dev to trigger the code more frequently. III. This change addresses problem no. 1 by having the Safekeeper perform a timeline disk utilization check check when processing WAL proposal messages from Postgres/compute. The Safekeeper now rejects the WAL proposal message, effectively stops writing more WAL for the timeline to disk, if the existing WAL files for the timeline on the SK disk exceeds a certain size (the default threshold is 100GB). The disk utilization is calculated based on a `last_removed_segno` variable tracked by the background task removing WAL files, which produces an accurate and conservative estimate (>= than actual disk usage) of the actual disk usage. IV. * Add a new metric `hadron_critical_storage_event_count` that has the `tenant_shard_id` and `timeline_id` as dimensions. * Modified the `crtitical!` marco to include tenant_id and timeline_id as additional arguments and adapted existing call sites to populate the tenant shard and timeline ID fields. The `critical!` marco invocation now increments the `hadron_critical_storage_event_count` with the extra dimensions. (In SK there isn't the notion of a tenant-shard, so just the tenant ID is recorded in lieu of tenant shard ID.) I considered adding a separate marco to avoid merge conflicts, but I think in this case (detecting critical errors) conflicts are probably more desirable so that we can be aware whenever Neon adds another `critical!` invocation in their code. --------- Co-authored-by: Chen Luo <chen.luo@databricks.com> Co-authored-by: Haoyu Huang <haoyu.huang@databricks.com> Co-authored-by: William Huang <william.huang@databricks.com>
This commit is contained in:
@@ -24,12 +24,28 @@ macro_rules! critical {
|
||||
if cfg!(debug_assertions) {
|
||||
panic!($($arg)*);
|
||||
}
|
||||
// Increment both metrics
|
||||
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
|
||||
let backtrace = std::backtrace::Backtrace::capture();
|
||||
tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! critical_timeline {
|
||||
($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
|
||||
if cfg!(debug_assertions) {
|
||||
panic!($($arg)*);
|
||||
}
|
||||
// Increment both metrics
|
||||
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
|
||||
$crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
|
||||
let backtrace = std::backtrace::Backtrace::capture();
|
||||
tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
|
||||
$tenant_shard_id, $timeline_id, format!($($arg)*));
|
||||
}};
|
||||
}
|
||||
|
||||
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum LogFormat {
|
||||
@@ -61,6 +77,36 @@ pub struct TracingEventCountMetric {
|
||||
trace: IntCounter,
|
||||
}
|
||||
|
||||
// Begin Hadron: Add a HadronCriticalStorageEventCountMetric metric that is sliced by tenant_id and timeline_id
|
||||
pub struct HadronCriticalStorageEventCountMetric {
|
||||
critical: IntCounterVec,
|
||||
}
|
||||
|
||||
pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
|
||||
Lazy::new(|| {
|
||||
let vec = metrics::register_int_counter_vec!(
|
||||
"hadron_critical_storage_event_count",
|
||||
"Number of critical storage events, by tenant_id and timeline_id",
|
||||
&["tenant_shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define metric");
|
||||
HadronCriticalStorageEventCountMetric::new(vec)
|
||||
});
|
||||
|
||||
impl HadronCriticalStorageEventCountMetric {
|
||||
fn new(vec: IntCounterVec) -> Self {
|
||||
Self { critical: vec }
|
||||
}
|
||||
|
||||
// Allow public access from `critical!` macro.
|
||||
pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
|
||||
self.critical
|
||||
.with_label_values(&[tenant_shard_id, timeline_id])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
// End Hadron
|
||||
|
||||
pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
|
||||
let vec = metrics::register_int_counter_vec!(
|
||||
"libmetrics_tracing_event_count",
|
||||
|
||||
@@ -78,7 +78,7 @@ use utils::rate_limit::RateLimit;
|
||||
use utils::seqwait::SeqWait;
|
||||
use utils::simple_rcu::{Rcu, RcuReadGuard};
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::{completion, critical, fs_ext, pausable_failpoint};
|
||||
use utils::{completion, critical_timeline, fs_ext, pausable_failpoint};
|
||||
#[cfg(test)]
|
||||
use wal_decoder::models::value::Value;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
@@ -6819,7 +6819,11 @@ impl Timeline {
|
||||
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
|
||||
Err(walredo::Error::Other(err)) => {
|
||||
if fire_critical_error {
|
||||
critical!("walredo failure during page reconstruction: {err:?}");
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"walredo failure during page reconstruction: {err:?}"
|
||||
);
|
||||
}
|
||||
return Err(PageReconstructError::WalRedo(
|
||||
err.context("reconstruct a page image"),
|
||||
|
||||
@@ -36,7 +36,7 @@ use serde::Serialize;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, debug, error, info, info_span, trace, warn};
|
||||
use utils::critical;
|
||||
use utils::critical_timeline;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
@@ -1390,7 +1390,11 @@ impl Timeline {
|
||||
GetVectoredError::MissingKey(_),
|
||||
) = err
|
||||
{
|
||||
critical!("missing key during compaction: {err:?}");
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"missing key during compaction: {err:?}"
|
||||
);
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -1418,7 +1422,11 @@ impl Timeline {
|
||||
|
||||
// Alert on critical errors that indicate data corruption.
|
||||
Err(err) if err.is_critical() => {
|
||||
critical!("could not compact, repartitioning keyspace failed: {err:?}");
|
||||
critical_timeline!(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"could not compact, repartitioning keyspace failed: {err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// Log other errors. No partitioning? This is normal, if the timeline was just created
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, debug, error, info, trace, warn};
|
||||
use utils::critical;
|
||||
use utils::critical_timeline;
|
||||
use utils::id::NodeId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
@@ -368,9 +368,13 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
match raw_wal_start_lsn.cmp(&expected_wal_start) {
|
||||
std::cmp::Ordering::Greater => {
|
||||
let msg = format!(
|
||||
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
|
||||
"Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn}"
|
||||
);
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
"{msg}"
|
||||
);
|
||||
critical!("{msg}");
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
}
|
||||
std::cmp::Ordering::Less => {
|
||||
@@ -383,7 +387,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
"Received record with next_record_lsn multiple times ({} < {})",
|
||||
first_rec.next_record_lsn, expected_wal_start
|
||||
);
|
||||
critical!("{msg}");
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
"{msg}"
|
||||
);
|
||||
return Err(WalReceiverError::Other(anyhow!(msg)));
|
||||
}
|
||||
}
|
||||
@@ -452,7 +460,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// TODO: we can't differentiate cancellation errors with
|
||||
// anyhow::Error, so just ignore it if we're cancelled.
|
||||
if !cancellation.is_cancelled() && !timeline.is_stopping() {
|
||||
critical!("{err:?}")
|
||||
critical_timeline!(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
"{err:?}"
|
||||
);
|
||||
}
|
||||
})?;
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ use tracing::*;
|
||||
use utils::bin_ser::{DeserializeError, SerializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::{critical, failpoint_support};
|
||||
use utils::{critical_timeline, failpoint_support};
|
||||
use wal_decoder::models::record::NeonWalRecord;
|
||||
use wal_decoder::models::*;
|
||||
|
||||
@@ -418,18 +418,30 @@ impl WalIngest {
|
||||
// as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
|
||||
// https://github.com/neondatabase/neon/pull/10634.
|
||||
let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
|
||||
critical!("clear_vm_bits for unknown VM relation {vm_rel}");
|
||||
critical_timeline!(
|
||||
modification.tline.tenant_shard_id,
|
||||
modification.tline.timeline_id,
|
||||
"clear_vm_bits for unknown VM relation {vm_rel}"
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
critical_timeline!(
|
||||
modification.tline.tenant_shard_id,
|
||||
modification.tline.timeline_id,
|
||||
"new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
|
||||
);
|
||||
new_vm_blk = None;
|
||||
}
|
||||
}
|
||||
if let Some(blknum) = old_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
|
||||
critical_timeline!(
|
||||
modification.tline.tenant_shard_id,
|
||||
modification.tline.timeline_id,
|
||||
"old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
|
||||
);
|
||||
old_vm_blk = None;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,9 +18,10 @@ use metrics::set_build_info_metric;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
|
||||
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
|
||||
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
|
||||
DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
|
||||
DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
|
||||
};
|
||||
use safekeeper::wal_backup::WalBackup;
|
||||
use safekeeper::{
|
||||
@@ -138,6 +139,15 @@ struct Args {
|
||||
/// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
|
||||
#[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
|
||||
max_offloader_lag: u64,
|
||||
/* BEGIN_HADRON */
|
||||
/// Safekeeper will re-elect a new offloader if the current backup lagging for more than this value in bytes
|
||||
#[arg(long, default_value_t = DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES)]
|
||||
max_reelect_offloader_lag_bytes: u64,
|
||||
/// Safekeeper will stop accepting new WALs if the timeline disk usage exceeds this value in bytes.
|
||||
/// Setting this value to 0 disables the limit.
|
||||
#[arg(long, default_value_t = DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES)]
|
||||
max_timeline_disk_usage_bytes: u64,
|
||||
/* END_HADRON */
|
||||
/// Number of max parallel WAL segments to be offloaded to remote storage.
|
||||
#[arg(long, default_value = "5")]
|
||||
wal_backup_parallel_jobs: usize,
|
||||
@@ -391,6 +401,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
peer_recovery_enabled: args.peer_recovery,
|
||||
remote_storage: args.remote_storage,
|
||||
max_offloader_lag_bytes: args.max_offloader_lag,
|
||||
/* BEGIN_HADRON */
|
||||
max_reelect_offloader_lag_bytes: args.max_reelect_offloader_lag_bytes,
|
||||
max_timeline_disk_usage_bytes: args.max_timeline_disk_usage_bytes,
|
||||
/* END_HADRON */
|
||||
wal_backup_enabled: !args.disable_wal_backup,
|
||||
backup_parallel_jobs: args.wal_backup_parallel_jobs,
|
||||
pg_auth,
|
||||
|
||||
@@ -17,6 +17,7 @@ use utils::crashsafe::durable_rename;
|
||||
|
||||
use crate::control_file_upgrade::{downgrade_v10_to_v9, upgrade_control_file};
|
||||
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
|
||||
use crate::metrics::WAL_DISK_IO_ERRORS;
|
||||
use crate::state::{EvictionState, TimelinePersistentState};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
@@ -192,11 +193,14 @@ impl TimelinePersistentState {
|
||||
impl Storage for FileStorage {
|
||||
/// Persists state durably to the underlying storage.
|
||||
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
|
||||
// start timer for metrics
|
||||
let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
|
||||
|
||||
// write data to safekeeper.control.partial
|
||||
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
|
||||
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
|
||||
/* BEGIN_HADRON */
|
||||
WAL_DISK_IO_ERRORS.inc();
|
||||
/*END_HADRON */
|
||||
format!(
|
||||
"failed to create partial control file at: {}",
|
||||
&control_partial_path
|
||||
@@ -206,14 +210,24 @@ impl Storage for FileStorage {
|
||||
let buf: Vec<u8> = s.write_to_buf()?;
|
||||
|
||||
control_partial.write_all(&buf).await.with_context(|| {
|
||||
/* BEGIN_HADRON */
|
||||
WAL_DISK_IO_ERRORS.inc();
|
||||
/*END_HADRON */
|
||||
format!("failed to write safekeeper state into control file at: {control_partial_path}")
|
||||
})?;
|
||||
control_partial.flush().await.with_context(|| {
|
||||
/* BEGIN_HADRON */
|
||||
WAL_DISK_IO_ERRORS.inc();
|
||||
/*END_HADRON */
|
||||
format!("failed to flush safekeeper state into control file at: {control_partial_path}")
|
||||
})?;
|
||||
|
||||
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
|
||||
durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
|
||||
durable_rename(&control_partial_path, &control_path, !self.no_sync)
|
||||
.await
|
||||
/* BEGIN_HADRON */
|
||||
.inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
|
||||
/* END_HADRON */
|
||||
|
||||
// update internal state
|
||||
self.state = s.clone();
|
||||
|
||||
@@ -61,6 +61,13 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
|
||||
pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
|
||||
/* BEGIN_HADRON */
|
||||
// Default leader re-elect is 0(disabled). SK will re-elect leader if the current leader is lagging this many bytes.
|
||||
pub const DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES: u64 = 0;
|
||||
// Default disk usage limit is 0 (disabled). It means each timeline by default can use up to this many WAL
|
||||
// disk space on this SK until SK begins to reject WALs.
|
||||
pub const DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES: u64 = 0;
|
||||
/* END_HADRON */
|
||||
pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
|
||||
pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
|
||||
pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
|
||||
@@ -99,6 +106,10 @@ pub struct SafeKeeperConf {
|
||||
pub peer_recovery_enabled: bool,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub max_offloader_lag_bytes: u64,
|
||||
/* BEGIN_HADRON */
|
||||
pub max_reelect_offloader_lag_bytes: u64,
|
||||
pub max_timeline_disk_usage_bytes: u64,
|
||||
/* END_HADRON */
|
||||
pub backup_parallel_jobs: usize,
|
||||
pub wal_backup_enabled: bool,
|
||||
pub pg_auth: Option<Arc<JwtAuth>>,
|
||||
@@ -151,6 +162,10 @@ impl SafeKeeperConf {
|
||||
sk_auth_token: None,
|
||||
heartbeat_timeout: Duration::new(5, 0),
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
/* BEGIN_HADRON */
|
||||
max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
|
||||
max_timeline_disk_usage_bytes: defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
|
||||
/* END_HADRON */
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
partial_backup_timeout: Duration::from_secs(0),
|
||||
|
||||
@@ -58,6 +58,25 @@ pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_flush_wal_seconds histogram")
|
||||
});
|
||||
/* BEGIN_HADRON */
|
||||
pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_wal_disk_io_errors",
|
||||
"Number of disk I/O errors when creating and flushing WALs and control files"
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_disk_io_errors counter")
|
||||
});
|
||||
pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_wal_storage_limit_errors",
|
||||
concat!(
|
||||
"Number of errors due to timeline WAL storage utilization exceeding configured limit. ",
|
||||
"An increase in this metric indicates issues backing up or removing WALs."
|
||||
)
|
||||
)
|
||||
.expect("Failed to register safekeeper_wal_storage_limit_errors counter")
|
||||
});
|
||||
/* END_HADRON */
|
||||
pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_persist_control_file_seconds",
|
||||
@@ -138,6 +157,15 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_backup_errors_total counter")
|
||||
});
|
||||
/* BEGIN_HADRON */
|
||||
pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_backup_reelect_leader_total",
|
||||
"Number of times the backup leader was reelected"
|
||||
)
|
||||
.expect("Failed to register safekeeper_backup_reelect_leader_total counter")
|
||||
});
|
||||
/* END_HADRON */
|
||||
pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"safekeeper_broker_push_update_seconds",
|
||||
|
||||
@@ -16,7 +16,7 @@ use tokio::sync::mpsc::error::SendError;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use tracing::{Instrument, error, info, info_span};
|
||||
use utils::critical;
|
||||
use utils::critical_timeline;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::{Compression, InterpretedFormat};
|
||||
use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
|
||||
@@ -268,6 +268,8 @@ impl InterpretedWalReader {
|
||||
|
||||
let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let ttid = wal_stream.ttid;
|
||||
|
||||
let reader = InterpretedWalReader {
|
||||
wal_stream,
|
||||
shard_senders: HashMap::from([(
|
||||
@@ -300,7 +302,11 @@ impl InterpretedWalReader {
|
||||
.inspect_err(|err| match err {
|
||||
// TODO: we may want to differentiate these errors further.
|
||||
InterpretedWalReaderError::Decode(_) => {
|
||||
critical!("failed to decode WAL record: {err:?}");
|
||||
critical_timeline!(
|
||||
ttid.tenant_id,
|
||||
ttid.timeline_id,
|
||||
"failed to read WAL record: {err:?}"
|
||||
);
|
||||
}
|
||||
err => error!("failed to read WAL record: {err}"),
|
||||
})
|
||||
@@ -363,9 +369,14 @@ impl InterpretedWalReader {
|
||||
metric.dec();
|
||||
}
|
||||
|
||||
let ttid = self.wal_stream.ttid;
|
||||
match self.run_impl(start_pos).await {
|
||||
Err(err @ InterpretedWalReaderError::Decode(_)) => {
|
||||
critical!("failed to decode WAL record: {err:?}");
|
||||
critical_timeline!(
|
||||
ttid.tenant_id,
|
||||
ttid.timeline_id,
|
||||
"failed to decode WAL record: {err:?}"
|
||||
);
|
||||
}
|
||||
Err(err) => error!("failed to read WAL record: {err}"),
|
||||
Ok(()) => info!("interpreted wal reader exiting"),
|
||||
|
||||
@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
|
||||
use crate::metrics::{
|
||||
FullTimelineInfo, MISC_OPERATION_SECONDS, WAL_STORAGE_LIMIT_ERRORS, WalStorageMetrics,
|
||||
};
|
||||
use crate::rate_limit::RateLimiter;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
|
||||
@@ -1050,6 +1052,39 @@ impl WalResidentTimeline {
|
||||
Ok(ss)
|
||||
}
|
||||
|
||||
// BEGIN HADRON
|
||||
// Check if disk usage by WAL segment files for this timeline exceeds the configured limit.
|
||||
fn hadron_check_disk_usage(
|
||||
&self,
|
||||
shared_state_locked: &mut WriteGuardSharedState<'_>,
|
||||
) -> Result<()> {
|
||||
// The disk usage is calculated based on the number of segments between `last_removed_segno`
|
||||
// and the current flush LSN segment number. `last_removed_segno` is advanced after
|
||||
// unneeded WAL files are physically removed from disk (see `update_wal_removal_end()`
|
||||
// in `timeline_manager.rs`).
|
||||
let max_timeline_disk_usage_bytes = self.conf.max_timeline_disk_usage_bytes;
|
||||
if max_timeline_disk_usage_bytes > 0 {
|
||||
let last_removed_segno = self.last_removed_segno.load(Ordering::Relaxed);
|
||||
let flush_lsn = shared_state_locked.sk.flush_lsn();
|
||||
let wal_seg_size = shared_state_locked.sk.state().server.wal_seg_size as u64;
|
||||
let current_segno = flush_lsn.segment_number(wal_seg_size as usize);
|
||||
|
||||
let segno_count = current_segno - last_removed_segno;
|
||||
let disk_usage_bytes = segno_count * wal_seg_size;
|
||||
|
||||
if disk_usage_bytes > max_timeline_disk_usage_bytes {
|
||||
WAL_STORAGE_LIMIT_ERRORS.inc();
|
||||
bail!(
|
||||
"WAL storage utilization exceeds configured limit of {} bytes: current disk usage: {} bytes",
|
||||
max_timeline_disk_usage_bytes,
|
||||
disk_usage_bytes
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
// END HADRON
|
||||
|
||||
/// Pass arrived message to the safekeeper.
|
||||
pub async fn process_msg(
|
||||
&self,
|
||||
@@ -1062,6 +1097,13 @@ impl WalResidentTimeline {
|
||||
let mut rmsg: Option<AcceptorProposerMessage>;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
// BEGIN HADRON
|
||||
// Errors from the `hadron_check_disk_usage()` function fail the process_msg() function, which
|
||||
// gets propagated upward and terminates the entire WalAcceptor. This will cause postgres to
|
||||
// disconnect from the safekeeper and reestablish another connection. Postgres will keep retrying
|
||||
// safekeeper connections every second until it can successfully propose WAL to the SK again.
|
||||
self.hadron_check_disk_usage(&mut shared_state)?;
|
||||
// END HADRON
|
||||
rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
|
||||
|
||||
// if this is AppendResponse, fill in proper hot standby feedback.
|
||||
|
||||
@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantTimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::{backoff, pausable_failpoint};
|
||||
|
||||
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
|
||||
use crate::metrics::{
|
||||
BACKED_UP_SEGMENTS, BACKUP_ERRORS, BACKUP_REELECT_LEADER_COUNT, WAL_BACKUP_TASKS,
|
||||
};
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::timeline_manager::{Manager, StateSnapshot};
|
||||
use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
|
||||
@@ -70,8 +72,9 @@ pub(crate) async fn update_task(
|
||||
need_backup: bool,
|
||||
state: &StateSnapshot,
|
||||
) {
|
||||
let (offloader, election_dbg_str) =
|
||||
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
|
||||
/* BEGIN_HADRON */
|
||||
let (offloader, election_dbg_str) = hadron_determine_offloader(mgr, state);
|
||||
/* END_HADRON */
|
||||
let elected_me = Some(mgr.conf.my_id) == offloader;
|
||||
|
||||
let should_task_run = need_backup && elected_me;
|
||||
@@ -127,6 +130,70 @@ async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
|
||||
}
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
// On top of the neon determine_offloader, we also check if the current offloader is lagging behind too much.
|
||||
// If it is, we re-elect a new offloader. This mitigates the below issue. It also helps distribute the load across SKs.
|
||||
//
|
||||
// We observe that the offloader fails to upload a segment due to race conditions on XLOG SWITCH and PG start streaming WALs.
|
||||
// wal_backup task continously failing to upload a full segment while the segment remains partial on the disk.
|
||||
// The consequence is that commit_lsn for all SKs move forward but backup_lsn stays the same. Then, all SKs run out of disk space.
|
||||
// See go/sk-ood-xlog-switch for more details.
|
||||
//
|
||||
// To mitigate this issue, we will re-elect a new offloader if the current offloader is lagging behind too much.
|
||||
// Each SK makes the decision locally but they are aware of each other's commit and backup lsns.
|
||||
//
|
||||
// determine_offloader will pick a SK. say SK-1.
|
||||
// Each SK checks
|
||||
// -- if commit_lsn - back_lsn > threshold,
|
||||
// -- -- remove SK-1 from the candidate and call determine_offloader again.
|
||||
// SK-1 will step down and all SKs will elect the same leader again.
|
||||
// After the backup is caught up, the leader will become SK-1 again.
|
||||
fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<NodeId>, String) {
|
||||
let mut offloader: Option<NodeId>;
|
||||
let mut election_dbg_str: String;
|
||||
let caughtup_peers_count: usize;
|
||||
(offloader, election_dbg_str, caughtup_peers_count) =
|
||||
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
|
||||
|
||||
if offloader.is_none()
|
||||
|| caughtup_peers_count <= 1
|
||||
|| mgr.conf.max_reelect_offloader_lag_bytes == 0
|
||||
{
|
||||
return (offloader, election_dbg_str);
|
||||
}
|
||||
|
||||
let offloader_sk_id = offloader.unwrap();
|
||||
|
||||
let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
|
||||
if backup_lag.is_none() {
|
||||
info!("Backup lag is None. Skipping re-election.");
|
||||
return (offloader, election_dbg_str);
|
||||
}
|
||||
|
||||
let backup_lag = backup_lag.unwrap().0;
|
||||
|
||||
if backup_lag < mgr.conf.max_reelect_offloader_lag_bytes {
|
||||
return (offloader, election_dbg_str);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Electing a new leader: Backup lag is too high backup lsn lag {} threshold {}: {}",
|
||||
backup_lag, mgr.conf.max_reelect_offloader_lag_bytes, election_dbg_str
|
||||
);
|
||||
BACKUP_REELECT_LEADER_COUNT.inc();
|
||||
// Remove the current offloader if lag is too high.
|
||||
let new_peers: Vec<_> = state
|
||||
.peers
|
||||
.iter()
|
||||
.filter(|p| p.sk_id != offloader_sk_id)
|
||||
.cloned()
|
||||
.collect();
|
||||
(offloader, election_dbg_str, _) =
|
||||
determine_offloader(&new_peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
|
||||
(offloader, election_dbg_str)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/// The goal is to ensure that normally only one safekeepers offloads. However,
|
||||
/// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
|
||||
/// time we have several ones as they PUT the same files. Also,
|
||||
@@ -141,13 +208,13 @@ fn determine_offloader(
|
||||
wal_backup_lsn: Lsn,
|
||||
ttid: TenantTimelineId,
|
||||
conf: &SafeKeeperConf,
|
||||
) -> (Option<NodeId>, String) {
|
||||
) -> (Option<NodeId>, String, usize) {
|
||||
// TODO: remove this once we fill newly joined safekeepers since backup_lsn.
|
||||
let capable_peers = alive_peers
|
||||
.iter()
|
||||
.filter(|p| p.local_start_lsn <= wal_backup_lsn);
|
||||
match capable_peers.clone().map(|p| p.commit_lsn).max() {
|
||||
None => (None, "no connected peers to elect from".to_string()),
|
||||
None => (None, "no connected peers to elect from".to_string(), 0),
|
||||
Some(max_commit_lsn) => {
|
||||
let threshold = max_commit_lsn
|
||||
.checked_sub(conf.max_offloader_lag_bytes)
|
||||
@@ -175,6 +242,7 @@ fn determine_offloader(
|
||||
capable_peers_dbg,
|
||||
caughtup_peers.len()
|
||||
),
|
||||
caughtup_peers.len(),
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -346,6 +414,8 @@ async fn backup_lsn_range(
|
||||
anyhow::bail!("parallel_jobs must be >= 1");
|
||||
}
|
||||
|
||||
pausable_failpoint!("backup-lsn-range-pausable");
|
||||
|
||||
let remote_timeline_path = &timeline.remote_path;
|
||||
let start_lsn = *backup_lsn;
|
||||
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use safekeeper_api::Term;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::send_wal::EndWatch;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_storage::WalReader;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use safekeeper_api::Term;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub(crate) struct WalBytes {
|
||||
@@ -37,6 +37,8 @@ struct PositionedWalReader {
|
||||
pub(crate) struct StreamingWalReader {
|
||||
stream: BoxStream<'static, WalOrReset>,
|
||||
start_changed_tx: tokio::sync::watch::Sender<Lsn>,
|
||||
// HADRON: Added TenantTimelineId for instrumentation purposes.
|
||||
pub(crate) ttid: TenantTimelineId,
|
||||
}
|
||||
|
||||
pub(crate) enum WalOrReset {
|
||||
@@ -63,6 +65,7 @@ impl StreamingWalReader {
|
||||
buffer_size: usize,
|
||||
) -> Self {
|
||||
let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
|
||||
let ttid = tli.ttid;
|
||||
|
||||
let state = WalReaderStreamState {
|
||||
tli,
|
||||
@@ -107,6 +110,7 @@ impl StreamingWalReader {
|
||||
Self {
|
||||
stream,
|
||||
start_changed_tx,
|
||||
ttid,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,8 @@ use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::metrics::{
|
||||
REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
|
||||
REMOVED_WAL_SEGMENTS, WAL_DISK_IO_ERRORS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics,
|
||||
time_io_closure,
|
||||
};
|
||||
use crate::state::TimelinePersistentState;
|
||||
use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
|
||||
@@ -293,9 +294,12 @@ impl PhysicalStorage {
|
||||
// half initialized segment, first bake it under tmp filename and
|
||||
// then rename.
|
||||
let tmp_path = self.timeline_dir.join("waltmp");
|
||||
let file = File::create(&tmp_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
|
||||
let file: File = File::create(&tmp_path).await.with_context(|| {
|
||||
/* BEGIN_HADRON */
|
||||
WAL_DISK_IO_ERRORS.inc();
|
||||
/* END_HADRON */
|
||||
format!("Failed to open tmp wal file {:?}", &tmp_path)
|
||||
})?;
|
||||
|
||||
fail::fail_point!("sk-zero-segment", |_| {
|
||||
info!("sk-zero-segment failpoint hit");
|
||||
@@ -382,7 +386,11 @@ impl PhysicalStorage {
|
||||
|
||||
let flushed = self
|
||||
.write_in_segment(segno, xlogoff, &buf[..bytes_write])
|
||||
.await?;
|
||||
.await
|
||||
/* BEGIN_HADRON */
|
||||
.inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
|
||||
/* END_HADRON */
|
||||
|
||||
self.write_lsn += bytes_write as u64;
|
||||
if flushed {
|
||||
self.flush_lsn = self.write_lsn;
|
||||
@@ -491,7 +499,11 @@ impl Storage for PhysicalStorage {
|
||||
}
|
||||
|
||||
if let Some(unflushed_file) = self.file.take() {
|
||||
self.fdatasync_file(&unflushed_file).await?;
|
||||
self.fdatasync_file(&unflushed_file)
|
||||
.await
|
||||
/* BEGIN_HADRON */
|
||||
.inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
|
||||
/* END_HADRON */
|
||||
self.file = Some(unflushed_file);
|
||||
} else {
|
||||
// We have unflushed data (write_lsn != flush_lsn), but no file. This
|
||||
|
||||
@@ -159,6 +159,10 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
heartbeat_timeout: Duration::from_secs(0),
|
||||
remote_storage: None,
|
||||
max_offloader_lag_bytes: 0,
|
||||
/* BEGIN_HADRON */
|
||||
max_reelect_offloader_lag_bytes: 0,
|
||||
max_timeline_disk_usage_bytes: 0,
|
||||
/* END_HADRON */
|
||||
wal_backup_enabled: false,
|
||||
listen_pg_addr_tenant_only: None,
|
||||
advertise_pg_addr: None,
|
||||
|
||||
@@ -2740,3 +2740,85 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
|
||||
raise Exception("Uneviction did not happen on source safekeeper yet")
|
||||
|
||||
wait_until(unevicted)
|
||||
|
||||
|
||||
def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that the timeline disk usage circuit breaker works as expected. We test that:
|
||||
1. The circuit breaker kicks in when the timeline's disk usage exceeds the configured limit,
|
||||
and it causes writes to hang.
|
||||
2. The hanging writes unblock when the issue resolves (by restarting the safekeeper in the
|
||||
test to simulate a more realistic production troubleshooting scenario).
|
||||
3. We can continue to write as normal after the issue resolves.
|
||||
4. There is no data corruption throughout the test.
|
||||
"""
|
||||
# Set up environment with a very small disk usage limit (1KB)
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
|
||||
|
||||
# Set a very small disk usage limit (1KB)
|
||||
neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Create a timeline and endpoint
|
||||
env.create_branch("test_timeline_disk_usage_limit")
|
||||
endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
|
||||
|
||||
# Get the safekeeper
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# Inject a failpoint to stop WAL backup
|
||||
with sk.http_client() as http_cli:
|
||||
http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
|
||||
|
||||
# Write some data that will exceed the 1KB limit. While the failpoint is active, this operation
|
||||
# will hang as Postgres encounters safekeeper-returned errors and retries.
|
||||
def run_hanging_insert():
|
||||
with closing(endpoint.connect()) as bg_conn:
|
||||
with bg_conn.cursor() as bg_cur:
|
||||
# This should generate more than 1KB of WAL
|
||||
bg_cur.execute("create table t(key int, value text)")
|
||||
bg_cur.execute("insert into t select generate_series(1,2000), 'payload'")
|
||||
|
||||
# Start the inserts in a background thread
|
||||
bg_thread = threading.Thread(target=run_hanging_insert)
|
||||
bg_thread.start()
|
||||
|
||||
# Wait for the error message to appear in the compute log
|
||||
def error_logged():
|
||||
return endpoint.log_contains("WAL storage utilization exceeds configured limit") is not None
|
||||
|
||||
wait_until(error_logged)
|
||||
log.info("Found expected error message in compute log, resuming.")
|
||||
|
||||
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
|
||||
# implemented didn't work as expected.
|
||||
time.sleep(2)
|
||||
assert bg_thread.is_alive(), (
|
||||
"The hanging insert somehow unblocked without resolving the disk usage issue!"
|
||||
)
|
||||
|
||||
log.info("Restarting the safekeeper to resume WAL backup.")
|
||||
# Restart the safekeeper with defaults to both clear the failpoint and resume the larger disk usage limit.
|
||||
for sk in env.safekeepers:
|
||||
sk.stop().start(extra_opts=[])
|
||||
|
||||
# The hanging insert will now complete. Join the background thread so that we can
|
||||
# verify that the insert completed successfully.
|
||||
bg_thread.join(timeout=120)
|
||||
assert not bg_thread.is_alive(), "Hanging insert did not complete after safekeeper restart"
|
||||
log.info("Hanging insert unblocked.")
|
||||
|
||||
# Verify we can continue to write as normal
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("insert into t select generate_series(2001,3000), 'payload'")
|
||||
|
||||
# Sanity check data correctness
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select count(*) from t")
|
||||
# 2000 rows from first insert + 1000 from last insert
|
||||
assert cur.fetchone() == (3000,)
|
||||
|
||||
Reference in New Issue
Block a user