mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
rudimentary leases impl, lacks initial lease deadline stuff
This commit is contained in:
@@ -2258,9 +2258,7 @@ impl PageServerHandler {
|
||||
|
||||
let result: Option<SystemTime> = timeline
|
||||
.lease_standby_horizon(lease_id, lsn, ctx)
|
||||
.inspect_err(|e| {
|
||||
warn!("{e}");
|
||||
})
|
||||
// logging happens inside
|
||||
.ok();
|
||||
|
||||
// Encode result as Option<millis since epoch>
|
||||
@@ -3946,7 +3944,7 @@ impl proto::PageService for GrpcPageServiceHandler {
|
||||
// Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
|
||||
let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) {
|
||||
Ok(expiration) => expiration,
|
||||
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err:#}"))),
|
||||
Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))),
|
||||
};
|
||||
|
||||
Ok(tonic::Response::new(expiration.into()))
|
||||
|
||||
@@ -4830,6 +4830,7 @@ impl TenantShard {
|
||||
// Cull any expired leases
|
||||
let now = SystemTime::now();
|
||||
target.leases.retain(|_, lease| !lease.is_expired(&now));
|
||||
timeline.standby_horizons.cull_leases(now);
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
|
||||
@@ -1861,13 +1861,27 @@ impl Timeline {
|
||||
Ok(lease)
|
||||
}
|
||||
|
||||
#[instrument(skip(_ctx), ret(Debug), err(Debug))]
|
||||
pub(crate) fn lease_standby_horizon(
|
||||
&self,
|
||||
lease_id: String,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
) -> anyhow::Result<SystemTime> {
|
||||
todo!()
|
||||
if self
|
||||
.feature_resolver
|
||||
.evaluate_boolean("standby-horizon-leases-track-disable")
|
||||
.is_ok()
|
||||
{
|
||||
// Use feature-flagging as a fail-safe in case something is wrong with the data structure (memory consumption, etc.)
|
||||
return Ok(SystemTime::now()
|
||||
.checked_add(Duration::from_secs(5 * 60))
|
||||
.unwrap());
|
||||
}
|
||||
let length = todo!("duplicate init lease deadline logic?");
|
||||
self.standby_horizons
|
||||
.upsert_lease(lease_id, lsn, length)
|
||||
.map(|lease| lease.valid_until)
|
||||
}
|
||||
|
||||
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
|
||||
|
||||
@@ -10,6 +10,11 @@
|
||||
//! - legacy: as described in RFC36, replica->safekeeper->broker->pageserver
|
||||
//! - leases: TODO
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, hash_map},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use metrics::IntGauge;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -18,7 +23,31 @@ pub struct Horizons {
|
||||
}
|
||||
struct Inner {
|
||||
legacy: Option<Lsn>,
|
||||
pub legacy_metric: IntGauge,
|
||||
legacy_metric: IntGauge,
|
||||
leases_by_id: HashMap<String, Lease>,
|
||||
}
|
||||
|
||||
struct Lease {
|
||||
valid_until: SystemTime,
|
||||
lsn: Lsn,
|
||||
}
|
||||
|
||||
impl Lease {
|
||||
pub fn try_update(&mut self, update: Lease) -> anyhow::Result<()> {
|
||||
let Lease {
|
||||
valid_until: expiration,
|
||||
lsn,
|
||||
} = update;
|
||||
anyhow::ensure!(self.valid_until <= expiration);
|
||||
anyhow::ensure!(self.lsn <= lsn);
|
||||
*self = update;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LeaseInfo {
|
||||
pub valid_until: SystemTime,
|
||||
}
|
||||
|
||||
/// Returned by [`Self::min_and_clear_legacy`].
|
||||
@@ -36,6 +65,7 @@ impl Horizons {
|
||||
inner: std::sync::Mutex::new(Inner {
|
||||
legacy: None,
|
||||
legacy_metric,
|
||||
leases_by_id: Default::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -62,11 +92,37 @@ impl Horizons {
|
||||
inner.legacy.take()
|
||||
};
|
||||
|
||||
// TODO: support leases
|
||||
let leases = [];
|
||||
|
||||
let all = legacy.into_iter().chain(leases.into_iter()).min();
|
||||
let all = legacy
|
||||
.into_iter()
|
||||
.chain(inner.leases_by_id.values().map(|lease| lease.lsn))
|
||||
.min();
|
||||
|
||||
Mins { legacy, all }
|
||||
}
|
||||
|
||||
pub fn upsert_lease(
|
||||
&self,
|
||||
id: String,
|
||||
lsn: Lsn,
|
||||
length: Duration,
|
||||
) -> anyhow::Result<LeaseInfo> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let valid_until = SystemTime::now() + length;
|
||||
let update = Lease { valid_until, lsn };
|
||||
let updated = match inner.leases_by_id.entry(id) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().try_update(update)?;
|
||||
entry.into_mut()
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => entry.insert(update),
|
||||
};
|
||||
Ok(LeaseInfo {
|
||||
valid_until: updated.valid_until,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn cull_leases(&self, now: SystemTime) {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.leases_by_id.retain(|_, l| l.valid_until <= now);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user