diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 05e55e6092..65c3dac40e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2258,9 +2258,7 @@ impl PageServerHandler { let result: Option = timeline .lease_standby_horizon(lease_id, lsn, ctx) - .inspect_err(|e| { - warn!("{e}"); - }) + // logging happens inside .ok(); // Encode result as Option @@ -3946,7 +3944,7 @@ impl proto::PageService for GrpcPageServiceHandler { // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted. let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) { Ok(expiration) => expiration, - Err(err) => return Err(tonic::Status::failed_precondition(format!("{err:#}"))), + Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))), }; Ok(tonic::Response::new(expiration.into())) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f576119db8..ee10b51eff 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4830,6 +4830,7 @@ impl TenantShard { // Cull any expired leases let now = SystemTime::now(); target.leases.retain(|_, lease| !lease.is_expired(&now)); + timeline.standby_horizons.cull_leases(now); timeline .metrics diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index de942c153b..ae062ca326 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1861,13 +1861,27 @@ impl Timeline { Ok(lease) } + #[instrument(skip(_ctx), ret(Debug), err(Debug))] pub(crate) fn lease_standby_horizon( &self, lease_id: String, lsn: Lsn, - ctx: &RequestContext, + _ctx: &RequestContext, ) -> anyhow::Result { - todo!() + if self + .feature_resolver + .evaluate_boolean("standby-horizon-leases-track-disable") + .is_ok() + { + // Use feature-flagging as a fail-safe in case something is wrong with the data structure (memory consumption, etc.) + return Ok(SystemTime::now() + .checked_add(Duration::from_secs(5 * 60)) + .unwrap()); + } + let length = todo!("duplicate init lease deadline logic?"); + self.standby_horizons + .upsert_lease(lease_id, lsn, length) + .map(|lease| lease.valid_until) } /// Freeze the current open in-memory layer. It will be written to disk on next iteration. diff --git a/pageserver/src/tenant/timeline/standby_horizon.rs b/pageserver/src/tenant/timeline/standby_horizon.rs index fdda479643..dc6d67f353 100644 --- a/pageserver/src/tenant/timeline/standby_horizon.rs +++ b/pageserver/src/tenant/timeline/standby_horizon.rs @@ -10,6 +10,11 @@ //! - legacy: as described in RFC36, replica->safekeeper->broker->pageserver //! - leases: TODO +use std::{ + collections::{HashMap, hash_map}, + time::{Duration, SystemTime}, +}; + use metrics::IntGauge; use utils::lsn::Lsn; @@ -18,7 +23,31 @@ pub struct Horizons { } struct Inner { legacy: Option, - pub legacy_metric: IntGauge, + legacy_metric: IntGauge, + leases_by_id: HashMap, +} + +struct Lease { + valid_until: SystemTime, + lsn: Lsn, +} + +impl Lease { + pub fn try_update(&mut self, update: Lease) -> anyhow::Result<()> { + let Lease { + valid_until: expiration, + lsn, + } = update; + anyhow::ensure!(self.valid_until <= expiration); + anyhow::ensure!(self.lsn <= lsn); + *self = update; + Ok(()) + } +} + +#[derive(Debug)] +pub struct LeaseInfo { + pub valid_until: SystemTime, } /// Returned by [`Self::min_and_clear_legacy`]. @@ -36,6 +65,7 @@ impl Horizons { inner: std::sync::Mutex::new(Inner { legacy: None, legacy_metric, + leases_by_id: Default::default(), }), } } @@ -62,11 +92,37 @@ impl Horizons { inner.legacy.take() }; - // TODO: support leases - let leases = []; - - let all = legacy.into_iter().chain(leases.into_iter()).min(); + let all = legacy + .into_iter() + .chain(inner.leases_by_id.values().map(|lease| lease.lsn)) + .min(); Mins { legacy, all } } + + pub fn upsert_lease( + &self, + id: String, + lsn: Lsn, + length: Duration, + ) -> anyhow::Result { + let mut inner = self.inner.lock().unwrap(); + let valid_until = SystemTime::now() + length; + let update = Lease { valid_until, lsn }; + let updated = match inner.leases_by_id.entry(id) { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().try_update(update)?; + entry.into_mut() + } + hash_map::Entry::Vacant(entry) => entry.insert(update), + }; + Ok(LeaseInfo { + valid_until: updated.valid_until, + }) + } + + pub fn cull_leases(&self, now: SystemTime) { + let mut inner = self.inner.lock().unwrap(); + inner.leases_by_id.retain(|_, l| l.valid_until <= now); + } }