NOTE(review): line structure and the generic type arguments that had been
stripped from this patch (`Arc<Self>`, `Option<Lsn>`, `Option<Arc<...>>`,
`Arc<SafekeeperDisk>`, `Arc<SafeKeeperConf>`) were reconstructed by inference;
confirm context lines and indentation against the target tree before applying.

diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 33047051df..3c4c81e499 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -166,6 +166,10 @@ struct Args {
     /// useful for debugging.
     #[arg(long)]
     current_thread_runtime: bool,
+    /// Keep horizon for walsenders, i.e. don't remove WAL segments that are
+    /// still needed for existing replication connections.
+    #[arg(long)]
+    walsenders_keep_horizon: bool,
 }
 
 // Like PathBufValueParser, but allows empty string.
@@ -295,6 +299,7 @@ async fn main() -> anyhow::Result<()> {
         pg_tenant_only_auth,
         http_auth,
         current_thread_runtime: args.current_thread_runtime,
+        walsenders_keep_horizon: args.walsenders_keep_horizon,
     };
 
     // initialize sentry if SENTRY_DSN is provided
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 27b80fcbe8..ce4b4d7bd0 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -78,6 +78,7 @@ pub struct SafeKeeperConf {
     pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
     pub http_auth: Option<Arc<SwappableJwtAuth>>,
     pub current_thread_runtime: bool,
+    pub walsenders_keep_horizon: bool,
 }
 
 impl SafeKeeperConf {
@@ -121,6 +122,7 @@ impl SafeKeeperConf {
             heartbeat_timeout: Duration::new(5, 0),
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
             current_thread_runtime: false,
+            walsenders_keep_horizon: false,
         }
     }
 }
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index d66db9b652..84393d8dab 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -4,7 +4,7 @@ use anyhow::{bail, Context, Result};
 use byteorder::{LittleEndian, ReadBytesExt};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 
-use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
+use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
 use serde::{Deserialize, Serialize};
 use std::cmp::max;
 use std::cmp::min;
@@ -946,28 +946,12 @@ where
         }
         Ok(())
     }
-
-    /// Get oldest segno we still need to keep. We hold WAL till it is consumed
-    /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
-    /// offloading.
-    /// While it is safe to use inmem values for determining horizon,
-    /// we use persistent to make possible normal states less surprising.
-    pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
-        let mut horizon_lsn = min(
-            self.state.remote_consistent_lsn,
-            self.state.peer_horizon_lsn,
-        );
-        if wal_backup_enabled {
-            horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
-        }
-        horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
-    }
 }
 
 #[cfg(test)]
 mod tests {
     use futures::future::BoxFuture;
-    use postgres_ffi::WAL_SEGMENT_SIZE;
+    use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
 
     use super::*;
     use crate::{
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index ee3e4c8ead..4b887f36b7 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -136,6 +136,21 @@ impl WalSenders {
         self.mutex.lock().slots.iter().flatten().cloned().collect()
     }
 
+    /// Get LSN of the most lagging pageserver receiver. Returns None if there are
+    /// no active pageserver walsenders; standby connections are ignored.
+    pub fn laggard_lsn(self: &Arc<Self>) -> Option<Lsn> {
+        self.mutex
+            .lock()
+            .slots
+            .iter()
+            .flatten()
+            .filter_map(|s| match s.feedback {
+                ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
+                ReplicationFeedback::Standby(_) => None,
+            })
+            .min()
+    }
+
     /// Get aggregated pageserver feedback.
     pub fn get_ps_feedback(self: &Arc<Self>) -> PageserverFeedback {
         self.mutex.lock().agg_ps_feedback
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 730a80a583..9b7ab14218 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -286,6 +286,29 @@ impl SharedState {
             .cloned()
             .collect()
     }
+
+    /// Get oldest segno we still need to keep. We hold WAL till it is consumed
+    /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
+    /// offloading.
+    /// While it is safe to use inmem values for determining horizon,
+    /// we use persistent to make possible normal states less surprising.
+    fn get_horizon_segno(
+        &self,
+        wal_backup_enabled: bool,
+        extra_horizon_lsn: Option<Lsn>,
+    ) -> XLogSegNo {
+        let state = &self.sk.state;
+
+        use std::cmp::min;
+        let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
+        if wal_backup_enabled {
+            horizon_lsn = min(horizon_lsn, state.backup_lsn);
+        }
+        if let Some(extra_horizon_lsn) = extra_horizon_lsn {
+            horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
+        }
+        horizon_lsn.segment_number(state.server.wal_seg_size as usize)
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -353,6 +376,12 @@ pub struct Timeline {
 
     /// Directory where timeline state is stored.
     pub timeline_dir: Utf8PathBuf,
+
+    /// Should we keep WAL on disk for active replication connections.
+    /// Especially useful for sharding, when different shards process WAL
+    /// at different speeds.
+    // TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
+    walsenders_keep_horizon: bool,
 }
 
 impl Timeline {
@@ -386,6 +415,7 @@ impl Timeline {
             cancellation_rx,
             cancellation_tx,
             timeline_dir: conf.timeline_dir(&ttid),
+            walsenders_keep_horizon: conf.walsenders_keep_horizon,
         })
     }
 
@@ -418,6 +448,7 @@ impl Timeline {
             cancellation_rx,
             cancellation_tx,
             timeline_dir: conf.timeline_dir(&ttid),
+            walsenders_keep_horizon: conf.walsenders_keep_horizon,
         })
     }
 
@@ -817,10 +848,20 @@ impl Timeline {
             bail!(TimelineError::Cancelled(self.ttid));
         }
 
+        // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
+        // This gives better read speed for pageservers that are lagging behind,
+        // at the cost of keeping more WAL on disk.
+        let replication_horizon_lsn = if self.walsenders_keep_horizon {
+            self.walsenders.laggard_lsn()
+        } else {
+            None
+        };
+
         let horizon_segno: XLogSegNo;
         let remover = {
             let shared_state = self.write_shared_state().await;
-            horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
+            horizon_segno =
+                shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
                 return Ok(()); // nothing to do
             }
diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs
index 1945b9d0cb..e3aaf5d391 100644
--- a/safekeeper/tests/walproposer_sim/safekeeper.rs
+++ b/safekeeper/tests/walproposer_sim/safekeeper.rs
@@ -175,6 +175,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
         pg_tenant_only_auth: None,
         http_auth: None,
         current_thread_runtime: false,
+        walsenders_keep_horizon: false,
     };
 
     let mut global = GlobalMap::new(disk, conf.clone())?;