WIP
@@ -11,6 +11,7 @@ pub mod layer_manager;
 pub(crate) mod logical_size;
 pub mod offload;
 pub mod span;
+mod standby_horizon;
 pub mod uninit;
 mod walreceiver;

@@ -248,7 +249,7 @@ pub struct Timeline {
     // Atomic would be more appropriate here.
     last_freeze_ts: RwLock<Instant>,

-    pub(crate) standby_horizon: AtomicLsn,
+    pub(crate) standby_horizons: standby_horizon::Horizons,

     // WAL redo manager. `None` only for broken tenants.
     walredo_mgr: Option<Arc<super::WalRedoManager>>,
@@ -3085,8 +3086,6 @@ impl Timeline {
             ancestor_timeline: ancestor,
             ancestor_lsn: metadata.ancestor_lsn(),

-            metrics,
-
             query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
                 &tenant_shard_id,
                 &timeline_id,
@@ -3151,7 +3150,9 @@ impl Timeline {
             l0_compaction_trigger: resources.l0_compaction_trigger,
             gc_lock: tokio::sync::Mutex::default(),

-            standby_horizon: AtomicLsn::new(0),
+            standby_horizons: standby_horizon::Horizons::new(
+                metrics.standby_horizon_gauge.clone(),
+            ),

             pagestream_throttle: resources.pagestream_throttle,

@@ -3181,6 +3182,8 @@ impl Timeline {
             basebackup_cache: resources.basebackup_cache,

             feature_resolver: resources.feature_resolver.clone(),
+
+            metrics,
         };

         result.repartition_threshold =
@@ -6531,10 +6534,28 @@ impl Timeline {
         };

         let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
-        let standby_horizon = self.standby_horizon.load();
         // Hold GC for the standby, but as a safety guard do it only within some
         // reasonable lag.
-        if standby_horizon != Lsn::INVALID {
+        // TODO: revisit this once we've fully transitioned to leases. 10 GiB isn't _that_ much
+        // permitted lag at high ingest rates. Then again, unlimited lag is a DoS risk.
+        // Ideally we'd implement a monitoring function for RO replica lag in a higher-level
+        // controller, e.g. the TBD compute manager aka endpoint controller. It would kill the
+        // replica if it's lagging too much or is otherwise unresponsive. The standby horizon
+        // leasing could then be moved into that controller, i.e., inside our trust domain.
+        // Replicas could still advance existing leases, but control over lease existence
+        // (create/destroy) would rest with that controller.
+        // When solving this, solve it generically for LSN leases as well.
+        let min_standby_horizon = self.standby_horizons.min_and_clear_legacy();
+        let min_standby_horizon = if self
+            .feature_resolver
+            .evaluate_boolean("standby-horizon-leases-in-gc")
+            .is_ok()
+        {
+            min_standby_horizon.all
+        } else {
+            min_standby_horizon.legacy
+        };
+        if let Some(standby_horizon) = min_standby_horizon {
             if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
                 const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
                 if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
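As a hedged illustration of the new cutoff logic, the following standalone sketch restates the horizon selection and lag guard. The `leases_in_gc` parameter stands in for the "standby-horizon-leases-in-gc" feature-flag evaluation, and the clamping inside the guarded branch is an assumption, since that branch's body lies outside this hunk.

use utils::lsn::Lsn;

/// Sketch only: pick the effective standby horizon and apply the lag guard to the GC cutoff.
fn apply_standby_horizon(
    new_gc_cutoff: Lsn,
    legacy: Option<Lsn>,
    all: Option<Lsn>,
    leases_in_gc: bool,
) -> Lsn {
    const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GiB

    // The feature flag decides whether leases participate or only the legacy value counts.
    let min_standby_horizon = if leases_in_gc { all } else { legacy };
    if let Some(standby_horizon) = min_standby_horizon {
        // checked_sub yields None when the horizon is ahead of the cutoff,
        // in which case there is nothing to hold back.
        if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
            if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
                // Assumed behavior: hold GC back to the standby horizon.
                return std::cmp::min(standby_horizon, new_gc_cutoff);
            }
            // The lag exceeds the guard: ignore the horizon rather than pin GC indefinitely.
        }
    }
    new_gc_cutoff
}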
@@ -6549,14 +6570,6 @@ impl Timeline {
             }
         }

-        // Reset standby horizon to ignore it if it is not updated till next GC.
-        // It is an easy way to unset it when standby disappears without adding
-        // more conf options.
-        self.standby_horizon.store(Lsn::INVALID);
-        self.metrics
-            .standby_horizon_gauge
-            .set(Lsn::INVALID.0 as i64);
-
         let res = self
             .gc_timeline(
                 space_cutoff,
pageserver/src/tenant/timeline/standby_horizon.rs (new file, 72 lines)
@@ -0,0 +1,72 @@
+//! The standby horizon functionality is used to ensure that getpage requests from
+//! RO replicas can be served.
+//!
+//! RO replicas always lag to some degree behind the primary, and request pages at
+//! their respective apply LSN. The standby horizon mechanism ensures that the
+//! Pageserver does not garbage-collect old page versions in the interval between
+//! `min(valid standby horizon leases)` and the most recent page version.
+//!
+//! There are currently two ways the standby horizon is maintained on the pageserver:
+//! - legacy: as described in RFC36, replica->safekeeper->broker->pageserver
+//! - leases: TODO
+
+use metrics::IntGauge;
+use utils::lsn::Lsn;
+
+pub struct Horizons {
+    inner: std::sync::Mutex<Inner>,
+}
+struct Inner {
+    legacy: Option<Lsn>,
+    pub legacy_metric: IntGauge,
+}
+
+/// Returned by [`Horizons::min_and_clear_legacy`].
+pub struct Mins {
+    /// Just the legacy mechanism's value.
+    pub legacy: Option<Lsn>,
+    /// The minimum across the legacy mechanism's value and all lease values.
+    pub all: Option<Lsn>,
+}
+
+impl Horizons {
+    pub fn new(legacy_metric: IntGauge) -> Self {
+        legacy_metric.set(Lsn::INVALID.0 as i64);
+        Self {
+            inner: std::sync::Mutex::new(Inner {
+                legacy: None,
+                legacy_metric,
+            }),
+        }
+    }
+
+    /// Register an update via the legacy mechanism.
+    pub fn register_legacy_update(&self, lsn: Lsn) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.legacy = Some(lsn);
+        inner.legacy_metric.set(lsn.0 as i64);
+    }
+
+    /// Get the minimum standby horizon and clear the horizon that was propagated via the
+    /// legacy mechanism ([`Self::register_legacy_update`]).
+    ///
+    /// This method is called from GC to incorporate standby horizons into GC decisions.
+    ///
+    /// Clearing the legacy mechanism's state is how we deal with disappearing replicas:
+    /// the legacy mechanism stops calling [`Self::register_legacy_update`], and so, one GC
+    /// iteration later, the disappeared replica no longer affects GC.
+    pub fn min_and_clear_legacy(&self) -> Mins {
+        let mut inner = self.inner.lock().unwrap();
+        let legacy = {
+            inner.legacy_metric.set(Lsn::INVALID.0 as i64);
+            inner.legacy.take()
+        };
+
+        // TODO: support leases
+        let leases = [];
+
+        let all = legacy.into_iter().chain(leases.into_iter()).min();
+
+        Mins { legacy, all }
+    }
+}
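To make the clear-on-read semantics concrete, here is a hedged usage sketch, written as a would-be unit test for standby_horizon.rs. The free-standing gauge and the test name are illustrative, and it assumes the `metrics` crate exposes prometheus's `IntGauge::new`.

#[cfg(test)]
mod tests {
    use metrics::IntGauge;
    use utils::lsn::Lsn;

    use super::Horizons;

    #[test]
    fn legacy_horizon_is_cleared_on_read() {
        // Illustrative gauge; in the pageserver this is metrics.standby_horizon_gauge.
        let gauge = IntGauge::new("standby_horizon_demo", "demo gauge").unwrap();
        let horizons = Horizons::new(gauge);

        // The legacy path (walreceiver) reports the replica's apply LSN.
        horizons.register_legacy_update(Lsn(0x1000));

        // GC reads the minimum and clears the legacy value in the same call.
        let mins = horizons.min_and_clear_legacy();
        assert_eq!(mins.legacy, Some(Lsn(0x1000)));
        // No leases are implemented yet, so `all` equals `legacy`.
        assert_eq!(mins.all, Some(Lsn(0x1000)));

        // If the replica disappears (no further register_legacy_update calls),
        // the next GC iteration sees no horizon at all.
        let mins = horizons.min_and_clear_legacy();
        assert_eq!(mins.legacy, None);
        assert_eq!(mins.all, None);
    }
}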
@@ -754,15 +754,11 @@ impl ConnectionManagerState {
             "safekeeper info update: standby_horizon(cutoff)={}",
             timeline_update.standby_horizon
         );
+        // ignore reports from safekeepers not connected to replicas
         if timeline_update.standby_horizon != 0 {
-            // ignore reports from safekeepers not connected to replicas
             self.timeline
-                .standby_horizon
-                .store(Lsn(timeline_update.standby_horizon));
-            self.timeline
-                .metrics
-                .standby_horizon_gauge
-                .set(timeline_update.standby_horizon as i64);
+                .standby_horizons
+                .register_legacy_update(Lsn(timeline_update.standby_horizon));
         }

         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
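For context, the legacy reporting path above boils down to the following hedged sketch, with the broker message narrowed to just its horizon field; the function name is illustrative and the module path for Horizons is an assumption based on the new file's location.

use utils::lsn::Lsn;

// Assumed path: Horizons is the type added in pageserver/src/tenant/timeline/standby_horizon.rs.
use crate::tenant::timeline::standby_horizon::Horizons;

/// Sketch: a safekeeper reporting standby_horizon == 0 has no connected replica,
/// so nothing is registered and GC is not held back on its account.
fn on_safekeeper_standby_horizon(horizons: &Horizons, standby_horizon: u64) {
    if standby_horizon != 0 {
        horizons.register_legacy_update(Lsn(standby_horizon));
    }
}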