diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8112831766..f83619eafe 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1050,7 +1050,7 @@ async fn get_lsn_by_timestamp_handler( let lease = if with_lease { timeline - .init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx) + .make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), true, &ctx) .inspect_err(|_| { warn!("fail to grant a lease to {}", lsn); }) @@ -2222,7 +2222,7 @@ async fn post_lsn_lease_handler( let result = async { timeline - .init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx) + .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), true, &ctx) .map_err(|e| { ApiError::InternalServerError( e.context(format!("invalid lsn lease request at {lsn}")), diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6b614deac8..01e1849e5b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2199,7 +2199,7 @@ impl PageServerHandler { set_tracing_field_shard_id(&timeline); let lease = timeline - .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx) + .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), false, ctx) .inspect_err(|e| { warn!("{e}"); }) @@ -3787,7 +3787,7 @@ impl proto::PageService for GrpcPageServiceHandler { // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted. let lease_length = timeline.get_lsn_lease_length(); - let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) { + let expires = match timeline.make_lsn_lease(req.lsn, lease_length, false, &ctx) { Ok(lease) => lease.valid_until, Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))), }; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f576119db8..6f5a1db111 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4829,12 +4829,12 @@ impl TenantShard { // Cull any expired leases let now = SystemTime::now(); - target.leases.retain(|_, lease| !lease.is_expired(&now)); + let leases_info = target.leases.cull(now); timeline .metrics .valid_lsn_lease_count_gauge - .set(target.leases.len() as u64); + .set(leases_info.num_unique_lsns as u64); // Look up parent's PITR cutoff to update the child's knowledge of whether it is within parent's PITR if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() { @@ -9439,7 +9439,7 @@ mod tests { }); let updated_lease_0 = timeline - .renew_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx) + .lease_lsn2(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx) .expect("lease renewal should succeed"); assert_eq!( updated_lease_0.valid_until, leases[0].valid_until, @@ -9447,7 +9447,7 @@ mod tests { ); let updated_lease_1 = timeline - .renew_lsn_lease( + .lease_lsn2( Lsn(leased_lsns[1]), timeline.get_lsn_lease_length() * 2, &ctx, diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index d1020cff96..7349e33058 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -252,9 +252,8 @@ pub(super) async fn gather_inputs( let lease_points = gc_info .leases - .keys() - .filter(|&&lsn| lsn > ancestor_lsn) - .copied() + .iter_leased_lsns() + .filter(|lsn| *lsn > ancestor_lsn) .collect::>(); // next_pitr_cutoff in parent branch are not of interest (right now at least), nor do we diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4320f3b142..ab71207442 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -8,6 +8,7 @@ mod heatmap_layers_downloader; pub(crate) mod import_pgdata; mod init; pub mod layer_manager; +mod leases; pub(crate) mod logical_size; pub mod offload; pub mod span; @@ -481,8 +482,7 @@ pub(crate) struct GcInfo { /// The cutoff coordinates, which are combined by selecting the minimum. pub(crate) cutoffs: GcCutoffs, - /// Leases granted to particular LSNs. - pub(crate) leases: BTreeMap, + pub(crate) leases: leases::Leases, /// Whether our branch point is within our ancestor's PITR interval (for cost estimation) pub(crate) within_ancestor_pitr: bool, @@ -530,7 +530,7 @@ impl GcInfo { self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes) } pub(crate) fn lsn_covered_by_lease(&self, lsn: Lsn) -> bool { - self.leases.contains_key(&lsn) + self.leases.has_lease_at_exactly_this_lsn(lsn) } } @@ -1760,76 +1760,63 @@ impl Timeline { Ok(()) } - /// Initializes an LSN lease. The function will return an error if the requested LSN is less than the `latest_gc_cutoff_lsn`. - pub(crate) fn init_lsn_lease( - &self, - lsn: Lsn, - length: Duration, - ctx: &RequestContext, - ) -> anyhow::Result { - self.make_lsn_lease(lsn, length, true, ctx) - } - - /// Renews a lease at a particular LSN. The requested LSN is not validated against the `latest_gc_cutoff_lsn` when we are in the grace period. - pub(crate) fn renew_lsn_lease( - &self, - lsn: Lsn, - length: Duration, - ctx: &RequestContext, - ) -> anyhow::Result { - self.make_lsn_lease(lsn, length, false, ctx) - } - - /// Obtains a temporary lease blocking garbage collection for the given LSN. + /// Upsert a temporary lease to inhibit garbage collection for the given LSN. /// - /// If we are in `AttachedSingle` mode and is not blocked by the lsn lease deadline, this function will error - /// if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is no existing request present. + /// renewal of an existing lease always works + /// page_service/compute can create new leases above applied cutoff + /// management API/cplane can create new leases above planned cutoff /// - /// If there is an existing lease in the map, the lease will be renewed only if the request extends the lease. - /// The returned lease is therefore the maximum between the existing lease and the requesting lease. - fn make_lsn_lease( + pub fn make_lsn_lease( &self, lsn: Lsn, length: Duration, - init: bool, + is_management_api: bool, _ctx: &RequestContext, ) -> anyhow::Result { - let lease = { - // Normalize the requested LSN to be aligned, and move to the first record - // if it points to the beginning of the page (header). - let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE); + // Normalize the requested LSN to be aligned, and move to the first record + // if it points to the beginning of the page (header). + let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE); - let mut gc_info = self.gc_info.write().unwrap(); - let planned_cutoff = gc_info.min_cutoff(); + let mut gc_info = self.gc_info.write().unwrap(); + let planned_cutoff = gc_info.min_cutoff(); - let valid_until = SystemTime::now() + length; + let valid_until = SystemTime::now() + length; - let entry = gc_info.leases.entry(lsn); + let pending = match gc_info.leases.begin_upsert_unnamed(lsn) { + Ok(pending) => pending, + Err(leases::BeginUpsertError::ExistingLeaseWithHigherLsn { + holder, + existing, + update, + }) => { + return Err(anyhow::anyhow!( + "tried to move named lsn lease to an older LSN: existing={existing:?} update={update:?} holder={holder}" + )); + } + Err(leases::BeginUpsertError::ExistingLeaseWithLaterExpirationTime { + existing, + update, + }) => { + info!("existing lease covers greater length, valid until {}", dt); + } + }; - match entry { - Entry::Occupied(mut occupied) => { - let existing_lease = occupied.get_mut(); - if valid_until > existing_lease.valid_until { - existing_lease.valid_until = valid_until; - let dt: DateTime = valid_until.into(); - info!("lease extended to {}", dt); - } else { - let dt: DateTime = existing_lease.valid_until.into(); - info!("existing lease covers greater length, valid until {}", dt); - } + // Policy checks for lease renewal. + // + // Note that we avoid persisting leases and instead inhibit GC for the max lease duration after tenant restart/migration. + // This period is called `lsn_lease_deadline`. + // So, just an insert into runtime data structure can in fact be a renewal from the caller's point of view. - existing_lease.clone() - } - Entry::Vacant(vacant) => { - // Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted. - let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn(); - if lsn < *latest_gc_cutoff_lsn { - bail!( - "tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", - lsn, - *latest_gc_cutoff_lsn - ); - } + // Rule #1: never allow a lease to exist below applied GC cutoff. + // + let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn(); + if lsn < *latest_gc_cutoff_lsn { + bail!( + "tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", + lsn, + *latest_gc_cutoff_lsn + ); + } // We allow create lease for those below the planned gc cutoff if we are still within the grace period // of GC blocking. @@ -1851,10 +1838,56 @@ impl Timeline { let dt: DateTime = valid_until.into(); info!("lease created, valid until {}", dt); vacant.insert(LsnLease { valid_until }).clone() - } - } - }; + if gc_info.leases.has_lease_at_exactly_this_lsn(lsn) { + // It's a renewal. + } else { + self.feature_resolver + .evaluate_boolean("lease-renewal-allow-"); + + // Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted. + let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn(); + if lsn < *latest_gc_cutoff_lsn { + bail!( + "tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}", + lsn, + *latest_gc_cutoff_lsn + ); + } + + // We avoid persisting leases and instead inhibit GC for the max lease duration after tenant restart. + // + let validate = { + let conf = self.tenant_conf.load(); + !conf.is_gc_blocked_by_lsn_lease_deadline() + }; + if !(is_compute_request && conf.is_gc_blocked_by_lsn_lease_deadline()) + && lsn < planned_cutoff + { + bail!( + "tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}", + lsn, + planned_cutoff + ); + } + } + + let (lease, result) = gc_info + .leases + .begin_upsert_unnamed(leases::UpsertUnnamed { lsn, valid_until }); + + let dt: DateTime = lease.valid_until.into(); + match result { + leases::UpsertUnnamedResult::Created => { + info!("lease created, valid until {}", dt); + } + leases::UpsertUnnamedResult::ExtendedExisting => { + info!("lease extended to {}", dt); + } + leases::UpsertUnnamedResult::ExistingIsLonger { valid_until } => { + info!("existing lease covers greater length, valid until {}", dt); + } + } Ok(lease) } @@ -6503,11 +6536,8 @@ impl Timeline { .map(|(lsn, _child_id, _is_offloaded)| *lsn) .collect(); - // Gets the maximum LSN that holds the valid lease. - // - // Caveat: `refresh_gc_info` is in charged of updating the lease map. - // Here, we do not check for stale leases again. - let max_lsn_with_valid_lease = gc_info.leases.last_key_value().map(|(lsn, _)| *lsn); + // NB: We already culled stale leases in `refresh_gc_info`. + let max_lsn_with_valid_lease = gc_info.leases.max_lsn(); ( space_cutoff, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index e5ce733663..d1b7aa0cc6 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -3267,9 +3267,9 @@ impl Timeline { retain_lsns_below_horizon.push(*lsn); } } - for lsn in gc_info.leases.keys() { - if lsn < &gc_cutoff { - retain_lsns_below_horizon.push(*lsn); + for lsn in gc_info.leases.iter_leased_lsns() { + if lsn < gc_cutoff { + retain_lsns_below_horizon.push(lsn); } } let mut selected_layers: Vec = Vec::new(); diff --git a/pageserver/src/tenant/timeline/leases.rs b/pageserver/src/tenant/timeline/leases.rs new file mode 100644 index 0000000000..c7becd00a4 --- /dev/null +++ b/pageserver/src/tenant/timeline/leases.rs @@ -0,0 +1,235 @@ +//! Leases ensure readability at the leased or later LSN by inhibiting GC. +//! +//! Users of leases: +//! +//! - cplane to avoid time-of-check time-of-use issues when it retrieves an +//! LSN from some Pageserver APIs and later wants to use that LSN in an API +//! that requires the passed LSN to be readable. Example: branch creation. +//! - static-lsn computes aka ephemeral endpoints aka static computes that +//! issue getpage requests at a fixed LSN +//! - soon: RO replicas instead of the current `standby_horizon` mechanism. +//! +//! The first two identify their lease by LSN. +//! If there are multiple such leasers for the same LSN, they share an underlying +//! expiration deadline. +//! +//! RO replicas do not have a fixed lease LSN but move it forward on each renewal. +//! They refer to their lease through a replica-supplied ID. +//! +//! Limits +//! +//! There are currently no limits imposed on the number of leases per timeline. +//! This should be fixed because it poses a DoS risk. + +use std::{ + collections::{BTreeMap, HashMap, btree_map, hash_map}, + time::SystemTime, +}; + +use itertools::Itertools; +use utils::lsn::Lsn; + +#[derive(Default, Debug, Clone, serde::Serialize)] +pub struct Leases { + holders: HashMap, + lsns: BTreeMap, +} + +enum Holder { + Unnamed(Lsn), + Named(String), +} + +struct Lease { + lsn: Lsn, + valid_until: SystemTime, +} + +pub struct LeasesInfo { + pub num_unique_lsns: usize, +} + +pub struct Pending<'a>(PendingInner<'a>); +enum PendingInner<'a> { + Update { + holder: &'a mut hash_map::OccupiedEntry<'a, &'a Holder, &'a Lease>, + lsns: &'a mut BTreeMap, + update: Lease, + }, + Insert { + holder: &'a mut hash_map::VacantEntry<'a, &'a Holder, &'a Lease>, + lsns: &'a mut BTreeMap, + update: Lease, + }, +} + +pub enum BeginUpsertError { + ExistingLeaseWithLaterExpirationTime { + existing: Lease, + update: Lease, + }, + ExistingLeaseWithHigherLsn { + holder: String, + existing: Lease, + update: Lease, + }, +} + +impl Leases { + pub fn begin_upsert_unnamed( + &mut self, + req: UpsertUnnamed, + ) -> Result { + let UpsertUnnamed { lsn, valid_until } = req; + self.begin_upsert(Holder::Unnamed(lsn), Lease { lsn, valid_until }) + } + + fn begin_upsert(&mut self, holder: Holder, lease: Lease) -> Result { + match &holder { + Holder::Unnamed(lsn) => assert_eq!(lsn, &lease.lsn), + Holder::Named(_) => (), + } + match self.holders.entry(holder) { + hash_map::Entry::Occupied(occupied) => { + // check lsn advancement rules + match occupied.key() { + Holder::Unnamed(lsn) => { + assert_eq!(lsn, occupied.get().lsn, "data structure invariant"); + // assertion because `Holder` is only created by `begin_upsert_unnamed` + // where we are under control of this condition + assert_eq!(lsn, lease.lsn, "unnamed holder cannot change its LSN"); + } + Holder::Named(holder) => { + // named leases can advance their LSN + return Err(BeginUpsertError::ExistingLeaseWithHigherLsn { + holder, + existing: occupied.get().clone(), + update: lease, + }); + } + } + + // check expiration time advancement rules + if valid_until > occupied.get().valid_until { + // ok to extend lease + } else { + return Err(BeginUpsertError::ExistingLeaseWithLaterExpirationTime { + existing: occupied.get().clone(), + update: lease, + }); + } + + Pending(PendingInner::Update { + holder: occupied, + lsns: &mut self.lsns, + update: lease, + }) + } + hash_map::Entry::Vacant(mut vacant) => Pending(PendingInner::Insert { + holder: vacant, + lsns: &mut self.lsns, + update: lease, + }), + } + } +} + +impl<'a> Pending<'a> { + pub fn is_update(&self) -> bool { + match &self.0 { + PendingInner::Update { .. } => true, + PendingInner::Insert { .. } => false, + } + } + pub fn lsn(&self) -> Lsn { + match &self.0 { + PendingInner::Update { + holder, + lsns, + update, + } => update.lsn, + PendingInner::Insert { + holder, + lsns, + update, + } => update.lsn, + } + } + pub fn commit(self) { + let update_refcount; + match self.0 { + PendingInner::Update { + holder, + lsns, + update, + } => { + update_refcount = (Some(holder.get().lsn), update.lsn); + *holder.get_mut() = update; + } + PendingInner::Insert { + holder, + lsns, + update, + } => { + update_refcount = (None, update.lsn); + holder.insert_entry(update) + } + }; + let (old, new) = update_refcount; + if let Some(old) = old { + match lsns.entry(old) { + btree_map::Entry::Vacant(vacant_entry) => unreachable!("data structure invariant"), + btree_map::Entry::Occupied(lsn_refcount) => { + *lsn_refcount.get_mut() = + lsn_refcount.get().checked_sub(1).expect("refcount wrong"); + if lsn_refcount.get() == 0 { + lsn_refcount.remove(); + } + } + } + } + + let lsn_refcount = lsns.entry(new).or_default(); + *lsn_refcount = lsn_refcount.checked_add(1).unwrap(); + } +} + +impl Leases { + pub fn cull(&mut self, now: SystemTime) -> LeasesInfo { + let before = self.leases.len(); + let mut expired = 0; + self.leases.retain(|_, lease| { + if lease.is_expired(&now) { + expired += 1; + false + } else { + true + } + }); + let after = self.leases.len(); + assert!(after <= before, "{after} {before}"); + assert_eq!(after + expired, before, "{after} {expired} {before}"); + LeasesInfo { + num_unique_lsns: after, + } + } + pub fn iter_leased_lsns(&self) -> impl Iterator { + self.leases.keys().cloned() + } + pub fn max_lsn(&self) -> Option { + self.lsns.last_key_value().map(|(lsn, _)| *lsn) + } + pub fn has_lease_at_exactly_this_lsn(&self, lsn: Lsn) -> bool { + self.leases.contains_key(&lsn) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_iter_leased_lsns() { + let mut leases = Leases::default(); + assert_eq!(vec![], leases.iter_leased_lsns().collect()); + } +}