diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs
index 0ac8201795..5828a400a0 100644
--- a/libs/utils/src/logging.rs
+++ b/libs/utils/src/logging.rs
@@ -24,12 +24,28 @@ macro_rules! critical {
         if cfg!(debug_assertions) {
             panic!($($arg)*);
         }
+        // Increment the critical event counter
         $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
         let backtrace = std::backtrace::Backtrace::capture();
         tracing::error!("CRITICAL: {}\n{backtrace}", format!($($arg)*));
     }};
 }
 
+#[macro_export]
+macro_rules! critical_timeline {
+    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
+        if cfg!(debug_assertions) {
+            panic!($($arg)*);
+        }
+        // Increment both metrics
+        $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
+        $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
+        let backtrace = std::backtrace::Backtrace::capture();
+        tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
+            $tenant_shard_id, $timeline_id, format!($($arg)*));
+    }};
+}
+
 #[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
 #[strum(serialize_all = "snake_case")]
 pub enum LogFormat {
@@ -61,6 +77,36 @@ pub struct TracingEventCountMetric {
     trace: IntCounter,
 }
 
+// Begin Hadron: add a HadronCriticalStorageEventCountMetric that is sliced by tenant_shard_id and timeline_id
+pub struct HadronCriticalStorageEventCountMetric {
+    critical: IntCounterVec,
+}
+
+pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC: Lazy<HadronCriticalStorageEventCountMetric> =
+    Lazy::new(|| {
+        let vec = metrics::register_int_counter_vec!(
+            "hadron_critical_storage_event_count",
+            "Number of critical storage events, by tenant_shard_id and timeline_id",
+            &["tenant_shard_id", "timeline_id"]
+        )
+        .expect("failed to define metric");
+        HadronCriticalStorageEventCountMetric::new(vec)
+    });
+
+impl HadronCriticalStorageEventCountMetric {
+    fn new(vec: IntCounterVec) -> Self {
+        Self { critical: vec }
+    }
+
+    // Allow public access from the `critical_timeline!` macro.
+    pub fn inc(&self, tenant_shard_id: &str, timeline_id: &str) {
+        self.critical
+            .with_label_values(&[tenant_shard_id, timeline_id])
+            .inc();
+    }
+}
+// End Hadron
+
 pub static TRACING_EVENT_COUNT_METRIC: Lazy<TracingEventCountMetric> = Lazy::new(|| {
     let vec = metrics::register_int_counter_vec!(
         "libmetrics_tracing_event_count",
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 443fb7fafb..84fb9a96af 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -78,7 +78,7 @@ use utils::rate_limit::RateLimit;
 use utils::seqwait::SeqWait;
 use utils::simple_rcu::{Rcu, RcuReadGuard};
 use utils::sync::gate::{Gate, GateGuard};
-use utils::{completion, critical, fs_ext, pausable_failpoint};
+use utils::{completion, critical_timeline, fs_ext, pausable_failpoint};
 #[cfg(test)]
 use wal_decoder::models::value::Value;
 use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
@@ -6819,7 +6819,11 @@ impl Timeline {
             Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
             Err(walredo::Error::Other(err)) => {
                 if fire_critical_error {
-                    critical!("walredo failure during page reconstruction: {err:?}");
+                    critical_timeline!(
+                        self.tenant_shard_id,
+                        self.timeline_id,
+                        "walredo failure during page reconstruction: {err:?}"
+                    );
                 }
                 return Err(PageReconstructError::WalRedo(
                     err.context("reconstruct a page image"),
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 9b64938b3e..ac3930fb71 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -36,7 +36,7 @@ use serde::Serialize;
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument, debug, error, info, info_span, trace, warn};
-use utils::critical;
+use utils::critical_timeline;
 use utils::id::TimelineId;
 use utils::lsn::Lsn;
 use wal_decoder::models::record::NeonWalRecord;
@@ -1390,7 +1390,11 @@ impl Timeline {
                     GetVectoredError::MissingKey(_),
                 ) = err
                 {
-                    critical!("missing key during compaction: {err:?}");
+                    critical_timeline!(
+                        self.tenant_shard_id,
+                        self.timeline_id,
+                        "missing key during compaction: {err:?}"
+                    );
                 }
             })?;
 
@@ -1418,7 +1422,11 @@ impl Timeline {
             // Alert on critical errors that indicate data corruption.
             Err(err) if err.is_critical() => {
-                critical!("could not compact, repartitioning keyspace failed: {err:?}");
+                critical_timeline!(
+                    self.tenant_shard_id,
+                    self.timeline_id,
+                    "could not compact, repartitioning keyspace failed: {err:?}"
+                );
             }
             // Log other errors. No partitioning? This is normal, if the timeline was just created
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 6d52da1f00..f8b0849c73 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -25,7 +25,7 @@ use tokio_postgres::replication::ReplicationStream;
 use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument, debug, error, info, trace, warn};
-use utils::critical;
+use utils::critical_timeline;
 use utils::id::NodeId;
 use utils::lsn::Lsn;
 use utils::pageserver_feedback::PageserverFeedback;
@@ -368,9 +368,13 @@ pub(super) async fn handle_walreceiver_connection(
             match raw_wal_start_lsn.cmp(&expected_wal_start) {
                 std::cmp::Ordering::Greater => {
                     let msg = format!(
-                        "Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
+                        "Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn})"
+                    );
+                    critical_timeline!(
+                        timeline.tenant_shard_id,
+                        timeline.timeline_id,
+                        "{msg}"
                     );
-                    critical!("{msg}");
                     return Err(WalReceiverError::Other(anyhow!(msg)));
                 }
                 std::cmp::Ordering::Less => {
@@ -383,7 +387,11 @@ pub(super) async fn handle_walreceiver_connection(
                         "Received record with next_record_lsn multiple times ({} < {})",
                         first_rec.next_record_lsn, expected_wal_start
                     );
-                    critical!("{msg}");
+                    critical_timeline!(
+                        timeline.tenant_shard_id,
+                        timeline.timeline_id,
+                        "{msg}"
+                    );
                     return Err(WalReceiverError::Other(anyhow!(msg)));
                 }
             }
@@ -452,7 +460,11 @@ pub(super) async fn handle_walreceiver_connection(
                 // TODO: we can't differentiate cancellation errors with
                 // anyhow::Error, so just ignore it if we're cancelled.
                 if !cancellation.is_cancelled() && !timeline.is_stopping() {
-                    critical!("{err:?}")
+                    critical_timeline!(
+                        timeline.tenant_shard_id,
+                        timeline.timeline_id,
+                        "{err:?}"
+                    );
                 }
             })?;
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index a597aedee3..f852051178 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -40,7 +40,7 @@ use tracing::*;
 use utils::bin_ser::{DeserializeError, SerializeError};
 use utils::lsn::Lsn;
 use utils::rate_limit::RateLimit;
-use utils::{critical, failpoint_support};
+use utils::{critical_timeline, failpoint_support};
 use wal_decoder::models::record::NeonWalRecord;
 use wal_decoder::models::*;
@@ -418,18 +418,30 @@ impl WalIngest {
         // as there has historically been cases where PostgreSQL has cleared spurious VM pages. See:
         // https://github.com/neondatabase/neon/pull/10634.
         let Some(vm_size) = get_relsize(modification, vm_rel, ctx).await? else {
-            critical!("clear_vm_bits for unknown VM relation {vm_rel}");
+            critical_timeline!(
+                modification.tline.tenant_shard_id,
+                modification.tline.timeline_id,
+                "clear_vm_bits for unknown VM relation {vm_rel}"
+            );
             return Ok(());
         };
         if let Some(blknum) = new_vm_blk {
             if blknum >= vm_size {
-                critical!("new_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
+                critical_timeline!(
+                    modification.tline.tenant_shard_id,
+                    modification.tline.timeline_id,
+                    "new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
+                );
                 new_vm_blk = None;
             }
         }
         if let Some(blknum) = old_vm_blk {
             if blknum >= vm_size {
-                critical!("old_vm_blk {blknum} not in {vm_rel} of size {vm_size}");
+                critical_timeline!(
+                    modification.tline.tenant_shard_id,
+                    modification.tline.timeline_id,
+                    "old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
+                );
                 old_vm_blk = None;
             }
         }
diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 8d31ada24f..8fda625817 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -18,9 +18,10 @@ use metrics::set_build_info_metric;
 use remote_storage::RemoteStorageConfig;
 use safekeeper::defaults::{
     DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
-    DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
-    DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
-    DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
+    DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
+    DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES, DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
+    DEFAULT_PARTIAL_BACKUP_CONCURRENCY, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
+    DEFAULT_SSL_CERT_FILE, DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
 };
 use safekeeper::wal_backup::WalBackup;
 use safekeeper::{
@@ -138,6 +139,15 @@ struct Args {
     /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
     #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
     max_offloader_lag: u64,
+    /* BEGIN_HADRON */
+    /// Safekeeper will re-elect a new offloader if the current one's backup is lagging by more than this value in bytes
+    #[arg(long, default_value_t = DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES)]
+    max_reelect_offloader_lag_bytes: u64,
+    /// Safekeeper will stop accepting new WALs if the timeline disk usage exceeds this value in bytes.
+    /// Setting this value to 0 disables the limit.
+    #[arg(long, default_value_t = DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES)]
+    max_timeline_disk_usage_bytes: u64,
+    /* END_HADRON */
     /// Number of max parallel WAL segments to be offloaded to remote storage.
     #[arg(long, default_value = "5")]
     wal_backup_parallel_jobs: usize,
@@ -391,6 +401,10 @@ async fn main() -> anyhow::Result<()> {
         peer_recovery_enabled: args.peer_recovery,
         remote_storage: args.remote_storage,
         max_offloader_lag_bytes: args.max_offloader_lag,
+        /* BEGIN_HADRON */
+        max_reelect_offloader_lag_bytes: args.max_reelect_offloader_lag_bytes,
+        max_timeline_disk_usage_bytes: args.max_timeline_disk_usage_bytes,
+        /* END_HADRON */
         wal_backup_enabled: !args.disable_wal_backup,
         backup_parallel_jobs: args.wal_backup_parallel_jobs,
         pg_auth,
diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs
index 4fc62fb229..76c2223891 100644
--- a/safekeeper/src/control_file.rs
+++ b/safekeeper/src/control_file.rs
@@ -17,6 +17,7 @@ use utils::crashsafe::durable_rename;
 
 use crate::control_file_upgrade::{downgrade_v10_to_v9, upgrade_control_file};
 use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
+use crate::metrics::WAL_DISK_IO_ERRORS;
 use crate::state::{EvictionState, TimelinePersistentState};
 
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
@@ -192,11 +193,14 @@ impl TimelinePersistentState {
 impl Storage for FileStorage {
     /// Persists state durably to the underlying storage.
     async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
+        // start timer for metrics
        let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();
-        // write data to safekeeper.control.partial
         let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
         let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
+            /* BEGIN_HADRON */
+            WAL_DISK_IO_ERRORS.inc();
+            /* END_HADRON */
             format!(
                 "failed to create partial control file at: {}",
                 &control_partial_path
@@ -206,14 +210,24 @@ impl Storage for FileStorage {
         let buf: Vec<u8> = s.write_to_buf()?;
 
         control_partial.write_all(&buf).await.with_context(|| {
+            /* BEGIN_HADRON */
+            WAL_DISK_IO_ERRORS.inc();
+            /* END_HADRON */
             format!("failed to write safekeeper state into control file at: {control_partial_path}")
         })?;
         control_partial.flush().await.with_context(|| {
+            /* BEGIN_HADRON */
+            WAL_DISK_IO_ERRORS.inc();
+            /* END_HADRON */
             format!("failed to flush safekeeper state into control file at: {control_partial_path}")
         })?;
 
         let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
-        durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
+        durable_rename(&control_partial_path, &control_path, !self.no_sync)
+            .await
+            /* BEGIN_HADRON */
+            .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
+        /* END_HADRON */
 
         // update internal state
         self.state = s.clone();
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index b4d9cadd6d..c461c071da 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -61,6 +61,13 @@ pub mod defaults {
     pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
 
+    /* BEGIN_HADRON */
+    // Default re-elect lag is 0 (disabled). SKs re-elect the offloader if the current one is lagging by this many bytes.
+    pub const DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES: u64 = 0;
+    // Default disk usage limit is 0 (disabled). When set, each timeline may use at most this much WAL
+    // disk space on this SK before the SK begins to reject WALs.
+    pub const DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES: u64 = 0;
+    /* END_HADRON */
     pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
     pub const DEFAULT_CONTROL_FILE_SAVE_INTERVAL: &str = "300s";
     pub const DEFAULT_PARTIAL_BACKUP_CONCURRENCY: &str = "5";
@@ -99,6 +106,10 @@ pub struct SafeKeeperConf {
     pub peer_recovery_enabled: bool,
     pub remote_storage: Option<RemoteStorageConfig>,
     pub max_offloader_lag_bytes: u64,
+    /* BEGIN_HADRON */
+    pub max_reelect_offloader_lag_bytes: u64,
+    pub max_timeline_disk_usage_bytes: u64,
+    /* END_HADRON */
     pub backup_parallel_jobs: usize,
     pub wal_backup_enabled: bool,
     pub pg_auth: Option<Arc<JwtAuth>>,
@@ -151,6 +162,10 @@ impl SafeKeeperConf {
             sk_auth_token: None,
             heartbeat_timeout: Duration::new(5, 0),
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
+            /* BEGIN_HADRON */
+            max_reelect_offloader_lag_bytes: defaults::DEFAULT_MAX_REELECT_OFFLOADER_LAG_BYTES,
+            max_timeline_disk_usage_bytes: defaults::DEFAULT_MAX_TIMELINE_DISK_USAGE_BYTES,
+            /* END_HADRON */
             current_thread_runtime: false,
             walsenders_keep_horizon: false,
             partial_backup_timeout: Duration::from_secs(0),
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index cb21a5f6d2..9baa80f73a 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -58,6 +58,25 @@ pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     )
     .expect("Failed to register safekeeper_flush_wal_seconds histogram")
 });
+/* BEGIN_HADRON */
+pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "safekeeper_wal_disk_io_errors",
+        "Number of disk I/O errors when creating and flushing WALs and control files"
+    )
+    .expect("Failed to register safekeeper_wal_disk_io_errors counter")
+});
+pub static WAL_STORAGE_LIMIT_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "safekeeper_wal_storage_limit_errors",
+        concat!(
+            "Number of errors due to timeline WAL storage utilization exceeding the configured limit. ",
+            "An increase in this metric indicates issues backing up or removing WALs."
+        )
+    )
+    .expect("Failed to register safekeeper_wal_storage_limit_errors counter")
+});
+/* END_HADRON */
 pub static PERSIST_CONTROL_FILE_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "safekeeper_persist_control_file_seconds",
@@ -138,6 +157,15 @@ pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
     )
     .expect("Failed to register safekeeper_backup_errors_total counter")
 });
+/* BEGIN_HADRON */
+pub static BACKUP_REELECT_LEADER_COUNT: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "safekeeper_backup_reelect_leader_total",
+        "Number of times the backup leader was reelected"
+    )
+    .expect("Failed to register safekeeper_backup_reelect_leader_total counter")
+});
+/* END_HADRON */
 pub static BROKER_PUSH_ALL_UPDATES_SECONDS: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "safekeeper_broker_push_update_seconds",
diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs
index 2192f5eab4..3797ac39d1 100644
--- a/safekeeper/src/send_interpreted_wal.rs
+++ b/safekeeper/src/send_interpreted_wal.rs
@@ -16,7 +16,7 @@ use tokio::sync::mpsc::error::SendError;
 use tokio::task::JoinHandle;
 use tokio::time::MissedTickBehavior;
 use tracing::{Instrument, error, info, info_span};
-use utils::critical;
+use utils::critical_timeline;
 use utils::lsn::Lsn;
 use utils::postgres_client::{Compression, InterpretedFormat};
 use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
@@ -268,6 +268,8 @@ impl InterpretedWalReader {
         let (shard_notification_tx, shard_notification_rx) =
             tokio::sync::mpsc::unbounded_channel();
 
+        let ttid = wal_stream.ttid;
+
         let reader = InterpretedWalReader {
             wal_stream,
             shard_senders: HashMap::from([(
@@ -300,7 +302,11 @@ impl InterpretedWalReader {
                 .inspect_err(|err| match err {
                     // TODO: we may want to differentiate these errors further.
                     InterpretedWalReaderError::Decode(_) => {
-                        critical!("failed to decode WAL record: {err:?}");
+                        critical_timeline!(
+                            ttid.tenant_id,
+                            ttid.timeline_id,
+                            "failed to decode WAL record: {err:?}"
+                        );
                     }
                     err => error!("failed to read WAL record: {err}"),
                 })
@@ -363,9 +369,14 @@ impl InterpretedWalReader {
             metric.dec();
         }
 
+        let ttid = self.wal_stream.ttid;
         match self.run_impl(start_pos).await {
             Err(err @ InterpretedWalReaderError::Decode(_)) => {
-                critical!("failed to decode WAL record: {err:?}");
+                critical_timeline!(
+                    ttid.tenant_id,
+                    ttid.timeline_id,
+                    "failed to decode WAL record: {err:?}"
+                );
             }
             Err(err) => error!("failed to read WAL record: {err}"),
             Ok(()) => info!("interpreted wal reader exiting"),
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 0a27876862..95b5fe6d5d 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantId, TenantTimelineId};
 use utils::lsn::Lsn;
 use utils::sync::gate::Gate;
 
-use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
+use crate::metrics::{
+    FullTimelineInfo, MISC_OPERATION_SECONDS, WAL_STORAGE_LIMIT_ERRORS, WalStorageMetrics,
+};
 use crate::rate_limit::RateLimiter;
 use crate::receive_wal::WalReceivers;
 use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
@@ -1050,6 +1052,39 @@ impl WalResidentTimeline {
         Ok(ss)
     }
 
+    // BEGIN HADRON
+    // Check if disk usage by WAL segment files for this timeline exceeds the configured limit.
+    fn hadron_check_disk_usage(
+        &self,
+        shared_state_locked: &mut WriteGuardSharedState<'_>,
+    ) -> Result<()> {
+        // The disk usage is calculated based on the number of segments between `last_removed_segno`
+        // and the current flush LSN segment number. `last_removed_segno` is advanced after
+        // unneeded WAL files are physically removed from disk (see `update_wal_removal_end()`
+        // in `timeline_manager.rs`).
+        let max_timeline_disk_usage_bytes = self.conf.max_timeline_disk_usage_bytes;
+        if max_timeline_disk_usage_bytes > 0 {
+            let last_removed_segno = self.last_removed_segno.load(Ordering::Relaxed);
+            let flush_lsn = shared_state_locked.sk.flush_lsn();
+            let wal_seg_size = shared_state_locked.sk.state().server.wal_seg_size as u64;
+            let current_segno = flush_lsn.segment_number(wal_seg_size as usize);
+
+            let segno_count = current_segno - last_removed_segno;
+            let disk_usage_bytes = segno_count * wal_seg_size;
+
+            if disk_usage_bytes > max_timeline_disk_usage_bytes {
+                WAL_STORAGE_LIMIT_ERRORS.inc();
+                bail!(
+                    "WAL storage utilization exceeds configured limit of {} bytes: current disk usage: {} bytes",
+                    max_timeline_disk_usage_bytes,
+                    disk_usage_bytes
+                );
+            }
+        }
+        Ok(())
+    }
+    // END HADRON
+
     /// Pass arrived message to the safekeeper.
     pub async fn process_msg(
         &self,
@@ -1062,6 +1097,13 @@ impl WalResidentTimeline {
         let mut rmsg: Option<AcceptorProposerMessage>;
         {
             let mut shared_state = self.write_shared_state().await;
+            // BEGIN HADRON
+            // Errors from `hadron_check_disk_usage()` fail process_msg(); the error is propagated
+            // upward and terminates the entire WalAcceptor. This will cause postgres to
+            // disconnect from the safekeeper and reestablish another connection. Postgres will keep retrying
+            // safekeeper connections every second until it can successfully propose WAL to the SK again.
+            self.hadron_check_disk_usage(&mut shared_state)?;
+            // END HADRON
             rmsg = shared_state.sk.safekeeper().process_msg(msg).await?;
 
             // if this is AppendResponse, fill in proper hot standby feedback.
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 0beb272a60..7e10847a1b 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -26,7 +26,9 @@ use utils::id::{NodeId, TenantTimelineId};
 use utils::lsn::Lsn;
 use utils::{backoff, pausable_failpoint};
 
-use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
+use crate::metrics::{
+    BACKED_UP_SEGMENTS, BACKUP_ERRORS, BACKUP_REELECT_LEADER_COUNT, WAL_BACKUP_TASKS,
+};
 use crate::timeline::WalResidentTimeline;
 use crate::timeline_manager::{Manager, StateSnapshot};
 use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
@@ -70,8 +72,9 @@ pub(crate) async fn update_task(
     need_backup: bool,
     state: &StateSnapshot,
 ) {
-    let (offloader, election_dbg_str) =
-        determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
+    /* BEGIN_HADRON */
+    let (offloader, election_dbg_str) = hadron_determine_offloader(mgr, state);
+    /* END_HADRON */
     let elected_me = Some(mgr.conf.my_id) == offloader;
 
     let should_task_run = need_backup && elected_me;
@@ -127,6 +130,70 @@ async fn shut_down_task(entry: &mut Option<WalBackupTaskHandle>) {
     }
 }
 
+/* BEGIN_HADRON */
+// On top of the neon determine_offloader, we also check whether the current offloader is lagging behind too much.
+// If it is, we re-elect a new offloader. This mitigates the issue described below. It also helps distribute the load across SKs.
+//
+// We have observed the offloader failing to upload a segment due to a race between XLOG SWITCH and postgres starting to stream WAL:
+// the wal_backup task keeps failing to upload a full segment while the segment remains partial on disk.
+// The consequence is that commit_lsn on all SKs moves forward but backup_lsn stays the same. Then all SKs run out of disk space.
+// See go/sk-ood-xlog-switch for more details.
+//
+// To mitigate this issue, we re-elect a new offloader if the current offloader is lagging behind too much.
+// Each SK makes the decision locally, but they are all aware of each other's commit and backup LSNs.
+//
+// determine_offloader will pick an SK, say SK-1.
+// Each SK checks:
+// -- if commit_lsn - backup_lsn > threshold,
+// -- -- remove SK-1 from the candidates and call determine_offloader again.
+// SK-1 will step down and all SKs will elect the same leader again.
+// After the backup has caught up, the leader will become SK-1 again.
+fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<NodeId>, String) {
+    let mut offloader: Option<NodeId>;
+    let mut election_dbg_str: String;
+    let caughtup_peers_count: usize;
+    (offloader, election_dbg_str, caughtup_peers_count) =
+        determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
+
+    if offloader.is_none()
+        || caughtup_peers_count <= 1
+        || mgr.conf.max_reelect_offloader_lag_bytes == 0
+    {
+        return (offloader, election_dbg_str);
+    }
+
+    let offloader_sk_id = offloader.unwrap();
+
+    let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
+    if backup_lag.is_none() {
+        info!("Backup lag is None. Skipping re-election.");
+        return (offloader, election_dbg_str);
+    }
+
+    let backup_lag = backup_lag.unwrap().0;
+
+    if backup_lag < mgr.conf.max_reelect_offloader_lag_bytes {
+        return (offloader, election_dbg_str);
+    }
+
+    info!(
+        "Electing a new offloader: backup lag {} exceeds threshold {}: {}",
+        backup_lag, mgr.conf.max_reelect_offloader_lag_bytes, election_dbg_str
+    );
+    BACKUP_REELECT_LEADER_COUNT.inc();
+    // Remove the current offloader if lag is too high.
+    let new_peers: Vec<_> = state
+        .peers
+        .iter()
+        .filter(|p| p.sk_id != offloader_sk_id)
+        .cloned()
+        .collect();
+    (offloader, election_dbg_str, _) =
+        determine_offloader(&new_peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
+    (offloader, election_dbg_str)
+}
+/* END_HADRON */
+
 /// The goal is to ensure that normally only one safekeepers offloads. However,
 /// it is fine (and inevitable, as s3 doesn't provide CAS) that for some short
 /// time we have several ones as they PUT the same files. Also,
@@ -141,13 +208,13 @@ fn determine_offloader(
     wal_backup_lsn: Lsn,
     ttid: TenantTimelineId,
     conf: &SafeKeeperConf,
-) -> (Option<NodeId>, String) {
+) -> (Option<NodeId>, String, usize) {
     // TODO: remove this once we fill newly joined safekeepers since backup_lsn.
     let capable_peers = alive_peers
         .iter()
         .filter(|p| p.local_start_lsn <= wal_backup_lsn);
     match capable_peers.clone().map(|p| p.commit_lsn).max() {
-        None => (None, "no connected peers to elect from".to_string()),
+        None => (None, "no connected peers to elect from".to_string(), 0),
         Some(max_commit_lsn) => {
             let threshold = max_commit_lsn
                 .checked_sub(conf.max_offloader_lag_bytes)
@@ -175,6 +242,7 @@ fn determine_offloader(
                     capable_peers_dbg,
                     caughtup_peers.len()
                 ),
+                caughtup_peers.len(),
             )
         }
     }
@@ -346,6 +414,8 @@ async fn backup_lsn_range(
         anyhow::bail!("parallel_jobs must be >= 1");
     }
 
+    pausable_failpoint!("backup-lsn-range-pausable");
+
     let remote_timeline_path = &timeline.remote_path;
     let start_lsn = *backup_lsn;
     let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs
index aab82fedb5..cba156888c 100644
--- a/safekeeper/src/wal_reader_stream.rs
+++ b/safekeeper/src/wal_reader_stream.rs
@@ -1,15 +1,15 @@
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-use bytes::Bytes;
-use futures::stream::BoxStream;
-use futures::{Stream, StreamExt};
-use safekeeper_api::Term;
-use utils::lsn::Lsn;
-
 use crate::send_wal::EndWatch;
 use crate::timeline::WalResidentTimeline;
 use crate::wal_storage::WalReader;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::{Stream, StreamExt};
+use safekeeper_api::Term;
+use utils::id::TenantTimelineId;
+use utils::lsn::Lsn;
 
 #[derive(PartialEq, Eq, Debug)]
 pub(crate) struct WalBytes {
@@ -37,6 +37,8 @@ struct PositionedWalReader {
 pub(crate) struct StreamingWalReader {
     stream: BoxStream<'static, WalOrReset>,
     start_changed_tx: tokio::sync::watch::Sender<Lsn>,
+    // HADRON: Added TenantTimelineId for instrumentation purposes.
+    pub(crate) ttid: TenantTimelineId,
 }
 
 pub(crate) enum WalOrReset {
@@ -63,6 +65,7 @@ impl StreamingWalReader {
         buffer_size: usize,
     ) -> Self {
         let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
+        let ttid = tli.ttid;
 
         let state = WalReaderStreamState {
             tli,
@@ -107,6 +110,7 @@ impl StreamingWalReader {
         Self {
             stream,
             start_changed_tx,
+            ttid,
         }
     }
 
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 70e53d86ee..9864ac3229 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -31,7 +31,8 @@ use utils::id::TenantTimelineId;
 use utils::lsn::Lsn;
 
 use crate::metrics::{
-    REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
+    REMOVED_WAL_SEGMENTS, WAL_DISK_IO_ERRORS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics,
+    time_io_closure,
 };
 use crate::state::TimelinePersistentState;
 use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
@@ -293,9 +294,12 @@ impl PhysicalStorage {
         // half initialized segment, first bake it under tmp filename and
         // then rename.
         let tmp_path = self.timeline_dir.join("waltmp");
-        let file = File::create(&tmp_path)
-            .await
-            .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
+        let file: File = File::create(&tmp_path).await.with_context(|| {
+            /* BEGIN_HADRON */
+            WAL_DISK_IO_ERRORS.inc();
+            /* END_HADRON */
+            format!("Failed to open tmp wal file {:?}", &tmp_path)
+        })?;
 
         fail::fail_point!("sk-zero-segment", |_| {
             info!("sk-zero-segment failpoint hit");
@@ -382,7 +386,11 @@ impl PhysicalStorage {
 
         let flushed = self
             .write_in_segment(segno, xlogoff, &buf[..bytes_write])
-            .await?;
+            .await
+            /* BEGIN_HADRON */
+            .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
+        /* END_HADRON */
+
         self.write_lsn += bytes_write as u64;
         if flushed {
             self.flush_lsn = self.write_lsn;
@@ -491,7 +499,11 @@ impl Storage for PhysicalStorage {
         }
 
         if let Some(unflushed_file) = self.file.take() {
-            self.fdatasync_file(&unflushed_file).await?;
+            self.fdatasync_file(&unflushed_file)
+                .await
+                /* BEGIN_HADRON */
+                .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
+            /* END_HADRON */
             self.file = Some(unflushed_file);
         } else {
             // We have unflushed data (write_lsn != flush_lsn), but no file. This
diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs
index 1fdf8e4949..1f6990c682 100644
--- a/safekeeper/tests/walproposer_sim/safekeeper.rs
+++ b/safekeeper/tests/walproposer_sim/safekeeper.rs
@@ -159,6 +159,10 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
         heartbeat_timeout: Duration::from_secs(0),
         remote_storage: None,
         max_offloader_lag_bytes: 0,
+        /* BEGIN_HADRON */
+        max_reelect_offloader_lag_bytes: 0,
+        max_timeline_disk_usage_bytes: 0,
+        /* END_HADRON */
         wal_backup_enabled: false,
         listen_pg_addr_tenant_only: None,
         advertise_pg_addr: None,
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index ea120c1814..22e6d2e1c3 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -2740,3 +2740,85 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
         raise Exception("Uneviction did not happen on source safekeeper yet")
 
     wait_until(unevicted)
+
+
+def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
+    """
+    Test that the timeline disk usage circuit breaker works as expected. We test that:
+    1. The circuit breaker kicks in when the timeline's disk usage exceeds the configured limit,
+       and it causes writes to hang.
+    2. The hanging writes unblock when the issue resolves (by restarting the safekeeper in the
+       test to simulate a more realistic production troubleshooting scenario).
+    3. We can continue to write as normal after the issue resolves.
+    4. There is no data corruption throughout the test.
+ """ + # Set up environment with a very small disk usage limit (1KB) + neon_env_builder.num_safekeepers = 1 + remote_storage_kind = s3_storage() + neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind) + + # Set a very small disk usage limit (1KB) + neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"] + + env = neon_env_builder.init_start() + + # Create a timeline and endpoint + env.create_branch("test_timeline_disk_usage_limit") + endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit") + + # Get the safekeeper + sk = env.safekeepers[0] + + # Inject a failpoint to stop WAL backup + with sk.http_client() as http_cli: + http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")]) + + # Write some data that will exceed the 1KB limit. While the failpoint is active, this operation + # will hang as Postgres encounters safekeeper-returned errors and retries. + def run_hanging_insert(): + with closing(endpoint.connect()) as bg_conn: + with bg_conn.cursor() as bg_cur: + # This should generate more than 1KB of WAL + bg_cur.execute("create table t(key int, value text)") + bg_cur.execute("insert into t select generate_series(1,2000), 'payload'") + + # Start the inserts in a background thread + bg_thread = threading.Thread(target=run_hanging_insert) + bg_thread.start() + + # Wait for the error message to appear in the compute log + def error_logged(): + return endpoint.log_contains("WAL storage utilization exceeds configured limit") is not None + + wait_until(error_logged) + log.info("Found expected error message in compute log, resuming.") + + # Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we + # implemented didn't work as expected. + time.sleep(2) + assert bg_thread.is_alive(), ( + "The hanging insert somehow unblocked without resolving the disk usage issue!" + ) + + log.info("Restarting the safekeeper to resume WAL backup.") + # Restart the safekeeper with defaults to both clear the failpoint and resume the larger disk usage limit. + for sk in env.safekeepers: + sk.stop().start(extra_opts=[]) + + # The hanging insert will now complete. Join the background thread so that we can + # verify that the insert completed successfully. + bg_thread.join(timeout=120) + assert not bg_thread.is_alive(), "Hanging insert did not complete after safekeeper restart" + log.info("Hanging insert unblocked.") + + # Verify we can continue to write as normal + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("insert into t select generate_series(2001,3000), 'payload'") + + # Sanity check data correctness + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("select count(*) from t") + # 2000 rows from first insert + 1000 from last insert + assert cur.fetchone() == (3000,)