From 8a9f1dd5e7176ea556fda86712aba3e383d5a258 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 5 Aug 2025 22:15:46 +0200 Subject: [PATCH] use tokio::time::Instant internally, chrono::DateTime externally; commuicate expiration through rfc3339 format; chrono::DateTime has good Debug fmt so this also serves observability; finish implementing release valve mechanism --- Cargo.lock | 1 + compute_tools/src/ro_replica.rs | 42 +++++++-------- pageserver/page_api/Cargo.toml | 1 + pageserver/page_api/src/model.rs | 30 ++++++----- pageserver/src/page_service.rs | 34 ++++++------ pageserver/src/tenant.rs | 53 +++++++++++++++++-- pageserver/src/tenant/timeline.rs | 18 +++---- .../src/tenant/timeline/standby_horizon.rs | 48 +++++++++++++---- 8 files changed, 152 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 40f8b04463..55ea89149e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4538,6 +4538,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "chrono", "futures", "pageserver_api", "postgres_ffi_types", diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs index 2cc463d321..113a16984c 100644 --- a/compute_tools/src/ro_replica.rs +++ b/compute_tools/src/ro_replica.rs @@ -1,10 +1,7 @@ -use std::{ - pin::Pin, - str::FromStr, - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::{pin::Pin, str::FromStr, sync::Arc, time::Duration}; +use anyhow::Context; +use chrono::Utc; use compute_api::spec::PageserverProtocol; use futures::{StreamExt, stream::FuturesUnordered}; use postgres::SimpleQueryMessage; @@ -90,12 +87,15 @@ async fn bg_task(compute: Arc) { let mut obtained = ObtainedLease { lsn: Lsn(0), - nearest_expiration: SystemTime::UNIX_EPOCH, + nearest_expiration: Utc::now(), }; loop { - let valid_duration = obtained + let valid_duration: Duration = obtained .nearest_expiration - .duration_since(SystemTime::now()) + .signed_duration_since(Utc::now()) + .to_std() + // to_std() errors if the duration is less than zero, i.e,. if the lease already expired; + // try to renew anyway in that case; .unwrap_or_default(); // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration. let sleep_duration = valid_duration @@ -139,7 +139,7 @@ async fn bg_task(compute: Arc) { struct ObtainedLease { lsn: Lsn, - nearest_expiration: SystemTime, + nearest_expiration: chrono::DateTime, } async fn attempt(lease_id: String, compute: &Arc) -> anyhow::Result { @@ -164,7 +164,7 @@ async fn attempt(lease_id: String, compute: &Arc) -> anyhow::Result timeline_id=%timeline_id, ); let logging_wrapper = - |fut: Pin>>>>| { + |fut: Pin>>>>>| { async move { // TODO: timeout? match fut.await { @@ -233,7 +233,7 @@ async fn attempt_one_libpq( timeline_id: TimelineId, lease_id: String, lsn: Lsn, -) -> anyhow::Result> { +) -> anyhow::Result>> { let ConnectInfo { tenant_shard_id, connstring, @@ -256,14 +256,12 @@ async fn attempt_one_libpq( _ => anyhow::bail!("expected row message type"), }; - // Note: this will be None if a lease is explicitly not granted. - let Some(expiration) = row.get("expiration") else { - return Ok(None); - }; - - let expiration = - SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?)); - Ok(expiration) + // Note: this will be NULL (=> None) if a lease is explicitly not granted. + row.get("expiration") + .map(|s| { + chrono::DateTime::::from_str(s).with_context(|| format!("parse expiration: {s:?}")) + }) + .transpose() } async fn attempt_one_grpc( @@ -271,7 +269,7 @@ async fn attempt_one_grpc( timeline_id: TimelineId, lease_id: String, lsn: Lsn, -) -> anyhow::Result> { +) -> anyhow::Result>> { let ConnectInfo { tenant_shard_id, connstring, @@ -289,7 +287,7 @@ async fn attempt_one_grpc( let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn }; match client.lease_standby_horizon(req).await { - Ok(expires) => Ok(Some(expires)), + Ok(pageserver_page_api::LeaseStandbyHorizonResponse { expiration }) => Ok(Some(expiration)), // Lease couldn't be acquired Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None), Err(err) => Err(err.into()), diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index fbad8cf9d0..5c819211ea 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +chrono.workspace = true bytes.workspace = true futures.workspace = true pageserver_api.workspace = true diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 477003af16..898a1cb884 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -15,10 +15,10 @@ //! receivers should expect all sorts of junk from senders. This also allows the sender to use e.g. //! stream combinators without dealing with errors, and avoids validating the same message twice. -use std::fmt::Display; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{fmt::Display, time::{Duration, SystemTime, UNIX_EPOCH}}; use bytes::Bytes; +use chrono::Utc; use postgres_ffi_types::Oid; // TODO: split out Lsn, RelTag, SlruKind and other basic types to a separate crate, to avoid // pulling in all of their other crate dependencies when building the client. @@ -789,29 +789,35 @@ impl From for proto::LeaseStandbyHorizonRequest { /// Lease expiration time. If the lease could not be granted because the LSN has already been /// garbage collected, a FailedPrecondition status will be returned instead. -pub type LeaseStandbyHorizonResponse = SystemTime; +pub struct LeaseStandbyHorizonResponse { + pub expiration: chrono::DateTime, +} impl TryFrom for LeaseStandbyHorizonResponse { type Error = ProtocolError; fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result { let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?; - UNIX_EPOCH - .checked_add(Duration::new( - expiration.seconds as u64, - expiration.nanos as u32, - )) - .ok_or_else(|| ProtocolError::invalid("expiration", expiration)) + Ok(Self { + // TODO: upgrade prost-type to 0.14.1 and use `chrono` feature? + expiration: chrono::DateTime::::from_timestamp( + expiration.seconds, + expiration + .nanos + .try_into() + .map_err(|_| ProtocolError::invalid("expiration.nanos", expiration.nanos))?, + ) + .ok_or_else(|| ProtocolError::invalid("expiration", expiration))?, + }) } } impl From for proto::LeaseStandbyHorizonResponse { fn from(response: LeaseStandbyHorizonResponse) -> Self { - let expiration = response.duration_since(UNIX_EPOCH).unwrap_or_default(); Self { expiration: Some(prost_types::Timestamp { - seconds: expiration.as_secs() as i64, - nanos: expiration.subsec_nanos() as i32, + seconds: response.expiration.timestamp() as i64, + nanos: response.expiration.timestamp_subsec_nanos() as i32, }), } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 993f4401e3..87b155a143 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -84,7 +84,7 @@ use crate::tenant::mgr::{ }; use crate::tenant::storage_layer::IoConcurrency; use crate::tenant::timeline::handle::{Handle, HandleUpgradeError, WeakHandle}; -use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter}; +use crate::tenant::timeline::{self, WaitLsnError, WaitLsnTimeout, WaitLsnWaiter, standby_horizon}; use crate::tenant::{GetTimelineError, PageReconstructError, Timeline}; use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation}; @@ -2236,7 +2236,7 @@ impl PageServerHandler { Ok(()) } - #[instrument(skip_all, fields(shard_id, %lsn), ret)] + #[instrument(skip_all, fields(shard_id, %lsn))] async fn handle_lease_standby_horizon( &mut self, pgb: &mut PostgresBackend, @@ -2263,26 +2263,17 @@ impl PageServerHandler { .await?; set_tracing_field_shard_id(&timeline); - let result: Option = timeline - .lease_standby_horizon(lease_id, lsn, ctx) // logs errors internally + let result = timeline + // logs both Ok() and Err() internally, no need to do it here + .lease_standby_horizon(lease_id, lsn, ctx) .ok(); - debug!( - result = result.map(|x| chrono::DateTime::::from(x).to_rfc3339()), - "result" - ); // XXX better observability isn't great - // Encode result as Option - let bytes = result.map(|t| { - t.duration_since(SystemTime::UNIX_EPOCH) - .expect("we wouldn't allow a lease at epoch, system time would be horribly off") - .as_millis() - .to_string() - .into_bytes() - }); pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( b"expiration", )]))? - .write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?; + .write_message_noflush(&BeMessage::DataRow(&[result + .map(|standby_horizon::LeaseInfo { valid_until }| valid_until.to_rfc3339().into_bytes()) + .as_deref()]))?; Ok(()) } @@ -3952,12 +3943,17 @@ impl proto::PageService for GrpcPageServiceHandler { span_record!(lease_id=%lease_id, lsn=%lsn); // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted. - let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) { + let standby_horizon::LeaseInfo { + valid_until: expiration, + } = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) { Ok(expiration) => expiration, + // Use Display so the error towards the client is crisp; the function already logged the error with backtrace. Err(err) => return Err(tonic::Status::failed_precondition(format!("{err}"))), }; - Ok(tonic::Response::new(expiration.into())) + Ok(tonic::Response::new( + page_api::LeaseStandbyHorizonResponse { expiration }.into(), + )) } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 806e60e9ba..78f878d144 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4880,7 +4880,9 @@ impl TenantShard { // Cull any expired leases let now = SystemTime::now(); target.lsn_leases.retain(|_, lease| !lease.is_expired(&now)); - timeline.standby_horizons.cull_leases(now); + timeline + .standby_horizons + .cull_leases(tokio::time::Instant::now()); timeline .metrics @@ -6218,6 +6220,7 @@ mod tests { use crate::keyspace::KeySpaceAccum; use crate::tenant::harness::*; use crate::tenant::timeline::CompactFlags; + use crate::tenant::timeline::standby_horizon::LeaseInfo; static TEST_KEY: Lazy = Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001"))); @@ -9609,11 +9612,13 @@ mod tests { // NB: leases are tracked by SystemTime, which is not monotonic, but we want to assert monotonicity of the lease below. // Sleep a second to make flakiness less likely. tokio::time::sleep(Duration::from_secs(1)).await; - let renewed_lease = timeline + let LeaseInfo { + valid_until: renewed_lease, + } = timeline .lease_standby_horizon("renewed_lease_0".to_string(), Lsn(initial_leases[0]), &ctx) .expect("standby horizon lease renewal should succeed"); assert!( - renewed_lease >= expirations[0], + renewed_lease >= expirations[0].valid_until, "New lease should have expiration time at least as good as original" ); @@ -9696,7 +9701,7 @@ mod tests { // timeline .standby_horizons - .cull_leases(SystemTime::now() + lease_length + Duration::from_secs(1)); + .cull_leases(tokio::time::Instant::now() + lease_length + Duration::from_secs(1)); assert_eq!(timeline.standby_horizons.get_leases().len(), 0); @@ -9846,6 +9851,46 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_standby_horizon_emergency_forget_all_leases() -> anyhow::Result<()> { + let (tenant, ctx) = + TenantHarness::create("test_standby_horizon_emergency_forget_all_leases") + .await + .unwrap() + .load() + .await; + + let timeline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) + .await?; + + // + // Empty call works + // + timeline.standby_horizons.validate_invariants(&timeline); + timeline + .standby_horizons + .emergency_forget_all_leases_immediately(); + timeline.standby_horizons.validate_invariants(&timeline); + + // + // Leases are cleared but legacy remains + // + let legacy_lsn = Lsn(0x10); + timeline.standby_horizons.register_legacy_update(legacy_lsn); + timeline.lease_standby_horizon("mylease".to_string(), Lsn(0x20), &ctx)?; + timeline.lease_standby_horizon("otherlease".to_string(), Lsn(0x30), &ctx)?; + timeline.standby_horizons.validate_invariants(&timeline); + timeline + .standby_horizons + .emergency_forget_all_leases_immediately(); + timeline.standby_horizons.validate_invariants(&timeline); + assert_eq!(timeline.standby_horizons.get_leases().len(), 0); + assert_eq!(timeline.standby_horizons.legacy(), Some(legacy_lsn)); + + Ok(()) + } + #[tokio::test] async fn test_failed_flush_should_not_update_disk_consistent_lsn() -> anyhow::Result<()> { // diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 3334791c43..4cb5519c9c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1868,22 +1868,24 @@ impl Timeline { } /// Logs return value at info level and errors at warn level in Debug::fmt. - #[instrument(skip(_ctx), ret(level = tracing::Level::INFO, Debug), err(level = tracing::Level::WARN, Debug))] + #[instrument(skip_all, fields(?lease_id, %lsn), ret(level = tracing::Level::INFO, Debug), err(level = tracing::Level::WARN, Debug))] pub(crate) fn lease_standby_horizon( &self, lease_id: String, lsn: Lsn, _ctx: &RequestContext, - ) -> anyhow::Result { + ) -> anyhow::Result { if self .feature_resolver .evaluate_boolean("standby-horizon-leases-track-disable") .is_ok() { - // Use feature-flagging as a fail-safe in case something is wrong with the data structure (memory consumption, etc.) - return Ok(SystemTime::now() - .checked_add(Duration::from_secs(5 * 60)) - .unwrap()); + // Use feature-flagging as an emergency switch in case something is wrong with the data structure (memory consumption, etc.) + self.standby_horizons + .emergency_forget_all_leases_immediately(); + return Ok(standby_horizon::LeaseInfo { + valid_until: Utc::now() + self.get_standby_horizon_lease_length(), + }); } let applied_gc_cutoff_lsn_guard = self.get_applied_gc_cutoff_lsn(); if lsn < *applied_gc_cutoff_lsn_guard { @@ -1894,9 +1896,7 @@ impl Timeline { ); } let length = self.get_standby_horizon_lease_length(); - self.standby_horizons - .upsert_lease(lease_id, lsn, length) - .map(|lease| lease.valid_until) + self.standby_horizons.upsert_lease(lease_id, lsn, length) } /// Freeze the current open in-memory layer. It will be written to disk on next iteration. diff --git a/pageserver/src/tenant/timeline/standby_horizon.rs b/pageserver/src/tenant/timeline/standby_horizon.rs index e2b249409e..0865b13101 100644 --- a/pageserver/src/tenant/timeline/standby_horizon.rs +++ b/pageserver/src/tenant/timeline/standby_horizon.rs @@ -12,10 +12,12 @@ use std::{ collections::{HashMap, hash_map}, - time::{Duration, SystemTime}, + time::Duration, }; +use chrono::Utc; use metrics::{IntGauge, UIntGauge}; +use tokio::time::Instant; use tracing::{instrument, warn}; use utils::lsn::Lsn; @@ -43,7 +45,7 @@ pub struct Metrics { #[derive(Debug)] struct Lease { - valid_until: SystemTime, + valid_until: tokio::time::Instant, lsn: Lsn, } @@ -62,7 +64,7 @@ impl Lease { #[derive(Debug)] pub struct LeaseInfo { - pub valid_until: SystemTime, + pub valid_until: chrono::DateTime, } /// Returned by [`Self::min_and_clear_legacy`]. @@ -129,6 +131,8 @@ impl Horizons { } } + /// Renew a lease. Due to their nonpersistent nature we can't tell the difference between + /// a renewal and an initial lease, so, let's call this `upsert` as a neutral term. pub fn upsert_lease( &self, id: String, @@ -136,7 +140,8 @@ impl Horizons { length: Duration, ) -> anyhow::Result { let mut inner = self.inner.lock().unwrap(); - let valid_until = SystemTime::now() + length; + let now = Instant::now(); + let valid_until = now + length; let update = Lease { valid_until, lsn }; let updated = match inner.leases_by_id.entry(id) { hash_map::Entry::Occupied(mut entry) => { @@ -145,18 +150,26 @@ impl Horizons { } hash_map::Entry::Vacant(entry) => entry.insert(update), }; - let res = LeaseInfo { - valid_until: updated.valid_until, + // Convert the internal expiration time to a wallclock time to return it to the caller. + let lease_info = LeaseInfo { + valid_until: Utc::now() + + updated + .valid_until + // reuse now as the upsert is assumed to be fast + .checked_duration_since(now) + .expect("reuse of now() + guarantee of monotonicity in try_update"), }; + let new_count = inner.leases_by_id.len().into_u64(); inner.metrics.leases_count.set(new_count); let leases_min = inner.leases_by_id.values().map(|v| v.lsn).min(); inner.leases_min = leases_min; inner.metrics.leases_min.set(leases_min.unwrap_or(Lsn(0)).0); - Ok(res) + + Ok(lease_info) } - pub fn cull_leases(&self, now: SystemTime) { + pub fn cull_leases(&self, now: Instant) { let mut inner = self.inner.lock().unwrap(); let mut min = None; inner.leases_by_id.retain(|_, l| { @@ -176,6 +189,23 @@ impl Horizons { inner.metrics.leases_min.set(min.unwrap_or(Lsn(0)).0); } + #[instrument(skip_all)] + pub fn emergency_forget_all_leases_immediately(&self) { + let mut inner = self.inner.lock().unwrap(); + for (lease_id, _) in inner.leases_by_id.drain() { + warn!("dropping lease early {}", lease_id); + } + inner + .metrics + .leases_count + .set(inner.leases_by_id.len().into_u64()); + inner.leases_min = None; + inner + .metrics + .leases_min + .set(inner.leases_min.unwrap_or(Lsn(0)).0); + } + pub fn dump(&self) -> serde_json::Value { let inner = self.inner.lock().unwrap(); let Inner { @@ -252,7 +282,7 @@ impl Horizons { } #[cfg(test)] - pub fn get_leases(&self) -> Vec<(Lsn, SystemTime)> { + pub fn get_leases(&self) -> Vec<(Lsn, tokio::time::Instant)> { let inner = self.inner.lock().unwrap(); inner .leases_by_id