From 03f8a42ed9d5eba142c162000f69bef8bf239b70 Mon Sep 17 00:00:00 2001
From: Arthur Petukhovsky
Date: Wed, 21 Feb 2024 19:09:40 +0000
Subject: [PATCH] Add walsenders_keep_horizon option (#6860)

Add a `--walsenders-keep-horizon` argument to the safekeeper cmdline. It
prevents WAL segments from being deleted from disk while they are still
needed by an active START_REPLICATION connection.

This is useful for sharding. Without this option, if one of the shards
falls behind, it starts reading WAL from S3, which is much slower than
reading it from disk, and the shard can end up lagging far behind.
---
 safekeeper/src/bin/safekeeper.rs              |  5 +++
 safekeeper/src/lib.rs                         |  2 +
 safekeeper/src/safekeeper.rs                  | 20 +--------
 safekeeper/src/send_wal.rs                    | 15 +++++++
 safekeeper/src/timeline.rs                    | 43 ++++++++++++++++++-
 .../tests/walproposer_sim/safekeeper.rs       |  1 +
 6 files changed, 67 insertions(+), 19 deletions(-)

diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 33047051df..3c4c81e499 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -166,6 +166,10 @@ struct Args {
     /// useful for debugging.
     #[arg(long)]
     current_thread_runtime: bool,
+    /// Keep horizon for walsenders, i.e. don't remove WAL segments that are
+    /// still needed for existing replication connection.
+    #[arg(long)]
+    walsenders_keep_horizon: bool,
 }
 
 // Like PathBufValueParser, but allows empty string.
@@ -295,6 +299,7 @@ async fn main() -> anyhow::Result<()> {
         pg_tenant_only_auth,
         http_auth,
         current_thread_runtime: args.current_thread_runtime,
+        walsenders_keep_horizon: args.walsenders_keep_horizon,
     };
 
     // initialize sentry if SENTRY_DSN is provided
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 27b80fcbe8..ce4b4d7bd0 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -78,6 +78,7 @@ pub struct SafeKeeperConf {
     pub pg_tenant_only_auth: Option>,
     pub http_auth: Option>,
     pub current_thread_runtime: bool,
+    pub walsenders_keep_horizon: bool,
 }
 
 impl SafeKeeperConf {
@@ -121,6 +122,7 @@ impl SafeKeeperConf {
             heartbeat_timeout: Duration::new(5, 0),
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
             current_thread_runtime: false,
+            walsenders_keep_horizon: false,
         }
     }
 }
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index d66db9b652..84393d8dab 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -4,7 +4,7 @@
 use anyhow::{bail, Context, Result};
 use byteorder::{LittleEndian, ReadBytesExt};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
+use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
 use serde::{Deserialize, Serialize};
 use std::cmp::max;
 use std::cmp::min;
@@ -946,28 +946,12 @@ where
         }
         Ok(())
     }
-
-    /// Get oldest segno we still need to keep. We hold WAL till it is consumed
-    /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
-    /// offloading.
-    /// While it is safe to use inmem values for determining horizon,
-    /// we use persistent to make possible normal states less surprising.
-    pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
-        let mut horizon_lsn = min(
-            self.state.remote_consistent_lsn,
-            self.state.peer_horizon_lsn,
-        );
-        if wal_backup_enabled {
-            horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
-        }
-        horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
-    }
 }
 
 #[cfg(test)]
 mod tests {
     use futures::future::BoxFuture;
-    use postgres_ffi::WAL_SEGMENT_SIZE;
+    use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
 
     use super::*;
     use crate::{
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index ee3e4c8ead..4b887f36b7 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -136,6 +136,21 @@ impl WalSenders {
         self.mutex.lock().slots.iter().flatten().cloned().collect()
     }
 
+    /// Get LSN of the most lagging pageserver receiver. Return None if there are no
+    /// active walsenders.
+    pub fn laggard_lsn(self: &Arc<Self>) -> Option<Lsn> {
+        self.mutex
+            .lock()
+            .slots
+            .iter()
+            .flatten()
+            .filter_map(|s| match s.feedback {
+                ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
+                ReplicationFeedback::Standby(_) => None,
+            })
+            .min()
+    }
+
     /// Get aggregated pageserver feedback.
     pub fn get_ps_feedback(self: &Arc<Self>) -> PageserverFeedback {
         self.mutex.lock().agg_ps_feedback
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 730a80a583..9b7ab14218 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -286,6 +286,29 @@ impl SharedState {
             .cloned()
             .collect()
     }
+
+    /// Get oldest segno we still need to keep. We hold WAL till it is consumed
+    /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
+    /// offloading.
+    /// While it is safe to use inmem values for determining horizon,
+    /// we use persistent to make possible normal states less surprising.
+    fn get_horizon_segno(
+        &self,
+        wal_backup_enabled: bool,
+        extra_horizon_lsn: Option<Lsn>,
+    ) -> XLogSegNo {
+        let state = &self.sk.state;
+
+        use std::cmp::min;
+        let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
+        if wal_backup_enabled {
+            horizon_lsn = min(horizon_lsn, state.backup_lsn);
+        }
+        if let Some(extra_horizon_lsn) = extra_horizon_lsn {
+            horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
+        }
+        horizon_lsn.segment_number(state.server.wal_seg_size as usize)
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -353,6 +376,12 @@ pub struct Timeline {
 
     /// Directory where timeline state is stored.
     pub timeline_dir: Utf8PathBuf,
+
+    /// Should we keep WAL on disk for active replication connections.
+    /// Especially useful for sharding, when different shards process WAL
+    /// with different speed.
+    // TODO: add `Arc` here instead of adding each field separately.
+    walsenders_keep_horizon: bool,
 }
 
 impl Timeline {
@@ -386,6 +415,7 @@ impl Timeline {
             cancellation_rx,
             cancellation_tx,
             timeline_dir: conf.timeline_dir(&ttid),
+            walsenders_keep_horizon: conf.walsenders_keep_horizon,
         })
     }
 
@@ -418,6 +448,7 @@ impl Timeline {
             cancellation_rx,
             cancellation_tx,
             timeline_dir: conf.timeline_dir(&ttid),
+            walsenders_keep_horizon: conf.walsenders_keep_horizon,
         })
     }
 
@@ -817,10 +848,20 @@ impl Timeline {
             bail!(TimelineError::Cancelled(self.ttid));
         }
 
+        // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
+        // This allows to get better read speed for pageservers that are lagging behind,
+        // at the cost of keeping more WAL on disk.
+        let replication_horizon_lsn = if self.walsenders_keep_horizon {
+            self.walsenders.laggard_lsn()
+        } else {
+            None
+        };
+
         let horizon_segno: XLogSegNo;
         let remover = {
             let shared_state = self.write_shared_state().await;
-            horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
+            horizon_segno =
+                shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
                 return Ok(()); // nothing to do
             }
diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs
index 1945b9d0cb..e3aaf5d391 100644
--- a/safekeeper/tests/walproposer_sim/safekeeper.rs
+++ b/safekeeper/tests/walproposer_sim/safekeeper.rs
@@ -175,6 +175,7 @@ pub fn run_server(os: NodeOs, disk: Arc) -> Result<()> {
         pg_tenant_only_auth: None,
         http_auth: None,
         current_thread_runtime: false,
+        walsenders_keep_horizon: false,
     };
 
     let mut global = GlobalMap::new(disk, conf.clone())?;
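
Usage sketch: assuming the binary is invoked as `safekeeper` and clap derives
the long flag from the `walsenders_keep_horizon` field with its default
kebab-case naming (as the commit message states), the option is enabled by
passing the flag on the command line; other required safekeeper arguments are
omitted here as a placeholder:

    safekeeper --walsenders-keep-horizon <other safekeeper arguments>

The flag defaults to false (see the defaults added above), so WAL removal
behavior is unchanged unless the option is passed explicitly.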