mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 23:59:58 +00:00
Add walsenders_keep_horizon option (#6860)
Add `--walsenders-keep-horizon` argument to safekeeper cmdline. It will prevent deleting WAL segments from disk if they are needed by an active START_REPLICATION connection. This is useful for sharding. Without this option, if one of the shards falls behind, it starts to read WAL from S3, which is much slower than disk. This can result in huge shard lag.
This commit is contained in:
committed by
GitHub
parent
60e5a56a5a
commit
03f8a42ed9
@@ -166,6 +166,10 @@ struct Args {
|
||||
/// useful for debugging.
|
||||
#[arg(long)]
|
||||
current_thread_runtime: bool,
|
||||
/// Keep horizon for walsenders, i.e. don't remove WAL segments that are
|
||||
/// still needed for existing replication connection.
|
||||
#[arg(long)]
|
||||
walsenders_keep_horizon: bool,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
@@ -295,6 +299,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
pg_tenant_only_auth,
|
||||
http_auth,
|
||||
current_thread_runtime: args.current_thread_runtime,
|
||||
walsenders_keep_horizon: args.walsenders_keep_horizon,
|
||||
};
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
|
||||
@@ -78,6 +78,7 @@ pub struct SafeKeeperConf {
|
||||
pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
|
||||
pub http_auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pub current_thread_runtime: bool,
|
||||
pub walsenders_keep_horizon: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -121,6 +122,7 @@ impl SafeKeeperConf {
|
||||
heartbeat_timeout: Duration::new(5, 0),
|
||||
max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use anyhow::{bail, Context, Result};
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
|
||||
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::max;
|
||||
use std::cmp::min;
|
||||
@@ -946,28 +946,12 @@ where
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get oldest segno we still need to keep. We hold WAL till it is consumed
|
||||
/// by all of 1) pageserver (remote_consistent_lsn) 2) peers (peer_horizon_lsn) 3) s3
|
||||
/// offloading (backup_lsn, taken into account only when `wal_backup_enabled` is true).
|
||||
/// While it is safe to use inmem values for determining horizon,
|
||||
/// we use the persistent ones to make possible normal states less surprising.
|
||||
pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
|
||||
let mut horizon_lsn = min(
|
||||
self.state.remote_consistent_lsn,
|
||||
self.state.peer_horizon_lsn,
|
||||
);
|
||||
if wal_backup_enabled {
|
||||
horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
|
||||
}
|
||||
horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::future::BoxFuture;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
|
||||
@@ -136,6 +136,21 @@ impl WalSenders {
|
||||
self.mutex.lock().slots.iter().flatten().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get the smallest `last_received_lsn` reported by pageserver walsenders. Returns None if no
|
||||
/// walsender with pageserver feedback is registered; standby feedback is ignored here.
|
||||
pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
|
||||
self.mutex
|
||||
.lock()
|
||||
.slots
|
||||
.iter()
|
||||
.flatten()
|
||||
.filter_map(|s| match s.feedback {
|
||||
ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
|
||||
ReplicationFeedback::Standby(_) => None,
|
||||
})
|
||||
.min()
|
||||
}
|
||||
|
||||
/// Get aggregated pageserver feedback.
|
||||
pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
|
||||
self.mutex.lock().agg_ps_feedback
|
||||
|
||||
@@ -286,6 +286,29 @@ impl SharedState {
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Get oldest segno we still need to keep. We hold WAL till it is consumed
|
||||
/// by all of 1) pageserver (remote_consistent_lsn) 2) peers (peer_horizon_lsn) 3) s3
|
||||
/// offloading (only when `wal_backup_enabled`) 4) `extra_horizon_lsn`, when the caller passes Some.
|
||||
/// While it is safe to use inmem values for determining horizon,
|
||||
/// we use the persistent ones to make possible normal states less surprising.
|
||||
fn get_horizon_segno(
|
||||
&self,
|
||||
wal_backup_enabled: bool,
|
||||
extra_horizon_lsn: Option<Lsn>,
|
||||
) -> XLogSegNo {
|
||||
let state = &self.sk.state;
|
||||
|
||||
use std::cmp::min;
|
||||
let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
|
||||
if wal_backup_enabled {
|
||||
horizon_lsn = min(horizon_lsn, state.backup_lsn);
|
||||
}
|
||||
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
|
||||
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
|
||||
}
|
||||
horizon_lsn.segment_number(state.server.wal_seg_size as usize)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -353,6 +376,12 @@ pub struct Timeline {
|
||||
|
||||
/// Directory where timeline state is stored.
|
||||
pub timeline_dir: Utf8PathBuf,
|
||||
|
||||
/// Should we keep WAL on disk for active replication connections.
|
||||
/// Especially useful for sharding, when different shards process WAL
|
||||
/// with different speed.
|
||||
// TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
|
||||
walsenders_keep_horizon: bool,
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
@@ -386,6 +415,7 @@ impl Timeline {
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
walsenders_keep_horizon: conf.walsenders_keep_horizon,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -418,6 +448,7 @@ impl Timeline {
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&ttid),
|
||||
walsenders_keep_horizon: conf.walsenders_keep_horizon,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -817,10 +848,20 @@ impl Timeline {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
|
||||
// If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
|
||||
// This allows to get better read speed for pageservers that are lagging behind,
|
||||
// at the cost of keeping more WAL on disk.
|
||||
let replication_horizon_lsn = if self.walsenders_keep_horizon {
|
||||
self.walsenders.laggard_lsn()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let horizon_segno: XLogSegNo;
|
||||
let remover = {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
|
||||
horizon_segno =
|
||||
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
|
||||
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
|
||||
return Ok(()); // nothing to do
|
||||
}
|
||||
|
||||
@@ -175,6 +175,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
pg_tenant_only_auth: None,
|
||||
http_auth: None,
|
||||
current_thread_runtime: false,
|
||||
walsenders_keep_horizon: false,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
Reference in New Issue
Block a user