From 2b5bb850f2355ba6ad94077649be9a4080b8ceb0 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Sun, 20 Jul 2025 20:46:20 +0000
Subject: [PATCH] WIP

---
 pageserver/src/tenant/timeline.rs                  | 41 +++++++----
 .../src/tenant/timeline/standby_horizon.rs         | 72 +++++++++++++++++++
 .../walreceiver/connection_manager.rs              | 10 +--
 3 files changed, 102 insertions(+), 21 deletions(-)
 create mode 100644 pageserver/src/tenant/timeline/standby_horizon.rs

diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 47eb20cf61..de942c153b 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -11,6 +11,7 @@
 pub mod layer_manager;
 pub(crate) mod logical_size;
 pub mod offload;
 pub mod span;
+mod standby_horizon;
 pub mod uninit;
 mod walreceiver;
@@ -248,7 +249,7 @@ pub struct Timeline {
     // Atomic would be more appropriate here.
     last_freeze_ts: RwLock<Instant>,
 
-    pub(crate) standby_horizon: AtomicLsn,
+    pub(crate) standby_horizons: standby_horizon::Horizons,
 
     // WAL redo manager. `None` only for broken tenants.
     walredo_mgr: Option<Arc<super::WalRedoManager>>,
@@ -3085,8 +3086,6 @@
             ancestor_timeline: ancestor,
             ancestor_lsn: metadata.ancestor_lsn(),
 
-            metrics,
-
             query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
                 &tenant_shard_id,
                 &timeline_id,
@@ -3151,7 +3150,9 @@
             l0_compaction_trigger: resources.l0_compaction_trigger,
             gc_lock: tokio::sync::Mutex::default(),
 
-            standby_horizon: AtomicLsn::new(0),
+            standby_horizons: standby_horizon::Horizons::new(
+                metrics.standby_horizon_gauge.clone(),
+            ),
 
             pagestream_throttle: resources.pagestream_throttle,
 
@@ -3181,6 +3182,8 @@
             basebackup_cache: resources.basebackup_cache,
 
             feature_resolver: resources.feature_resolver.clone(),
+
+            metrics,
         };
 
         result.repartition_threshold =
@@ -6531,10 +6534,28 @@
         };
         let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
 
-        let standby_horizon = self.standby_horizon.load();
         // Hold GC for the standby, but as a safety guard do it only within some
         // reasonable lag.
-        if standby_horizon != Lsn::INVALID {
+        // TODO: revisit this once we've fully transitioned to leases. 10GiB isn't _that_ much
+        // permitted lag at high ingest rates. Then again, unlimited lag is a DoS risk.
+        // Ideally we would implement monitoring of RO replica lag in a higher-level controller,
+        // e.g. the TBD compute manager aka endpoint controller. It would kill the replica if it
+        // lags too much or is otherwise unresponsive. The standby horizon leasing could then be
+        // moved into that controller, i.e., inside our trust domain. The replica could still
+        // advance existing leases, but control over lease existence (create/destroy) would rest
+        // with that controller.
+        // When solving this, solve it generically for LSN leases as well.
+        let min_standby_horizon = self.standby_horizons.min_and_clear_legacy();
+        let min_standby_horizon = if self
+            .feature_resolver
+            .evaluate_boolean("standby-horizon-leases-in-gc")
+            .is_ok()
+        {
+            min_standby_horizon.all
+        } else {
+            min_standby_horizon.legacy
+        };
+        if let Some(standby_horizon) = min_standby_horizon {
             if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
                 const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
                 if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
@@ -6549,14 +6570,6 @@
             }
         }
 
-        // Reset standby horizon to ignore it if it is not updated till next GC.
-        // It is an easy way to unset it when standby disappears without adding
-        // more conf options.
-        self.standby_horizon.store(Lsn::INVALID);
-        self.metrics
-            .standby_horizon_gauge
-            .set(Lsn::INVALID.0 as i64);
-
         let res = self
             .gc_timeline(
                 space_cutoff,
diff --git a/pageserver/src/tenant/timeline/standby_horizon.rs b/pageserver/src/tenant/timeline/standby_horizon.rs
new file mode 100644
index 0000000000..fdda479643
--- /dev/null
+++ b/pageserver/src/tenant/timeline/standby_horizon.rs
@@ -0,0 +1,72 @@
+//! The standby horizon functionality ensures that getpage requests from
+//! RO replicas can be served.
+//!
+//! RO replicas always lag to some degree behind the primary, and request pages at
+//! their respective apply LSN. The standby horizon mechanism ensures that the
+//! Pageserver does not garbage-collect old page versions in the interval between
+//! `min(valid standby horizons)` and the most recent page version.
+//!
+//! There are currently two ways in which the standby horizon is maintained on the pageserver:
+//! - legacy: as described in RFC36, replica->safekeeper->broker->pageserver
+//! - leases: TODO
+
+use metrics::IntGauge;
+use utils::lsn::Lsn;
+
+pub struct Horizons {
+    inner: std::sync::Mutex<Inner>,
+}
+struct Inner {
+    legacy: Option<Lsn>,
+    pub legacy_metric: IntGauge,
+}
+
+/// Returned by [`Horizons::min_and_clear_legacy`].
+pub struct Mins {
+    /// Just the legacy mechanism's value.
+    pub legacy: Option<Lsn>,
+    /// The minimum across the legacy value and all lease values.
+    pub all: Option<Lsn>,
+}
+
+impl Horizons {
+    pub fn new(legacy_metric: IntGauge) -> Self {
+        legacy_metric.set(Lsn::INVALID.0 as i64);
+        Self {
+            inner: std::sync::Mutex::new(Inner {
+                legacy: None,
+                legacy_metric,
+            }),
+        }
+    }
+
+    /// Register an update via the legacy mechanism.
+    pub fn register_legacy_update(&self, lsn: Lsn) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.legacy = Some(lsn);
+        inner.legacy_metric.set(lsn.0 as i64);
+    }
+
+    /// Get the minimum standby horizon and clear the horizon that was propagated via the
+    /// legacy mechanism, i.e., via [`Self::register_legacy_update`].
+    ///
+    /// This method is called from GC to incorporate standby horizons into GC decisions.
+    ///
+    /// Clearing the legacy mechanism's state is how it deals with disappearing replicas:
+    /// the legacy mechanism stops calling [`Self::register_legacy_update`], and so, one GC
+    /// iteration later, the disappeared replica no longer holds back GC.
+    pub fn min_and_clear_legacy(&self) -> Mins {
+        let mut inner = self.inner.lock().unwrap();
+        let legacy = {
+            inner.legacy_metric.set(Lsn::INVALID.0 as i64);
+            inner.legacy.take()
+        };
+
+        // TODO: support leases
+        let leases = [];
+
+        let all = legacy.into_iter().chain(leases.into_iter()).min();
+
+        Mins { legacy, all }
+    }
+}
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index aba94244a3..add1c8b304 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -754,15 +754,11 @@
             "safekeeper info update: standby_horizon(cutoff)={}",
             timeline_update.standby_horizon
         );
+        // ignore reports from safekeepers not connected to replicas
         if timeline_update.standby_horizon != 0 {
-            // ignore reports from safekeepers not connected to replicas
             self.timeline
-                .standby_horizon
-                .store(Lsn(timeline_update.standby_horizon));
-            self.timeline
-                .metrics
-                .standby_horizon_gauge
-                .set(timeline_update.standby_horizon as i64);
+                .standby_horizons
+                .register_legacy_update(Lsn(timeline_update.standby_horizon));
         }
 
         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
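
For reference, a minimal sketch (not part of the patch) of how the `// TODO: support leases`
half of `Horizons` could be filled in, assuming leases are tracked per replica under a
hypothetical `LeaseId` key with an expiry deadline. `LeaseId`, `register_lease_update`, and
`min_valid_lease_horizon` are illustrative names, not existing pageserver APIs:

// Sketch only: one way lease state could live next to the legacy value in Horizons.
use std::collections::HashMap;
use std::time::Instant;

use utils::lsn::Lsn;

/// Hypothetical lease key; the real key type is TBD in the patch.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaseId(pub u64);

/// Per-lease state: horizon plus expiry deadline.
#[derive(Default)]
struct Leases {
    leases: HashMap<LeaseId, (Lsn, Instant)>,
}

impl Leases {
    /// Create or advance a lease; callers would renew before `expires_at` passes.
    fn register_lease_update(&mut self, id: LeaseId, horizon: Lsn, expires_at: Instant) {
        self.leases.insert(id, (horizon, expires_at));
    }

    /// Drop expired leases and return the minimum horizon of the remaining ones.
    /// `Horizons::min_and_clear_legacy` would fold this into its result.
    fn min_valid_lease_horizon(&mut self, now: Instant) -> Option<Lsn> {
        self.leases.retain(|_, (_, expires_at)| *expires_at > now);
        self.leases.values().map(|(lsn, _)| *lsn).min()
    }
}

Wiring this in would amount to replacing the empty `let leases = [];` in
`min_and_clear_legacy` with the lease horizons, keeping the existing
`legacy.into_iter().chain(...).min()` shape for `Mins::all`.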