diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index a6e99450fa..2882d2ca39 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -724,21 +724,30 @@ static TIMELINE_ARCHIVE_SIZE: Lazy = Lazy::new(|| { static STANDBY_HORIZON: Lazy = Lazy::new(|| { register_int_gauge_vec!( "pageserver_standby_horizon", - "Standby apply LSN for which GC is hold off, by timeline.", + "Gauge mirroring the legacy standby_horizon propagation mechanism's in-memory value.", &["tenant_id", "shard_id", "timeline_id"] ) .expect("failed to define a metric") }); -static STANDBY_HORIZON_LEASES: Lazy = Lazy::new(|| { +static STANDBY_HORIZON_LEASES_COUNT: Lazy = Lazy::new(|| { register_uint_gauge_vec!( - "pageserver_standby_horizon_leases", + "pageserver_standby_horizon_leases_count", "Gauge indicating current number of standby horizon leases, per timeline", &["tenant_id", "shard_id", "timeline_id"] ) .expect("failed to define a metric") }); +static STANDBY_HORIZON_LEASES_MIN: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "pageserver_standby_horizon_leases_min", + "Gauge indicating the minimum of all known standby_horizon lease.", + &["tenant_id", "shard_id", "timeline_id"] + ) + .expect("failed to define a metric") +}); + static RESIDENT_PHYSICAL_SIZE: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_resident_physical_size", @@ -3345,7 +3354,10 @@ impl TimelineMetrics { legacy_value: STANDBY_HORIZON .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) .unwrap(), - leases_count_gauge: STANDBY_HORIZON_LEASES + leases_min: STANDBY_HORIZON_LEASES_MIN + .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) + .unwrap(), + leases_count: STANDBY_HORIZON_LEASES_COUNT .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) .unwrap(), }; @@ -3576,7 +3588,8 @@ impl TimelineMetrics { let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]); let _ = 
DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
-        let _ = STANDBY_HORIZON_LEASES.remove_label_values(&[tenant_id, shard_id, timeline_id]);
+        let _ =
+            STANDBY_HORIZON_LEASES_COUNT.remove_label_values(&[tenant_id, shard_id, timeline_id]);
+        // Also remove the companion _min gauge added by this change, otherwise
+        // its per-timeline label set leaks after the timeline is shut down.
+        let _ =
+            STANDBY_HORIZON_LEASES_MIN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         {
             RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
             let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index ae9dbdc021..74fe7eb5d3 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3120,6 +3120,11 @@ impl TenantShard {
             return Err(GcError::NotActive);
         }
 
+        // Horizon invariants hold at all times, including during startup & initial lease deadline.
+        for tl in self.timelines.lock().unwrap().values().collect_vec() {
+            tl.standby_horizons.validate_invariants(tl);
+        }
+
         {
             let conf = self.tenant_conf.load();
 
@@ -9549,7 +9554,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_standby_horizons() -> anyhow::Result<()> {
+    async fn test_standby_horizons_basics() -> anyhow::Result<()> {
         let (tenant, ctx) = TenantHarness::create("test_standby_horizons")
             .await
             .unwrap()
@@ -9710,6 +9715,131 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_standby_horizon_leases_gc_cutoff_interaction() -> anyhow::Result<()> {
+        let (tenant, ctx) = TenantHarness::create("test_standby_horizons_renewal_below_cutoff")
+            .await
+            .unwrap()
+            .load()
+            .await;
+
+        let timeline = tenant
+            .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
+            .await?;
+
+        timeline
+            .applied_gc_cutoff_lsn
+            .lock_for_write()
+            .store_and_unlock(Lsn(0x20))
+            .wait()
+            .await;
+
+        // below horizon
+        timeline
+            .lease_standby_horizon("mylease1".to_string(), Lsn(0x10), &ctx)
+            .expect_err("this is below cutoff");
+        timeline.standby_horizons.validate_invariants(&timeline);
+        // at horizon
+
timeline.lease_standby_horizon("mylease2".to_string(), Lsn(0x20), &ctx)?; + timeline.standby_horizons.validate_invariants(&timeline); + // above horizon + timeline.lease_standby_horizon("mylease3".to_string(), Lsn(0x30), &ctx)?; + + timeline.standby_horizons.validate_invariants(&timeline); + + // legacy mechanism had no enforcement to be above gc cutoff in the past + timeline.standby_horizons.register_legacy_update(Lsn(0x10)); + timeline.standby_horizons.validate_invariants(&timeline); + timeline.standby_horizons.register_legacy_update(Lsn(0x20)); + timeline.standby_horizons.validate_invariants(&timeline); + timeline.standby_horizons.register_legacy_update(Lsn(0x30)); + timeline.standby_horizons.validate_invariants(&timeline); + + Ok(()) + } + + #[tokio::test] + async fn test_standby_horizon_monotonicity() -> anyhow::Result<()> { + let (tenant, ctx) = TenantHarness::create("test_standby_horizon_monotonicity") + .await + .unwrap() + .load() + .await; + + let timeline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await?; + + timeline + .applied_gc_cutoff_lsn + .lock_for_write() + .store_and_unlock(Lsn(0x20)) + .wait() + .await; + + // initial + timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)?; + timeline.standby_horizons.validate_invariants(&timeline); + + // stay in place + timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)?; + timeline.standby_horizons.validate_invariants(&timeline); + + // advance + timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x30), &ctx)?; + timeline.standby_horizons.validate_invariants(&timeline); + + // move back within gc horizon + timeline + .lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx) + .expect_err("should fail"); + timeline.standby_horizons.validate_invariants(&timeline); + let leases = timeline + .standby_horizons + .get_leases() + .into_iter() + .map(|(lsn, _)| lsn) + .collect_vec(); + assert_eq!( + leases, + vec![Lsn(0x30)], 
+ "failure should not have changed the lease" + ); + + // move back below gc horizon also forbidden + timeline + .lease_standby_horizon("mylease".to_string(), Lsn(0x10), &ctx) + .expect_err("should fail"); + timeline.standby_horizons.validate_invariants(&timeline); + let leases = timeline + .standby_horizons + .get_leases() + .into_iter() + .map(|(lsn, _)| lsn) + .collect_vec(); + assert_eq!( + leases, + vec![Lsn(0x30)], + "failure should not have changed the lease" + ); + + // another allowed move forward to ensure no poisoning happened + timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x40), &ctx)?; + timeline.standby_horizons.validate_invariants(&timeline); + + // legacy can move freely, not subject to monotonicity + timeline.standby_horizons.register_legacy_update(Lsn(0x40)); + timeline.standby_horizons.validate_invariants(&timeline); + timeline.standby_horizons.register_legacy_update(Lsn(0x10)); + timeline.standby_horizons.validate_invariants(&timeline); + timeline.standby_horizons.register_legacy_update(Lsn(0x20)); + timeline.standby_horizons.validate_invariants(&timeline); + timeline.standby_horizons.register_legacy_update(Lsn(0x30)); + timeline.standby_horizons.validate_invariants(&timeline); + + Ok(()) + } + #[tokio::test] async fn test_failed_flush_should_not_update_disk_consistent_lsn() -> anyhow::Result<()> { // diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 57810eacc4..7e058639cb 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -6526,6 +6526,7 @@ impl Timeline { /// Currently, we don't make any attempt at removing unneeded page versions /// within a layer file. We can only remove the whole file if it's fully /// obsolete. 
+ #[instrument(skip_all, fields(standby_horizon_which_min))] pub(super) async fn gc(&self) -> Result { // this is most likely the background tasks, but it might be the spawned task from // immediate_gc @@ -6582,14 +6583,24 @@ impl Timeline { // with that controller. // When solving this, solve it generically for lsn leases as well. let min_standby_horizon = self.standby_horizons.min_and_clear_legacy(); - let min_standby_horizon = if self + let flag_evaluation_result = self .feature_resolver - .evaluate_boolean("standby-horizon-leases-in-gc") - .is_ok() - { - min_standby_horizon.all + .evaluate_multivariate("standby-horizon-which-min") + .ok(); + Span::current().record( + "standby_horizon_which_min", + tracing::field::debug(&flag_evaluation_result), + ); + let min_standby_horizon = if cfg!(test) || cfg!(feature = "testing") { + // TODO: parametrize rust test / test suite over the feature flag? + // For now, test the new feature. + min_standby_horizon.leases } else { - min_standby_horizon.legacy + match flag_evaluation_result.as_deref() { + Some("all") => min_standby_horizon.all, + Some("leases") => min_standby_horizon.leases, + None | Some("legacy") | Some(_) => min_standby_horizon.legacy, + } }; if let Some(standby_horizon) = min_standby_horizon { if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) { diff --git a/pageserver/src/tenant/timeline/standby_horizon.rs b/pageserver/src/tenant/timeline/standby_horizon.rs index 43f8a4f1a5..e2b249409e 100644 --- a/pageserver/src/tenant/timeline/standby_horizon.rs +++ b/pageserver/src/tenant/timeline/standby_horizon.rs @@ -16,9 +16,10 @@ use std::{ }; use metrics::{IntGauge, UIntGauge}; +use tracing::{instrument, warn}; use utils::lsn::Lsn; -use crate::assert_u64_eq_usize::UsizeIsU64; +use crate::{assert_u64_eq_usize::UsizeIsU64, tenant::Timeline}; pub struct Horizons { inner: std::sync::Mutex, @@ -26,6 +27,7 @@ pub struct Horizons { struct Inner { legacy: Option, leases_by_id: HashMap, + leases_min: Option, 
metrics: Metrics,
 }
 
@@ -33,8 +35,10 @@ struct Inner {
 pub struct Metrics {
     /// `pageserver_standby_horizon`
     pub legacy_value: IntGauge,
-    /// `pageserver_standby_horizon_leases`
-    pub leases_count_gauge: UIntGauge,
+    /// `pageserver_standby_horizon_leases_min`
+    pub leases_min: UIntGauge,
+    /// `pageserver_standby_horizon_leases_count`
+    pub leases_count: UIntGauge,
 }
 
 #[derive(Debug)]
@@ -65,6 +69,8 @@ pub struct LeaseInfo {
 pub struct Mins {
     /// Just the legacy mechanism's value.
     pub legacy: Option,
+    /// Just the leases mechanism's value.
+    pub leases: Option,
     /// The minimum across legacy and all leases mechanism values.
     pub all: Option,
 }
@@ -75,12 +81,16 @@ impl Horizons {
         metrics.legacy_value.set(Lsn::INVALID.0 as i64);
 
         let leases_by_id = HashMap::default();
-        metrics.leases_count_gauge.set(0);
+        metrics.leases_count.set(0);
+
+        let leases_min = None;
+        metrics.leases_min.set(0);
 
         Self {
             inner: std::sync::Mutex::new(Inner {
                 legacy,
                 leases_by_id,
+                leases_min,
                 metrics,
             }),
         }
@@ -108,12 +118,15 @@ impl Horizons {
             inner.legacy.take()
         };
 
-        let all = legacy
-            .into_iter()
-            .chain(inner.leases_by_id.values().map(|lease| lease.lsn))
-            .min();
+        let leases = inner.leases_min;
 
-        Mins { legacy, all }
+        // NB: do NOT use std::cmp::min on the Options directly: Option's Ord
+        // treats None as smaller than Some(_), so min() would yield None
+        // ("no horizon") whenever either mechanism is inactive, silently
+        // dropping the other mechanism's horizon. Take the min over the
+        // *present* values instead, matching the old chain(..).min() behavior.
+        let all = match (legacy, inner.leases_min) {
+            (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
+            (a, b) => a.or(b),
+        };
+
+        Mins {
+            legacy,
+            leases,
+            all,
+        }
     }
 
     pub fn upsert_lease(
         &self,
@@ -135,16 +148,32 @@
         let res = LeaseInfo {
             valid_until: updated.valid_until,
         };
-        inner
-            .metrics
-            .leases_count_gauge
-            .set(inner.leases_by_id.len().into_u64());
+        let new_count = inner.leases_by_id.len().into_u64();
+        inner.metrics.leases_count.set(new_count);
+        let leases_min = inner.leases_by_id.values().map(|v| v.lsn).min();
+        inner.leases_min = leases_min;
+        inner.metrics.leases_min.set(leases_min.unwrap_or(Lsn(0)).0);
         Ok(res)
     }
 
     pub fn cull_leases(&self, now: SystemTime) {
         let mut inner = self.inner.lock().unwrap();
-        inner.leases_by_id.retain(|_, l| l.valid_until > now);
+        let mut min = None;
+        inner.leases_by_id.retain(|_, l| {
+
if l.valid_until > now {
+                let min = min.get_or_insert(l.lsn);
+                *min = std::cmp::min(*min, l.lsn);
+                true
+            } else {
+                false
+            }
+        });
+        inner
+            .metrics
+            .leases_count
+            .set(inner.leases_by_id.len().into_u64());
+        inner.leases_min = min;
+        inner.metrics.leases_min.set(min.unwrap_or(Lsn(0)).0);
     }
 
     pub fn dump(&self) -> serde_json::Value {
@@ -152,15 +181,71 @@
         let Inner {
             legacy,
             leases_by_id,
+            leases_min,
             metrics: _,
         } = &*inner;
         serde_json::json!({
             "legacy": format!("{legacy:?}"),
             "leases_by_id": format!("{leases_by_id:?}"),
+            "leases_min": format!("{leases_min:?}"),
         })
     }
 
-    #[cfg(test )]
+    #[instrument(skip_all)]
+    pub fn validate_invariants(&self, timeline: &Timeline) {
+        let mut bug = false;
+        let inner = self.inner.lock().unwrap();
+        let applied_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
+
+        // INVARIANT: All leases must be at or above applied gc cutoff.
+        // Violation of this invariant would constitute a bug in gc:
+        // it should never advance the applied gc cutoff past the minimum
+        // standby horizon lease.
+        for (lease_id, lease) in inner.leases_by_id.iter() {
+            if !(lease.lsn >= *applied_gc_cutoff_lsn) {
+                warn!(?lease_id, applied_gc_cutoff_lsn=%*applied_gc_cutoff_lsn, "lease is below the applied gc cutoff");
+                bug = true;
+            }
+        }
+        // The legacy mechanism never had this invariant, so we don't enforce it here.
+        macro_rules!
bug_on_neq { + ($what:literal, $a:expr, $b:expr,) => { + let a = $a; + let b = $b; + if a != b { + warn!(lhs=?a, rhs=?b, $what); + bug = true; + } + }; + } + + // INVARIANT: The lease count metrics is kept in sync + bug_on_neq!( + "lease count metric", + inner.metrics.leases_count.get(), + inner.leases_by_id.len().into_u64(), + ); + + // INVARIANT: The minimum value is the min of all leases + bug_on_neq!( + "leases_min", + inner.leases_min, + inner.leases_by_id.values().map(|l| l.lsn).min(), + ); + + // INVARIANT: The minimum value and the metric is kept in sync + bug_on_neq!( + "leases_min metric", + inner.metrics.leases_min.get(), + inner.leases_min.unwrap_or(Lsn(0)).0, + ); + + // Make tests fail if invariant is violated. + if cfg!(test) || cfg!(feature = "testing") { + assert!(!bug, "check logs"); + } + } + + #[cfg(test)] pub fn legacy(&self) -> Option { let inner = self.inner.lock().unwrap(); inner.legacy