finalize 3-stepped feature-gating (legacy,all,leases) + more tests + observability + fixes

This commit is contained in:
Christian Schwarz
2025-07-26 16:19:26 +02:00
parent fc7267a760
commit 73336962a8
4 changed files with 266 additions and 27 deletions

View File

@@ -724,21 +724,30 @@ static TIMELINE_ARCHIVE_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_standby_horizon",
"Standby apply LSN for which GC is hold off, by timeline.",
"Gauge mirroring the legacy standby_horizon propagation mechanism's in-memory value.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static STANDBY_HORIZON_LEASES: Lazy<UIntGaugeVec> = Lazy::new(|| {
static STANDBY_HORIZON_LEASES_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_standby_horizon_leases",
"pageserver_standby_horizon_leases_count",
"Gauge indicating current number of standby horizon leases, per timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static STANDBY_HORIZON_LEASES_MIN: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_standby_horizon_leases_min",
"Gauge indicating the minimum of all known standby_horizon lease.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_resident_physical_size",
@@ -3345,7 +3354,10 @@ impl TimelineMetrics {
legacy_value: STANDBY_HORIZON
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap(),
leases_count_gauge: STANDBY_HORIZON_LEASES
leases_min: STANDBY_HORIZON_LEASES_MIN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap(),
leases_count: STANDBY_HORIZON_LEASES_COUNT
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap(),
};
@@ -3576,7 +3588,8 @@ impl TimelineMetrics {
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON_LEASES.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ =
STANDBY_HORIZON_LEASES_COUNT.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);

View File

@@ -3120,6 +3120,11 @@ impl TenantShard {
return Err(GcError::NotActive);
}
// Horizon invariants hold at all times, including during startup & initial lease deadline.
for tl in self.timelines.lock().unwrap().values().collect_vec() {
tl.standby_horizons.validate_invariants(tl);
}
{
let conf = self.tenant_conf.load();
@@ -9549,7 +9554,7 @@ mod tests {
}
#[tokio::test]
async fn test_standby_horizons() -> anyhow::Result<()> {
async fn test_standby_horizons_basics() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_standby_horizons")
.await
.unwrap()
@@ -9710,6 +9715,131 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_standby_horizon_leases_gc_cutoff_interaction() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_standby_horizons_renewal_below_cutoff")
.await
.unwrap()
.load()
.await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
timeline
.applied_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x20))
.wait()
.await;
// below horizon
timeline
.lease_standby_horizon("mylease1".to_string(), Lsn(0x10), &ctx)
.expect_err("this is below cutoff");
timeline.standby_horizons.validate_invariants(&timeline);
// at horizon
timeline.lease_standby_horizon("mylease2".to_string(), Lsn(0x20), &ctx)?;
timeline.standby_horizons.validate_invariants(&timeline);
// above horizon
timeline.lease_standby_horizon("mylease3".to_string(), Lsn(0x30), &ctx)?;
timeline.standby_horizons.validate_invariants(&timeline);
// legacy mechanism had no enforcement to be above gc cutoff in the past
timeline.standby_horizons.register_legacy_update(Lsn(0x10));
timeline.standby_horizons.validate_invariants(&timeline);
timeline.standby_horizons.register_legacy_update(Lsn(0x20));
timeline.standby_horizons.validate_invariants(&timeline);
timeline.standby_horizons.register_legacy_update(Lsn(0x30));
timeline.standby_horizons.validate_invariants(&timeline);
Ok(())
}
#[tokio::test]
async fn test_standby_horizon_monotonicity() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_standby_horizon_monotonicity")
.await
.unwrap()
.load()
.await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
timeline
.applied_gc_cutoff_lsn
.lock_for_write()
.store_and_unlock(Lsn(0x20))
.wait()
.await;
// initial
timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)?;
timeline.standby_horizons.validate_invariants(&timeline);
// stay in place
timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)?;
timeline.standby_horizons.validate_invariants(&timeline);
// advance
timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x30), &ctx)?;
timeline.standby_horizons.validate_invariants(&timeline);
// move back within gc horizon
timeline
.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)
.expect_err("should fail");
timeline.standby_horizons.validate_invariants(&timeline);
let leases = timeline
.standby_horizons
.get_leases()
.into_iter()
.map(|(lsn, _)| lsn)
.collect_vec();
assert_eq!(
leases,
vec![Lsn(0x30)],
"failure should not have changed the lease"
);
// move back below gc horizon also forbidden
timeline
.lease_standby_horizon("mylease".to_string(), Lsn(0x10), &ctx)
.expect_err("should fail");
timeline.standby_horizons.validate_invariants(&timeline);
let leases = timeline
.standby_horizons
.get_leases()
.into_iter()
.map(|(lsn, _)| lsn)
.collect_vec();
assert_eq!(
leases,
vec![Lsn(0x30)],
"failure should not have changed the lease"
);
// another allowed move forward to ensure no poisoning happened
timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x40), &ctx)?;
timeline.standby_horizons.validate_invariants(&timeline);
// legacy can move freely, not subject to monotonicity
timeline.standby_horizons.register_legacy_update(Lsn(0x40));
timeline.standby_horizons.validate_invariants(&timeline);
timeline.standby_horizons.register_legacy_update(Lsn(0x10));
timeline.standby_horizons.validate_invariants(&timeline);
timeline.standby_horizons.register_legacy_update(Lsn(0x20));
timeline.standby_horizons.validate_invariants(&timeline);
timeline.standby_horizons.register_legacy_update(Lsn(0x30));
timeline.standby_horizons.validate_invariants(&timeline);
Ok(())
}
#[tokio::test]
async fn test_failed_flush_should_not_update_disk_consistent_lsn() -> anyhow::Result<()> {
//

View File

@@ -6526,6 +6526,7 @@ impl Timeline {
/// Currently, we don't make any attempt at removing unneeded page versions
/// within a layer file. We can only remove the whole file if it's fully
/// obsolete.
#[instrument(skip_all, fields(standby_horizon_which_min))]
pub(super) async fn gc(&self) -> Result<GcResult, GcError> {
// this is most likely the background tasks, but it might be the spawned task from
// immediate_gc
@@ -6582,14 +6583,24 @@ impl Timeline {
// with that controller.
// When solving this, solve it generically for lsn leases as well.
let min_standby_horizon = self.standby_horizons.min_and_clear_legacy();
let min_standby_horizon = if self
let flag_evaluation_result = self
.feature_resolver
.evaluate_boolean("standby-horizon-leases-in-gc")
.is_ok()
{
min_standby_horizon.all
.evaluate_multivariate("standby-horizon-which-min")
.ok();
Span::current().record(
"standby_horizon_which_min",
tracing::field::debug(&flag_evaluation_result),
);
let min_standby_horizon = if cfg!(test) || cfg!(feature = "testing") {
// TODO: parametrize rust test / test suite over the feature flag?
// For now, test the new feature.
min_standby_horizon.leases
} else {
min_standby_horizon.legacy
match flag_evaluation_result.as_deref() {
Some("all") => min_standby_horizon.all,
Some("leases") => min_standby_horizon.leases,
None | Some("legacy") | Some(_) => min_standby_horizon.legacy,
}
};
if let Some(standby_horizon) = min_standby_horizon {
if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {

View File

@@ -16,9 +16,10 @@ use std::{
};
use metrics::{IntGauge, UIntGauge};
use tracing::{instrument, warn};
use utils::lsn::Lsn;
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::{assert_u64_eq_usize::UsizeIsU64, tenant::Timeline};
pub struct Horizons {
inner: std::sync::Mutex<Inner>,
@@ -26,6 +27,7 @@ pub struct Horizons {
struct Inner {
legacy: Option<Lsn>,
leases_by_id: HashMap<String, Lease>,
leases_min: Option<Lsn>,
metrics: Metrics,
}
@@ -33,8 +35,10 @@ struct Inner {
pub struct Metrics {
/// `pageserver_standby_horizon`
pub legacy_value: IntGauge,
/// `pageserver_standby_horizon_leases`
pub leases_count_gauge: UIntGauge,
/// `pageserver_standby_horizon_leases_min`
pub leases_min: UIntGauge,
/// `pageserver_standby_horizon_leases_count`
pub leases_count: UIntGauge,
}
#[derive(Debug)]
@@ -65,6 +69,8 @@ pub struct LeaseInfo {
pub struct Mins {
/// Just the legacy mechanism's value.
pub legacy: Option<Lsn>,
/// Just the leases mechanism's value.
pub leases: Option<Lsn>,
/// The minimum across legacy and all leases mechanism values.
pub all: Option<Lsn>,
}
@@ -75,12 +81,16 @@ impl Horizons {
metrics.legacy_value.set(Lsn::INVALID.0 as i64);
let leases_by_id = HashMap::default();
metrics.leases_count_gauge.set(0);
metrics.leases_count.set(0);
let leases_min = None;
metrics.leases_min.set(0);
Self {
inner: std::sync::Mutex::new(Inner {
legacy,
leases_by_id,
leases_min,
metrics,
}),
}
@@ -108,12 +118,15 @@ impl Horizons {
inner.legacy.take()
};
let all = legacy
.into_iter()
.chain(inner.leases_by_id.values().map(|lease| lease.lsn))
.min();
let leases = inner.leases_min;
Mins { legacy, all }
let all = std::cmp::min(legacy, inner.leases_min);
Mins {
legacy,
leases,
all,
}
}
pub fn upsert_lease(
@@ -135,16 +148,32 @@ impl Horizons {
let res = LeaseInfo {
valid_until: updated.valid_until,
};
inner
.metrics
.leases_count_gauge
.set(inner.leases_by_id.len().into_u64());
let new_count = inner.leases_by_id.len().into_u64();
inner.metrics.leases_count.set(new_count);
let leases_min = inner.leases_by_id.values().map(|v| v.lsn).min();
inner.leases_min = leases_min;
inner.metrics.leases_min.set(leases_min.unwrap_or(Lsn(0)).0);
Ok(res)
}
pub fn cull_leases(&self, now: SystemTime) {
let mut inner = self.inner.lock().unwrap();
inner.leases_by_id.retain(|_, l| l.valid_until > now);
let mut min = None;
inner.leases_by_id.retain(|_, l| {
if l.valid_until > now {
let min = min.get_or_insert(l.lsn);
*min = std::cmp::min(*min, l.lsn);
true
} else {
false
}
});
inner
.metrics
.leases_count
.set(inner.leases_by_id.len().into_u64());
inner.leases_min = min;
inner.metrics.leases_min.set(min.unwrap_or(Lsn(0)).0);
}
pub fn dump(&self) -> serde_json::Value {
@@ -152,15 +181,71 @@ impl Horizons {
let Inner {
legacy,
leases_by_id,
leases_min,
metrics: _,
} = &*inner;
serde_json::json!({
"legacy": format!("{legacy:?}"),
"leases_by_id": format!("{leases_by_id:?}"),
"leases_min": format!("{leases_min:?}"),
})
}
#[cfg(test )]
#[instrument(skip_all)]
pub fn validate_invariants(&self, timeline: &Timeline) {
let mut bug = false;
let inner = self.inner.lock().unwrap();
let applied_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
// INVARIANT: All leases must be at or above applied gc cutoff.
// Violation of this invariant would constitute a bug in gc:
// it should
for (lease_id, lease) in inner.leases_by_id.iter() {
if !(lease.lsn >= *applied_gc_cutoff_lsn) {
warn!(?lease_id, applied_gc_cutoff_lsn=%*applied_gc_cutoff_lsn, "lease is below the applied gc cutoff");
bug = true;
}
}
// The legacy mechanism never had this invariant, so we don't enforce it here.
macro_rules! bug_on_neq {
($what:literal, $a:expr, $b:expr,) => {
let a = $a;
let b = $b;
if a != b {
warn!(lhs=?a, rhs=?b, $what);
bug = true;
}
};
}
// INVARIANT: The lease count metrics is kept in sync
bug_on_neq!(
"lease count metric",
inner.metrics.leases_count.get(),
inner.leases_by_id.len().into_u64(),
);
// INVARIANT: The minimum value is the min of all leases
bug_on_neq!(
"leases_min",
inner.leases_min,
inner.leases_by_id.values().map(|l| l.lsn).min(),
);
// INVARIANT: The minimum value and the metric is kept in sync
bug_on_neq!(
"leases_min metric",
inner.metrics.leases_min.get(),
inner.leases_min.unwrap_or(Lsn(0)).0,
);
// Make tests fail if invariant is violated.
if cfg!(test) || cfg!(feature = "testing") {
assert!(!bug, "check logs");
}
}
#[cfg(test)]
pub fn legacy(&self) -> Option<Lsn> {
let inner = self.inner.lock().unwrap();
inner.legacy