From 0d2c1000483688cd7335975ae0a799ed74500a0b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 20 Jul 2025 18:51:36 +0000 Subject: [PATCH] WIP --- compute_tools/src/ro_replica.rs | 205 ++++++++++++++----- pageserver/page_api/proto/page_service.proto | 25 +++ pageserver/page_api/src/client.rs | 6 + pageserver/page_api/src/model.rs | 61 ++++++ pageserver/src/metrics.rs | 1 + pageserver/src/page_service.rs | 130 +++++++++++- 6 files changed, 374 insertions(+), 54 deletions(-) diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs index eb6c5f535a..1a56ff0d92 100644 --- a/compute_tools/src/ro_replica.rs +++ b/compute_tools/src/ro_replica.rs @@ -5,10 +5,13 @@ use std::{ time::{Duration, Instant, SystemTime}, }; +use anyhow::Context; +use chrono::Utc; use compute_api::spec::PageserverProtocol; use futures::{StreamExt, stream::FuturesUnordered}; +use postgres::SimpleQueryMessage; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, error, info, info_span, warn}; +use tracing::{Instrument, error, info, info_span, instrument, warn}; use utils::{backoff::retry, id::TimelineId, lsn::Lsn}; use crate::{ @@ -47,57 +50,87 @@ pub fn spawn_bg_task(compute: Arc) { #[derive(Clone)] struct Reservation { horizon: Lsn, - expiration: Instant, + expiration: SystemTime, } +#[instrument(name = "standby_horizon_lease", skip_all, fields(lease_id))] async fn bg_task(compute: Arc) { + // Use a lease_id that is globally unique to this process to maximize attribution precision & log correlation. + let lease_id = format!("v1-{}-{}", compute.params.compute_id, std::process::id()); + tracing::Span::current().record("lease_id", tracing::field::display(&lease_id)); + // Wait until we have the first value. // Allows us to simply .unwrap() later because it never transitions back to None. + info!("waiting for first lease lsn to be fetched from postgres"); let mut min_inflight_request_lsn_changed = compute.ro_replica.min_inflight_request_lsn.subscribe(); - min_inflight_request_lsn_changed.mark_changed(); + min_inflight_request_lsn_changed.mark_changed(); // it could have been set already min_inflight_request_lsn_changed .wait_for(|value| value.is_some()) .await; - let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(()); + // React to connstring changes. Sadly there is no async API for this yet. + let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(None); std::thread::spawn({ let compute = Arc::clone(&compute); move || { loop { - compute - .wait_timeout_while_pageserver_connstr_unchanged(Duration::from_secs(todo!())); - connstr_watch_tx.send_replace(()); + compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::MAX); + let new = compute + .state + .lock() + .unwrap() + .pspec + .as_ref() + .and_then(|pspec| pspec.spec.pageserver_connstring.clone()); + connstr_watch_tx.send_if_modified(|existing| { + if &new != existing { + *existing = new; + true + } else { + false + } + }); } } }); - let mut reservation = Reservation { - horizon: Lsn(0), - expiration: Instant::now(), + let mut obtained = ObtainedLease { + lsn: Lsn(0), + nearest_expiration: SystemTime::UNIX_EPOCH, }; loop { + let valid_duration = obtained + .nearest_expiration + .duration_since(SystemTime::now()) + .unwrap_or_default(); + // Sleep for 60 seconds less than the valid duration but no more than half of the valid duration. + let sleep_duration = valid_duration + .saturating_sub(Duration::from_secs(60)) + .max(valid_duration / 2); + tokio::select! { - _ = tokio::time::sleep_until(reservation.expiration.into()) => { - info!("updating due to expiration"); + _ = tokio::time::sleep(sleep_duration) => { + info!("updating because lease is going to expire soon"); } _ = connstr_watch_rx.changed() => { info!("updating due to changed pageserver_connstr") } _ = async { - // debounce TODO make this lower in tests + // debounce; TODO make this lower in tests tokio::time::sleep(Duration::from_secs(10)); - // every 10 GiB TODO make this tighter in tests? + // every 10 GiB; TODO make this tighter in tests? let max_horizon_lag = 10 * (1<<30); - min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > reservation.horizon.0 + max_horizon_lag).await + min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > obtained.lsn.0 + max_horizon_lag).await } => { - info!(%reservation.horizon, "updating due to max horizon lag"); + info!(%obtained.lsn, "updating due to max horizon lag"); } } // retry forever let compute = Arc::clone(&compute); - reservation = retry( - || attempt(&compute), + let lease_id = lease_id.clone(); + obtained = retry( + || attempt(lease_id.clone(), &compute), |_| false, 0, u32::MAX, // forever @@ -111,8 +144,12 @@ async fn bg_task(compute: Arc) { } } -// Returns expiration time -async fn attempt(compute: &Arc) -> anyhow::Result { +struct ObtainedLease { + lsn: Lsn, + nearest_expiration: SystemTime, +} + +async fn attempt(lease_id: String, compute: &Arc) -> anyhow::Result { let (shards, timeline_id) = { let state = compute.state.lock().unwrap(); let pspec = state.pspec.as_ref().expect("spec must be set"); @@ -124,6 +161,7 @@ async fn attempt(compute: &Arc) -> anyhow::Result { .min_inflight_request_lsn .borrow() .expect("we only call this function once it has been transitioned to Some"); + let mut futs = FuturesUnordered::new(); for connect_info in shards { let logging_span = info_span!( @@ -132,42 +170,54 @@ async fn attempt(compute: &Arc) -> anyhow::Result { shard_id=%connect_info.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, ); - let logging_wrapper = |fut: Pin>>>| { - async move { - match fut.await { - Ok(v) => Ok(v), - Err(err) => { - error!( - "failed to advance standby_horizon, communicator reads from this shard may star failing: {err:?}" - ); - Err(()) + let logging_wrapper = + |fut: Pin>>>>| { + async move { + // TODO: timeout? + match fut.await { + Ok(Some(v)) => { + info!("lease obtained"); + Ok(Some(v)) + } + Ok(None) => { + error!("pageserver rejected our request"); + Ok(None) + } + Err(err) => { + error!("communication failure: {err:?}"); + Err(()) + } + } } - }}.instrument(logging_span) - }; + .instrument(logging_span) + }; let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? { - PageserverProtocol::Libpq => { - logging_wrapper(Box::pin(attempt_one_libpq(connect_info, timeline_id, lsn))) - } - PageserverProtocol::Grpc => { - logging_wrapper(Box::pin(attempt_one_grpc(connect_info, timeline_id, lsn))) - } + PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq( + connect_info, + timeline_id, + lease_id.clone(), + lsn, + ))), + PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc( + connect_info, + timeline_id, + lease_id.clone(), + lsn, + ))), }; futs.push(fut); } let mut errors = 0; - let mut min = None; + let mut nearest_expiration = None; while let Some(res) = futs.next().await { match res { - Ok(reservation) => { - let Reservation { - horizon, - expiration, - } = min.get_or_insert_with(|| reservation.clone()); - *horizon = std::cmp::min(*horizon, reservation.horizon); - *expiration = std::cmp::min(*expiration, reservation.expiration); + Ok(Some(expiration)) => { + let nearest_expiration = nearest_expiration.get_or_insert(expiration); + *nearest_expiration = std::cmp::min(*nearest_expiration, expiration); } - Err(()) => { + Ok(None) | Err(()) => { // the logging wrapper does the logging + errors += 1; } } } @@ -176,8 +226,11 @@ async fn attempt(compute: &Arc) -> anyhow::Result { "failed to advance standby_horizon for {errors} shards, check logs for details" )); } - match min { - Some(min) => Ok(min), + match nearest_expiration { + Some(v) => Ok(ObtainedLease { + lsn, + nearest_expiration: nearest_expiration.expect("we either errors+=1 or set it"), + }), None => Err(anyhow::anyhow!("pageservers connstrings is empty")), // this probably can't happen } } @@ -185,21 +238,67 @@ async fn attempt(compute: &Arc) -> anyhow::Result { async fn attempt_one_libpq( connect_info: ConnectInfo, timeline_id: TimelineId, + lease_id: String, lsn: Lsn, -) -> anyhow::Result { +) -> anyhow::Result> { let ConnectInfo { tenant_shard_id, connstring, auth, } = connect_info; - tokio_postgres::Config::from_str(&connstring)?; - todo!() + let mut config = tokio_postgres::Config::from_str(&connstring)?; + if let Some(auth) = auth { + config.password(auth); + } + let (mut client, conn) = config.connect(postgres::NoTls).await?; + tokio::spawn(conn); + let cmd = format!("lease standby_horizon {tenant_shard_id} {timeline_id} {lease_id} {lsn} "); + let res = client.simple_query(&cmd).await?; + let msg = match res.first() { + Some(msg) => msg, + None => anyhow::bail!("empty response"), + }; + let row = match msg { + SimpleQueryMessage::Row(row) => row, + _ => anyhow::bail!("expected row message type"), + }; + + // Note: this will be None if a lease is explicitly not granted. + let Some(expiration) = row.get("expiration") else { + return Ok(None); + }; + + let expiration = + SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?)); + Ok(expiration) } async fn attempt_one_grpc( connect_info: ConnectInfo, timeline_id: TimelineId, + lease_id: String, lsn: Lsn, -) -> anyhow::Result { - todo!() +) -> anyhow::Result> { + let ConnectInfo { + tenant_shard_id, + connstring, + auth, + } = connect_info; + let mut client = pageserver_page_api::Client::connect( + connstring.to_string(), + tenant_shard_id.tenant_id, + timeline_id, + tenant_shard_id.to_index(), + auth.map(String::from), + None, + ) + .await?; + + let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn }; + match client.lease_standby_horizon(req).await { + Ok(expires) => Ok(Some(expires)), + // Lease couldn't be acquired + Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None), + Err(err) => Err(err.into()), + } } diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto index 1d6c230916..fc20eef503 100644 --- a/pageserver/page_api/proto/page_service.proto +++ b/pageserver/page_api/proto/page_service.proto @@ -70,6 +70,18 @@ service PageService { // Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage // collect the LSN until the lease expires. Must be acquired on all relevant shards. rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse); + + // Upserts a standby_horizon lease. RO replicas rely on this type of lease. + // In slightly more detail: RO replicas always lag to some degree behind the + // primary, and request pages at their respective apply LSN. The standby horizon mechanism + // ensures that the Pageserver does not garbage-collect old page versions in + // the interval between `min(valid standby horizon leases)` and the most recent page version. + // + // Each RO replica call this method continuously as it applies more WAL. + // It identifies its lease through an opaque "lease_id" across these requests. + // The response contains the lease expiration time. + // Status `FailedPrecondition` is returned if the lease cannot be granted. + rpc LeaseStandbyHorizon(LeaseStandbyHorizonRequest) returns (LeaseStandbyHorizonResponse); } // The LSN a request should read at. @@ -272,3 +284,16 @@ message LeaseLsnResponse { // The lease expiration time. google.protobuf.Timestamp expires = 1; } + +// Request for LeaseStandbyHorizon rpc. +// The lease_id identifies the lease in subsequent requests. +// The lsn must be monotonic; the request will fail if it is not. +message LeaseStandbyHorizonRequest { + string lease_id = 1; + uint64 lsn = 2; +} + +// Response for the success case of LeaseStandbyHorizon rpc. +message LeaseStandbyHorizonResponse { + google.protobuf.Timestamp expiration = 1; +} diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs index 6523d00d3d..757e8b284c 100644 --- a/pageserver/page_api/src/client.rs +++ b/pageserver/page_api/src/client.rs @@ -143,6 +143,12 @@ impl Client { let resp = self.inner.lease_lsn(req).await?.into_inner(); Ok(resp.try_into()?) } + + pub async fn lease_standby_horizon(&mut self, req: LeaseStandbyHorizonRequest) -> tonic::Result { + let req = proto::LeaseStandbyHorizonRequest::from(req); + let resp = self.inner.lease_standby_horizon(req).await?.into_inner(); + Ok(resp.try_into()?) + } } /// Adds authentication metadata to gRPC requests. diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs index 4497fc6fc7..477003af16 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -755,3 +755,64 @@ impl From for proto::LeaseLsnResponse { } } } + +pub struct LeaseStandbyHorizonRequest { + pub lease_id: String, + pub lsn: Lsn, +} + +impl TryFrom for LeaseStandbyHorizonRequest { + type Error = ProtocolError; + + fn try_from(pb: proto::LeaseStandbyHorizonRequest) -> Result { + if pb.lsn == 0 { + return Err(ProtocolError::Missing("lsn")); + } + if pb.lease_id.len() == 0 { + return Err(ProtocolError::Invalid("lease_id", pb.lease_id)); + } + Ok(Self { + lease_id: pb.lease_id, + lsn: Lsn(pb.lsn), + }) + } +} + +impl From for proto::LeaseStandbyHorizonRequest { + fn from(request: LeaseStandbyHorizonRequest) -> Self { + Self { + lease_id: request.lease_id, + lsn: request.lsn.0, + } + } +} + +/// Lease expiration time. If the lease could not be granted because the LSN has already been +/// garbage collected, a FailedPrecondition status will be returned instead. +pub type LeaseStandbyHorizonResponse = SystemTime; + +impl TryFrom for LeaseStandbyHorizonResponse { + type Error = ProtocolError; + + fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result { + let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?; + UNIX_EPOCH + .checked_add(Duration::new( + expiration.seconds as u64, + expiration.nanos as u32, + )) + .ok_or_else(|| ProtocolError::invalid("expiration", expiration)) + } +} + +impl From for proto::LeaseStandbyHorizonResponse { + fn from(response: LeaseStandbyHorizonResponse) -> Self { + let expiration = response.duration_since(UNIX_EPOCH).unwrap_or_default(); + Self { + expiration: Some(prost_types::Timestamp { + seconds: expiration.as_secs() as i64, + nanos: expiration.subsec_nanos() as i32, + }), + } + } +} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 4680d20697..65432f2cd1 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2318,6 +2318,7 @@ pub(crate) enum ComputeCommandKind { Basebackup, Fullbackup, LeaseLsn, + LeaseStandbyHorizon, } pub(crate) struct ComputeCommandCounters { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6b614deac8..087e85738f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -76,6 +76,7 @@ use crate::pgdatadir_mapping::{LsnRange, Version}; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, + debug_assert_current_span_has_tenant_id, }; use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind}; use crate::tenant::mgr::{ @@ -2218,7 +2219,7 @@ impl PageServerHandler { valid_until_str.as_deref().unwrap_or("") ); - let bytes = valid_until_str.as_ref().map(|x| x.as_bytes()); + let bytes: Option<&[u8]> = valid_until_str.as_ref().map(|x| x.as_bytes()); pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( b"valid_until", @@ -2228,6 +2229,51 @@ impl PageServerHandler { Ok(()) } + #[instrument(skip_all, fields(shard_id, %lsn))] + async fn handle_lease_standby_horizon( + &mut self, + pgb: &mut PostgresBackend, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + lease_id: String, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result<(), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { + debug_assert_current_span_has_tenant_id(); + + let timeline = self + .timeline_handles + .as_mut() + .unwrap() + .get( + tenant_shard_id.tenant_id, + timeline_id, + ShardSelector::Known(tenant_shard_id.to_index()), + ) + .await?; + set_tracing_field_shard_id(&timeline); + + let result: Option = todo!(); + + // Encode result as Option + let bytes = result.map(|t| { + t.duration_since(SystemTime::UNIX_EPOCH) + .expect("we wouldn't allow a lease at epoch, system time would be horribly off") + .as_millis() + .to_string() + .into_bytes() + }); + pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( + b"expiration", + )]))? + .write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?; + + Ok(()) + } + #[instrument(skip_all, fields(shard_id))] async fn handle_get_rel_exists_request( timeline: &Timeline, @@ -2718,6 +2764,14 @@ struct LeaseLsnCmd { lsn: Lsn, } +#[derive(Debug, Clone, Eq, PartialEq)] +struct LeaseStandbyHorizonCmd { + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + lease_id: String, + lsn: Lsn, +} + #[derive(Debug, Clone, Eq, PartialEq)] enum PageServiceCmd { Set, @@ -2725,6 +2779,7 @@ enum PageServiceCmd { BaseBackup(BaseBackupCmd), FullBackup(FullBackupCmd), LeaseLsn(LeaseLsnCmd), + LeaseStandbyHorizon(LeaseStandbyHorizonCmd), } impl PageStreamCmd { @@ -2874,6 +2929,31 @@ impl LeaseLsnCmd { } } +impl LeaseStandbyHorizonCmd { + fn parse(query: &str) -> anyhow::Result { + let parameters = query.split_whitespace().collect_vec(); + if parameters.len() != 4 { + bail!( + "invalid number of parameters for lease lsn command: {}", + query + ); + } + let tenant_shard_id = TenantShardId::from_str(parameters[0]) + .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?; + let timeline_id = TimelineId::from_str(parameters[1]) + .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?; + let lease_id = parameters[2].to_string(); + let standby_horizon = Lsn::from_str(parameters[3]) + .with_context(|| format!("Failed to parse lsn from {}", parameters[2]))?; + Ok(Self { + tenant_shard_id, + timeline_id, + lease_id, + lsn: standby_horizon, + }) + } +} + impl PageServiceCmd { fn parse(query: &str) -> anyhow::Result { let query = query.trim(); @@ -2898,6 +2978,10 @@ impl PageServiceCmd { let cmd2 = cmd2.to_ascii_lowercase(); if cmd2 == "lsn" { Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?)) + } else if cmd2 == "standby_horizon" { + Ok(Self::LeaseStandbyHorizon(LeaseStandbyHorizonCmd::parse( + other, + )?)) } else { bail!("invalid lease command: {cmd}"); } @@ -3161,6 +3245,45 @@ where } }; } + PageServiceCmd::LeaseStandbyHorizon(LeaseStandbyHorizonCmd { + tenant_shard_id, + timeline_id, + lease_id, + lsn, + }) => { + tracing::Span::current() + .record("tenant_id", field::display(tenant_shard_id)) + .record("timeline_id", field::display(timeline_id)); + + self.check_permission(Some(tenant_shard_id.tenant_id))?; + + COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::LeaseStandbyHorizon) + .inc(); + + match self + .handle_lease_standby_horizon( + pgb, + tenant_shard_id, + timeline_id, + lease_id, + lsn, + &ctx, + ) + .await + { + Ok(()) => { + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))? + } + Err(e) => { + error!("error obtaining standby_horizon lease for {lsn}: {e:?}"); + pgb.write_message_noflush(&BeMessage::ErrorResponse( + &e.to_string(), + Some(e.pg_error_code()), + ))? + } + }; + } } Ok(()) @@ -3801,6 +3924,11 @@ impl proto::PageService for GrpcPageServiceHandler { Ok(tonic::Response::new(expires.into())) } + + #[instrument(skip_all, fields(lease_id, lsn))] + async fn lease_standby_horizon(&self, req: tonic::Request) -> Result, tonic::Status> { + todo!() + } } /// gRPC middleware layer that handles observability concerns: