Revert "WIP Leases struct + refactor"

This reverts commit 651a321886.
This commit is contained in:
Christian Schwarz
2025-07-17 14:52:37 +00:00
parent b2ba489cc6
commit ecd189b5b7
7 changed files with 84 additions and 348 deletions

View File

@@ -1050,7 +1050,7 @@ async fn get_lsn_by_timestamp_handler(
let lease = if with_lease {
timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), true, &ctx)
.init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
.inspect_err(|_| {
warn!("fail to grant a lease to {}", lsn);
})
@@ -2222,7 +2222,7 @@ async fn post_lsn_lease_handler(
let result = async {
timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), true, &ctx)
.init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
.map_err(|e| {
ApiError::InternalServerError(
e.context(format!("invalid lsn lease request at {lsn}")),

View File

@@ -2199,7 +2199,7 @@ impl PageServerHandler {
set_tracing_field_shard_id(&timeline);
let lease = timeline
.make_lsn_lease(lsn, timeline.get_lsn_lease_length(), false, ctx)
.renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
.inspect_err(|e| {
warn!("{e}");
})
@@ -3787,7 +3787,7 @@ impl proto::PageService for GrpcPageServiceHandler {
// Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
let lease_length = timeline.get_lsn_lease_length();
let expires = match timeline.make_lsn_lease(req.lsn, lease_length, false, &ctx) {
let expires = match timeline.renew_lsn_lease(req.lsn, lease_length, &ctx) {
Ok(lease) => lease.valid_until,
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
};

View File

@@ -4829,12 +4829,12 @@ impl TenantShard {
// Cull any expired leases
let now = SystemTime::now();
let leases_info = target.leases.cull(now);
target.leases.retain(|_, lease| !lease.is_expired(&now));
timeline
.metrics
.valid_lsn_lease_count_gauge
.set(leases_info.num_unique_lsns as u64);
.set(target.leases.len() as u64);
// Look up parent's PITR cutoff to update the child's knowledge of whether it is within parent's PITR
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
@@ -9439,7 +9439,7 @@ mod tests {
});
let updated_lease_0 = timeline
.lease_lsn2(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)
.renew_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)
.expect("lease renewal should succeed");
assert_eq!(
updated_lease_0.valid_until, leases[0].valid_until,
@@ -9447,7 +9447,7 @@ mod tests {
);
let updated_lease_1 = timeline
.lease_lsn2(
.renew_lsn_lease(
Lsn(leased_lsns[1]),
timeline.get_lsn_lease_length() * 2,
&ctx,

View File

@@ -252,8 +252,9 @@ pub(super) async fn gather_inputs(
let lease_points = gc_info
.leases
.iter_leased_lsns()
.filter(|lsn| *lsn > ancestor_lsn)
.keys()
.filter(|&&lsn| lsn > ancestor_lsn)
.copied()
.collect::<Vec<_>>();
// next_pitr_cutoff in parent branch are not of interest (right now at least), nor do we

View File

@@ -8,7 +8,6 @@ mod heatmap_layers_downloader;
pub(crate) mod import_pgdata;
mod init;
pub mod layer_manager;
mod leases;
pub(crate) mod logical_size;
pub mod offload;
pub mod span;
@@ -482,7 +481,8 @@ pub(crate) struct GcInfo {
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
pub(crate) leases: leases::Leases,
/// Leases granted to particular LSNs.
pub(crate) leases: BTreeMap<Lsn, LsnLease>,
/// Whether our branch point is within our ancestor's PITR interval (for cost estimation)
pub(crate) within_ancestor_pitr: bool,
@@ -530,7 +530,7 @@ impl GcInfo {
self.remove_child_maybe_offloaded(child_id, MaybeOffloaded::Yes)
}
pub(crate) fn lsn_covered_by_lease(&self, lsn: Lsn) -> bool {
self.leases.has_lease_at_exactly_this_lsn(lsn)
self.leases.contains_key(&lsn)
}
}
@@ -1760,63 +1760,76 @@ impl Timeline {
Ok(())
}
/// Upsert a temporary lease to inhibit garbage collection for the given LSN.
///
/// renewal of an existing lease always works
/// page_service/compute can create new leases above applied cutoff
/// management API/cplane can create new leases above planned cutoff
///
pub fn make_lsn_lease(
/// Initializes an LSN lease. The function will return an error if the requested LSN is less than the `latest_gc_cutoff_lsn`.
pub(crate) fn init_lsn_lease(
&self,
lsn: Lsn,
length: Duration,
is_management_api: bool,
ctx: &RequestContext,
) -> anyhow::Result<LsnLease> {
self.make_lsn_lease(lsn, length, true, ctx)
}
/// Renews a lease at a particular LSN. The requested LSN is not validated against the `latest_gc_cutoff_lsn` when we are in the grace period.
pub(crate) fn renew_lsn_lease(
&self,
lsn: Lsn,
length: Duration,
ctx: &RequestContext,
) -> anyhow::Result<LsnLease> {
self.make_lsn_lease(lsn, length, false, ctx)
}
/// Obtains a temporary lease blocking garbage collection for the given LSN.
///
/// If we are in `AttachedSingle` mode and is not blocked by the lsn lease deadline, this function will error
/// if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is no existing request present.
///
/// If there is an existing lease in the map, the lease will be renewed only if the request extends the lease.
/// The returned lease is therefore the maximum between the existing lease and the requesting lease.
fn make_lsn_lease(
&self,
lsn: Lsn,
length: Duration,
init: bool,
_ctx: &RequestContext,
) -> anyhow::Result<LsnLease> {
// Normalize the requested LSN to be aligned, and move to the first record
// if it points to the beginning of the page (header).
let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE);
let lease = {
// Normalize the requested LSN to be aligned, and move to the first record
// if it points to the beginning of the page (header).
let lsn = xlog_utils::normalize_lsn(lsn, WAL_SEGMENT_SIZE);
let mut gc_info = self.gc_info.write().unwrap();
let planned_cutoff = gc_info.min_cutoff();
let mut gc_info = self.gc_info.write().unwrap();
let planned_cutoff = gc_info.min_cutoff();
let valid_until = SystemTime::now() + length;
let valid_until = SystemTime::now() + length;
let pending = match gc_info.leases.begin_upsert_unnamed(lsn) {
Ok(pending) => pending,
Err(leases::BeginUpsertError::ExistingLeaseWithHigherLsn {
holder,
existing,
update,
}) => {
return Err(anyhow::anyhow!(
"tried to move named lsn lease to an older LSN: existing={existing:?} update={update:?} holder={holder}"
));
}
Err(leases::BeginUpsertError::ExistingLeaseWithLaterExpirationTime {
existing,
update,
}) => {
info!("existing lease covers greater length, valid until {}", dt);
}
};
let entry = gc_info.leases.entry(lsn);
// Policy checks for lease renewal.
//
// Note that we avoid persisting leases and instead inhibit GC for the max lease duration after tenant restart/migration.
// This period is called `lsn_lease_deadline`.
// So, just an insert into runtime data structure can in fact be a renewal from the caller's point of view.
match entry {
Entry::Occupied(mut occupied) => {
let existing_lease = occupied.get_mut();
if valid_until > existing_lease.valid_until {
existing_lease.valid_until = valid_until;
let dt: DateTime<Utc> = valid_until.into();
info!("lease extended to {}", dt);
} else {
let dt: DateTime<Utc> = existing_lease.valid_until.into();
info!("existing lease covers greater length, valid until {}", dt);
}
// Rule #1: never allow a lease to exist below applied GC cutoff.
//
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
existing_lease.clone()
}
Entry::Vacant(vacant) => {
// Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted.
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
// We allow create lease for those below the planned gc cutoff if we are still within the grace period
// of GC blocking.
@@ -1838,56 +1851,10 @@ impl Timeline {
let dt: DateTime<Utc> = valid_until.into();
info!("lease created, valid until {}", dt);
vacant.insert(LsnLease { valid_until }).clone()
if gc_info.leases.has_lease_at_exactly_this_lsn(lsn) {
// It's a renewal.
} else {
self.feature_resolver
.evaluate_boolean("lease-renewal-allow-");
// Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted.
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
}
};
// We avoid persisting leases and instead inhibit GC for the max lease duration after tenant restart.
//
let validate = {
let conf = self.tenant_conf.load();
!conf.is_gc_blocked_by_lsn_lease_deadline()
};
if !(is_compute_request && conf.is_gc_blocked_by_lsn_lease_deadline())
&& lsn < planned_cutoff
{
bail!(
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
lsn,
planned_cutoff
);
}
}
let (lease, result) = gc_info
.leases
.begin_upsert_unnamed(leases::UpsertUnnamed { lsn, valid_until });
let dt: DateTime<Utc> = lease.valid_until.into();
match result {
leases::UpsertUnnamedResult::Created => {
info!("lease created, valid until {}", dt);
}
leases::UpsertUnnamedResult::ExtendedExisting => {
info!("lease extended to {}", dt);
}
leases::UpsertUnnamedResult::ExistingIsLonger { valid_until } => {
info!("existing lease covers greater length, valid until {}", dt);
}
}
Ok(lease)
}
@@ -6536,8 +6503,11 @@ impl Timeline {
.map(|(lsn, _child_id, _is_offloaded)| *lsn)
.collect();
// NB: We already culled stale leases in `refresh_gc_info`.
let max_lsn_with_valid_lease = gc_info.leases.max_lsn();
// Gets the maximum LSN that holds the valid lease.
//
// Caveat: `refresh_gc_info` is in charged of updating the lease map.
// Here, we do not check for stale leases again.
let max_lsn_with_valid_lease = gc_info.leases.last_key_value().map(|(lsn, _)| *lsn);
(
space_cutoff,

View File

@@ -3267,9 +3267,9 @@ impl Timeline {
retain_lsns_below_horizon.push(*lsn);
}
}
for lsn in gc_info.leases.iter_leased_lsns() {
if lsn < gc_cutoff {
retain_lsns_below_horizon.push(lsn);
for lsn in gc_info.leases.keys() {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
}
}
let mut selected_layers: Vec<Layer> = Vec::new();

View File

@@ -1,235 +0,0 @@
//! Leases ensure readability at the leased or later LSN by inhibiting GC.
//!
//! Users of leases:
//!
//! - cplane to avoid time-of-check time-of-use issues when it retrieves an
//! LSN from some Pageserver APIs and later wants to use that LSN in an API
//! that requires the passed LSN to be readable. Example: branch creation.
//! - static-lsn computes aka ephemeral endpoints aka static computes that
//! issue getpage requests at a fixed LSN
//! - soon: RO replicas instead of the current `standby_horizon` mechanism.
//!
//! The first two identify their lease by LSN.
//! If there are multiple such leasers for the same LSN, they share an underlying
//! expiration deadline.
//!
//! RO replicas do not have a fixed lease LSN but move it forward on each renewal.
//! They refer to their lease through a replica-supplied ID.
//!
//! Limits
//!
//! There are currently no limits imposed on the number of leases per timeline.
//! This should be fixed because it poses a DoS risk.
use std::{
collections::{BTreeMap, HashMap, btree_map, hash_map},
time::SystemTime,
};
use itertools::Itertools;
use utils::lsn::Lsn;
#[derive(Default, Debug, Clone, serde::Serialize)]
pub struct Leases {
holders: HashMap<Holder, Lease>,
lsns: BTreeMap<Lsn, usize>,
}
enum Holder {
Unnamed(Lsn),
Named(String),
}
struct Lease {
lsn: Lsn,
valid_until: SystemTime,
}
pub struct LeasesInfo {
pub num_unique_lsns: usize,
}
pub struct Pending<'a>(PendingInner<'a>);
enum PendingInner<'a> {
Update {
holder: &'a mut hash_map::OccupiedEntry<'a, &'a Holder, &'a Lease>,
lsns: &'a mut BTreeMap<Lsn, usize>,
update: Lease,
},
Insert {
holder: &'a mut hash_map::VacantEntry<'a, &'a Holder, &'a Lease>,
lsns: &'a mut BTreeMap<Lsn, usize>,
update: Lease,
},
}
pub enum BeginUpsertError {
ExistingLeaseWithLaterExpirationTime {
existing: Lease,
update: Lease,
},
ExistingLeaseWithHigherLsn {
holder: String,
existing: Lease,
update: Lease,
},
}
impl Leases {
pub fn begin_upsert_unnamed(
&mut self,
req: UpsertUnnamed,
) -> Result<PendingInner, BeginUpsertError> {
let UpsertUnnamed { lsn, valid_until } = req;
self.begin_upsert(Holder::Unnamed(lsn), Lease { lsn, valid_until })
}
fn begin_upsert(&mut self, holder: Holder, lease: Lease) -> Result<Pending, BeginUpsertError> {
match &holder {
Holder::Unnamed(lsn) => assert_eq!(lsn, &lease.lsn),
Holder::Named(_) => (),
}
match self.holders.entry(holder) {
hash_map::Entry::Occupied(occupied) => {
// check lsn advancement rules
match occupied.key() {
Holder::Unnamed(lsn) => {
assert_eq!(lsn, occupied.get().lsn, "data structure invariant");
// assertion because `Holder` is only created by `begin_upsert_unnamed`
// where we are under control of this condition
assert_eq!(lsn, lease.lsn, "unnamed holder cannot change its LSN");
}
Holder::Named(holder) => {
// named leases can advance their LSN
return Err(BeginUpsertError::ExistingLeaseWithHigherLsn {
holder,
existing: occupied.get().clone(),
update: lease,
});
}
}
// check expiration time advancement rules
if valid_until > occupied.get().valid_until {
// ok to extend lease
} else {
return Err(BeginUpsertError::ExistingLeaseWithLaterExpirationTime {
existing: occupied.get().clone(),
update: lease,
});
}
Pending(PendingInner::Update {
holder: occupied,
lsns: &mut self.lsns,
update: lease,
})
}
hash_map::Entry::Vacant(mut vacant) => Pending(PendingInner::Insert {
holder: vacant,
lsns: &mut self.lsns,
update: lease,
}),
}
}
}
impl<'a> Pending<'a> {
pub fn is_update(&self) -> bool {
match &self.0 {
PendingInner::Update { .. } => true,
PendingInner::Insert { .. } => false,
}
}
pub fn lsn(&self) -> Lsn {
match &self.0 {
PendingInner::Update {
holder,
lsns,
update,
} => update.lsn,
PendingInner::Insert {
holder,
lsns,
update,
} => update.lsn,
}
}
pub fn commit(self) {
let update_refcount;
match self.0 {
PendingInner::Update {
holder,
lsns,
update,
} => {
update_refcount = (Some(holder.get().lsn), update.lsn);
*holder.get_mut() = update;
}
PendingInner::Insert {
holder,
lsns,
update,
} => {
update_refcount = (None, update.lsn);
holder.insert_entry(update)
}
};
let (old, new) = update_refcount;
if let Some(old) = old {
match lsns.entry(old) {
btree_map::Entry::Vacant(vacant_entry) => unreachable!("data structure invariant"),
btree_map::Entry::Occupied(lsn_refcount) => {
*lsn_refcount.get_mut() =
lsn_refcount.get().checked_sub(1).expect("refcount wrong");
if lsn_refcount.get() == 0 {
lsn_refcount.remove();
}
}
}
}
let lsn_refcount = lsns.entry(new).or_default();
*lsn_refcount = lsn_refcount.checked_add(1).unwrap();
}
}
impl Leases {
pub fn cull(&mut self, now: SystemTime) -> LeasesInfo {
let before = self.leases.len();
let mut expired = 0;
self.leases.retain(|_, lease| {
if lease.is_expired(&now) {
expired += 1;
false
} else {
true
}
});
let after = self.leases.len();
assert!(after <= before, "{after} {before}");
assert_eq!(after + expired, before, "{after} {expired} {before}");
LeasesInfo {
num_unique_lsns: after,
}
}
pub fn iter_leased_lsns(&self) -> impl Iterator<Item = Lsn> {
self.leases.keys().cloned()
}
pub fn max_lsn(&self) -> Option<Lsn> {
self.lsns.last_key_value().map(|(lsn, _)| *lsn)
}
pub fn has_lease_at_exactly_this_lsn(&self, lsn: Lsn) -> bool {
self.leases.contains_key(&lsn)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_iter_leased_lsns() {
let mut leases = Leases::default();
assert_eq!(vec![], leases.iter_leased_lsns().collect());
}
}