diff --git a/Cargo.lock b/Cargo.lock
index 893932fb9d..40f8b04463 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1348,6 +1348,7 @@ dependencies = [
  "p256 0.13.2",
  "pageserver_page_api",
  "postgres",
+ "postgres-types",
  "postgres_initdb",
  "postgres_versioninfo",
  "regex",
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index 1a03022d89..c0e67851a4 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -67,6 +67,7 @@
 uuid.workspace = true
 walkdir.workspace = true
 x509-cert.workspace = true
+postgres-types.workspace = true
 postgres_versioninfo.workspace = true
 postgres_initdb.workspace = true
 compute_api.workspace = true
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index ec6e6c1634..e287800164 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -46,7 +46,6 @@ use crate::logger::startup_context_from_env;
 use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
 use crate::metrics::COMPUTE_CTL_UP;
 use crate::monitor::launch_monitor;
-use crate::pg_helpers::*;
 use crate::pgbouncer::*;
 use crate::rsyslog::{
     PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
@@ -57,6 +56,7 @@ use crate::swap::resize_swap;
 use crate::sync_sk::{check_if_synced, ping_safekeeper};
 use crate::tls::watch_cert_for_changes;
 use crate::{config, extension_server, local_proxy};
+use crate::{pg_helpers::*, ro_replica};

 pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
 pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -131,6 +131,8 @@ pub struct ComputeNode {
     pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
     pub compute_ctl_config: ComputeCtlConfig,

+    pub(crate) ro_replica: Arc<ro_replica::GlobalState>,
+
     /// Handle to the extension stats collection task
     extension_stats_task: TaskHandle,
     lfc_offload_task: TaskHandle,
@@ -438,6 +440,7 @@ impl ComputeNode {
             compute_ctl_config: config.compute_ctl_config,
             extension_stats_task: Mutex::new(None),
             lfc_offload_task: Mutex::new(None),
+            ro_replica: Arc::new(ro_replica::GlobalState::default()),
         })
     }

@@ -490,6 +493,8 @@ impl ComputeNode {

         launch_lsn_lease_bg_task_for_static(&this);

+        ro_replica::spawn_bg_task(Arc::clone(&this));
+
         // We have a spec, start the compute
         let mut delay_exit = false;
         let mut vm_monitor = None;
diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs
index 3899a1ca76..b231f6db8c 100644
--- a/compute_tools/src/lib.rs
+++ b/compute_tools/src/lib.rs
@@ -23,9 +23,11 @@ pub mod monitor;
 pub mod params;
 pub mod pg_helpers;
 pub mod pgbouncer;
+pub(crate) mod ro_replica;
 pub mod rsyslog;
 pub mod spec;
 mod spec_apply;
 pub mod swap;
 pub mod sync_sk;
 pub mod tls;
+pub mod pageserver_client;
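
Reviewer note: the `ro_replica::GlobalState` wired into `ComputeNode` above (the module itself is added later in this diff) is essentially a `tokio::sync::watch` channel: the Postgres monitor publishes the replica's apply LSN, and the lease background task consumes it. A minimal, self-contained sketch of that producer/consumer contract, not part of the patch; the `0x1000` LSN is a made-up value:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel::<Option<u64>>(None);

        // Producer (the monitor loop): publish the latest apply LSN,
        // waking waiters only if the value actually changed.
        tx.send_if_modified(|v| {
            let changed = *v != Some(0x1000);
            *v = Some(0x1000);
            changed
        });

        // Consumer (the lease task): block until the first value arrives,
        // then read it whenever a lease request is built.
        rx.wait_for(|v| v.is_some()).await.unwrap();
        let lsn = rx.borrow().unwrap();
        println!("first apply LSN: {lsn:#x}");
    }

The same `wait_for(|v| v.is_some())` trick is what lets the real code `.unwrap()` later reads, since the value never transitions back to `None`.
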
diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs
index bb0828429d..16e59c3112 100644
--- a/compute_tools/src/lsn_lease.rs
+++ b/compute_tools/src/lsn_lease.rs
@@ -5,15 +5,15 @@ use std::time::{Duration, SystemTime};

 use anyhow::{Result, bail};
 use compute_api::spec::{ComputeMode, PageserverProtocol};
-use itertools::Itertools as _;
 use pageserver_page_api as page_api;
 use postgres::{NoTls, SimpleQueryMessage};
 use tracing::{info, warn};
-use utils::id::{TenantId, TimelineId};
+use utils::id::TimelineId;
 use utils::lsn::Lsn;
-use utils::shard::{ShardCount, ShardNumber, TenantShardId};
+use utils::shard::TenantShardId;

 use crate::compute::ComputeNode;
+use crate::pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect};

 /// Spawns a background thread to periodically renew LSN leases for static compute.
 /// Does nothing if the compute is not in static mode.
@@ -31,7 +31,7 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
     let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn);
     thread::spawn(move || {
         let _entered = span.entered();
-        if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) {
+        if let Err(e) = lsn_lease_bg_task(compute, timeline_id, lsn) {
             // TODO: might need stronger error feedback than logging a warning.
             warn!("Exited with error: {e}");
         }
@@ -39,14 +39,9 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc<ComputeNode>) {
 }

 /// Renews LSN leases periodically so static computes are not affected by GC.
-fn lsn_lease_bg_task(
-    compute: Arc<ComputeNode>,
-    tenant_id: TenantId,
-    timeline_id: TimelineId,
-    lsn: Lsn,
-) -> Result<()> {
+fn lsn_lease_bg_task(compute: Arc<ComputeNode>, timeline_id: TimelineId, lsn: Lsn) -> Result<()> {
     loop {
-        let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?;
+        let valid_until = acquire_lsn_lease_with_retry(&compute, timeline_id, lsn)?;
         let valid_duration = valid_until
             .duration_since(SystemTime::now())
             .unwrap_or(Duration::ZERO);
@@ -68,7 +63,6 @@ fn lsn_lease_bg_task(
 /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests.
 fn acquire_lsn_lease_with_retry(
     compute: &Arc<ComputeNode>,
-    tenant_id: TenantId,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<SystemTime> {
@@ -78,17 +72,13 @@ fn acquire_lsn_lease_with_retry(
     loop {
         // Note: List of pageservers is dynamic, need to re-read configs before each attempt.
-        let (connstrings, auth) = {
+        let shards = {
             let state = compute.state.lock().unwrap();
-            let spec = state.pspec.as_ref().expect("spec must be set");
-            (
-                spec.pageserver_connstr.clone(),
-                spec.storage_auth_token.clone(),
-            )
+            let pspec = state.pspec.as_ref().expect("spec must be set");
+            pageserver_connstrings_for_connect(pspec)
         };

-        let result =
-            try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn);
+        let result = try_acquire_lsn_lease(shards, timeline_id, lsn);
         match result {
             Ok(Some(res)) => {
                 return Ok(res);
@@ -112,33 +102,32 @@ fn acquire_lsn_lease_with_retry(
 /// Tries to acquire LSN leases on all Pageserver shards.
 fn try_acquire_lsn_lease(
-    connstrings: &str,
-    auth: Option<&str>,
-    tenant_id: TenantId,
+    shards: Vec<ConnectInfo>,
     timeline_id: TimelineId,
     lsn: Lsn,
 ) -> Result<Option<SystemTime>> {
-    let connstrings = connstrings.split(',').collect_vec();
-    let shard_count = connstrings.len();
     let mut leases = Vec::new();
-
-    for (shard_number, &connstring) in connstrings.iter().enumerate() {
-        let tenant_shard_id = match shard_count {
-            0 | 1 => TenantShardId::unsharded(tenant_id),
-            shard_count => TenantShardId {
-                tenant_id,
-                shard_number: ShardNumber(shard_number as u8),
-                shard_count: ShardCount::new(shard_count as u8),
-            },
-        };
-
-        let lease = match PageserverProtocol::from_connstring(connstring)? {
-            PageserverProtocol::Libpq => {
-                acquire_lsn_lease_libpq(connstring, auth, tenant_shard_id, timeline_id, lsn)?
-            }
-            PageserverProtocol::Grpc => {
-                acquire_lsn_lease_grpc(connstring, auth, tenant_shard_id, timeline_id, lsn)?
-            }
+    for ConnectInfo {
+        tenant_shard_id,
+        connstring,
+        auth,
+    } in shards
+    {
+        let lease = match PageserverProtocol::from_connstring(&connstring)? {
+            PageserverProtocol::Libpq => acquire_lsn_lease_libpq(
+                &connstring,
+                auth.as_deref(),
+                tenant_shard_id,
+                timeline_id,
+                lsn,
+            )?,
+            PageserverProtocol::Grpc => acquire_lsn_lease_grpc(
+                &connstring,
+                auth.as_deref(),
+                tenant_shard_id,
+                timeline_id,
+                lsn,
+            )?,
         };
         leases.push(lease);
     }
diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs
index 8a2f6addad..390158078a 100644
--- a/compute_tools/src/monitor.rs
+++ b/compute_tools/src/monitor.rs
@@ -4,9 +4,10 @@ use std::time::Duration;

 use chrono::{DateTime, Utc};
 use compute_api::responses::ComputeStatus;
-use compute_api::spec::ComputeFeature;
+use compute_api::spec::{ComputeFeature, ComputeMode};
 use postgres::{Client, NoTls};
 use tracing::{Level, error, info, instrument, span};
+use utils::lsn::Lsn;

 use crate::compute::ComputeNode;
 use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
@@ -344,6 +345,42 @@ impl ComputeMonitor {
             }
         }

+        let mode: ComputeMode = self
+            .compute
+            .state
+            .lock()
+            .unwrap()
+            .pspec
+            .as_ref()
+            .expect("we launch ComputeMonitor only after we received a spec")
+            .spec
+            .mode;
+        match mode {
+            // TODO: can the .spec.mode ever change? If it can (e.g. a secondary promoted to primary),
+            // then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
+            ComputeMode::Primary => (),
+            ComputeMode::Static(_) => (),
+            ComputeMode::Replica => {
+                // TODO: instead of apply_lsn, use the min inflight request LSN
+                match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
+                    Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
+                        Ok(apply_lsn) => {
+                            let apply_lsn = Lsn(apply_lsn.into());
+                            self.compute
+                                .ro_replica
+                                .update_min_inflight_request_lsn(apply_lsn);
+                        }
+                        Err(e) => {
+                            anyhow::bail!("parse apply_lsn: {e}");
+                        }
+                    },
+                    Err(e) => {
+                        anyhow::bail!("query apply_lsn: {e}");
+                    }
+                }
+            }
+        }
+
         Ok(())
     }
 }
diff --git a/compute_tools/src/pageserver_client.rs b/compute_tools/src/pageserver_client.rs
new file mode 100644
index 0000000000..e00c8232ce
--- /dev/null
+++ b/compute_tools/src/pageserver_client.rs
@@ -0,0 +1,38 @@
+use itertools::Itertools;
+use utils::shard::{ShardCount, ShardNumber, TenantShardId};
+
+use crate::compute::ParsedSpec;
+
+pub struct ConnectInfo {
+    pub tenant_shard_id: TenantShardId,
+    pub connstring: String,
+    pub auth: Option<String>,
+}
+
+pub fn pageserver_connstrings_for_connect(pspec: &ParsedSpec) -> Vec<ConnectInfo> {
+    let connstrings = &pspec.pageserver_connstr;
+    let auth = pspec.storage_auth_token.clone();
+
+    let connstrings = connstrings.split(',').collect_vec();
+    let shard_count = connstrings.len();
+
+    let mut infos = Vec::with_capacity(connstrings.len());
+    for (shard_number, connstring) in connstrings.iter().enumerate() {
+        let tenant_shard_id = match shard_count {
+            0 | 1 => TenantShardId::unsharded(pspec.tenant_id),
+            shard_count => TenantShardId {
+                tenant_id: pspec.tenant_id,
+                shard_number: ShardNumber(shard_number as u8),
+                shard_count: ShardCount::new(shard_count as u8),
+            },
+        };
+
+        infos.push(ConnectInfo {
+            tenant_shard_id,
+            connstring: connstring.to_string(),
+            auth: auth.clone(),
+        });
+    }
+
+    infos
+}
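
Reviewer note: `pageserver_connstrings_for_connect` assigns shard identities purely by position in the comma-separated connstring list. A small sketch of that mapping (a hypothetical standalone helper named `shard_ids`; the real function also carries the connstring and auth token per shard):

    use utils::id::TenantId;
    use utils::shard::{ShardCount, ShardNumber, TenantShardId};

    // Positional shard assignment: the i-th connstring serves shard i.
    fn shard_ids(tenant_id: TenantId, shard_count: usize) -> Vec<TenantShardId> {
        (0..shard_count)
            .map(|shard_number| match shard_count {
                // A single connstring means the tenant is unsharded.
                0 | 1 => TenantShardId::unsharded(tenant_id),
                shard_count => TenantShardId {
                    tenant_id,
                    shard_number: ShardNumber(shard_number as u8),
                    shard_count: ShardCount::new(shard_count as u8),
                },
            })
            .collect()
    }

So a two-element list yields shard 0 of 2 for the first connstring and shard 1 of 2 for the second. The list order must match the shard layout, which is also why a lease has to be taken on every shard.
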
diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs
new file mode 100644
index 0000000000..2cc463d321
--- /dev/null
+++ b/compute_tools/src/ro_replica.rs
@@ -0,0 +1,297 @@
+use std::{
+    pin::Pin,
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, SystemTime},
+};
+
+use compute_api::spec::PageserverProtocol;
+use futures::{StreamExt, stream::FuturesUnordered};
+use postgres::SimpleQueryMessage;
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, error, info, info_span, instrument, warn};
+use utils::{backoff::retry, id::TimelineId, lsn::Lsn};
+
+use crate::{
+    compute::ComputeNode,
+    pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect},
+};
+
+#[derive(Default)]
+pub(crate) struct GlobalState {
+    min_inflight_request_lsn: tokio::sync::watch::Sender<Option<Lsn>>,
+}
+
+impl GlobalState {
+    pub fn update_min_inflight_request_lsn(&self, update: Lsn) {
+        self.min_inflight_request_lsn.send_if_modified(|value| {
+            let modified = *value != Some(update);
+            if let Some(value) = *value
+                && value > update
+            {
+                warn!(current=%value, new=%update, "min inflight request lsn moving backwards, this should not happen, bug in communicator");
+            }
+            *value = Some(update);
+            modified
+        });
+    }
+}
+
+pub fn spawn_bg_task(compute: Arc<ComputeNode>) {
+    std::thread::spawn(|| {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        rt.block_on(bg_task(compute))
+    });
+}
+
+#[instrument(name = "standby_horizon_lease", skip_all, fields(lease_id))]
+async fn bg_task(compute: Arc<ComputeNode>) {
+    // Use a lease_id that is globally unique to this process to maximize attribution precision & log correlation.
+    let lease_id = format!("v1-{}-{}", compute.params.compute_id, std::process::id());
+    tracing::Span::current().record("lease_id", tracing::field::display(&lease_id));
+
+    // Wait until we have the first value.
+    // Allows us to simply .unwrap() later because it never transitions back to None.
+    info!("waiting for first lease lsn to be fetched from postgres");
+    let mut min_inflight_request_lsn_changed =
+        compute.ro_replica.min_inflight_request_lsn.subscribe();
+    min_inflight_request_lsn_changed.mark_changed(); // it could have been set already
+    min_inflight_request_lsn_changed
+        .wait_for(|value| value.is_some())
+        .await
+        .expect("we never drop the sender");
+
+    // React to connstring changes. Sadly there is no async API for this yet.
+    let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(None);
+    std::thread::spawn({
+        let compute = Arc::clone(&compute);
+        move || {
+            loop {
+                compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::MAX);
+                let new = compute
+                    .state
+                    .lock()
+                    .unwrap()
+                    .pspec
+                    .as_ref()
+                    .and_then(|pspec| pspec.spec.pageserver_connstring.clone());
+                connstr_watch_tx.send_if_modified(|existing| {
+                    if &new != existing {
+                        *existing = new;
+                        true
+                    } else {
+                        false
+                    }
+                });
+            }
+        }
+    });
+
+    let mut obtained = ObtainedLease {
+        lsn: Lsn(0),
+        nearest_expiration: SystemTime::UNIX_EPOCH,
+    };
+    loop {
+        let valid_duration = obtained
+            .nearest_expiration
+            .duration_since(SystemTime::now())
+            .unwrap_or_default();
+        // Sleep for 60 seconds less than the valid duration, but at least half of the valid duration.
+        let sleep_duration = valid_duration
+            .saturating_sub(Duration::from_secs(60))
+            .max(valid_duration / 2);
+
+        tokio::select! {
+            _ = tokio::time::sleep(sleep_duration) => {
+                info!("updating because lease is going to expire soon");
+            }
+            _ = connstr_watch_rx.changed() => {
+                info!("updating due to changed pageserver_connstr")
+            }
+            _ = async {
+                // debounce; TODO make this lower in tests
+                tokio::time::sleep(Duration::from_secs(10)).await;
+                // every 10 GiB; TODO make this tighter in tests?
+                let max_horizon_lag = 10 * (1 << 30);
+                min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > obtained.lsn.0 + max_horizon_lag).await
+            } => {
+                info!(%obtained.lsn, "updating due to max horizon lag");
+            }
+        }
+        // retry forever
+        let compute = Arc::clone(&compute);
+        let lease_id = lease_id.clone();
+        obtained = retry(
+            || attempt(lease_id.clone(), &compute),
+            |_| false,
+            0,
+            u32::MAX, // forever
+            "update standby_horizon position in pageserver",
+            // There is no cancellation story in compute_ctl
+            &CancellationToken::new(),
+        )
+        .await
+        .expect("is_permanent returns false, so retry always returns Some")
+        .expect("u32::MAX exceeded");
+    }
+}
+
+struct ObtainedLease {
+    lsn: Lsn,
+    nearest_expiration: SystemTime,
+}
+
+async fn attempt(lease_id: String, compute: &Arc<ComputeNode>) -> anyhow::Result<ObtainedLease> {
+    let (shards, timeline_id) = {
+        let state = compute.state.lock().unwrap();
+        let pspec = state.pspec.as_ref().expect("spec must be set");
+        (pageserver_connstrings_for_connect(pspec), pspec.timeline_id)
+    };
+
+    let lsn = compute
+        .ro_replica
+        .min_inflight_request_lsn
+        .borrow()
+        .expect("we only call this function once it has been transitioned to Some");
+
+    let mut futs = FuturesUnordered::new();
+    for connect_info in shards {
+        let logging_span = info_span!(
+            "attempt_one",
+            tenant_id=%connect_info.tenant_shard_id.tenant_id,
+            shard_id=%connect_info.tenant_shard_id.shard_slug(),
+            timeline_id=%timeline_id,
+        );
+        let logging_wrapper =
+            |fut: Pin<Box<dyn Future<Output = anyhow::Result<Option<SystemTime>>>>>| {
+                async move {
+                    // TODO: timeout?
+                    match fut.await {
+                        Ok(Some(v)) => {
+                            info!("lease obtained");
+                            Ok(Some(v))
+                        }
+                        Ok(None) => {
+                            error!("pageserver rejected our request");
+                            Ok(None)
+                        }
+                        Err(err) => {
+                            error!("communication failure: {err:?}");
+                            Err(())
+                        }
+                    }
+                }
+                .instrument(logging_span)
+            };
+        let fut = match PageserverProtocol::from_connstring(&connect_info.connstring)? {
+            PageserverProtocol::Libpq => logging_wrapper(Box::pin(attempt_one_libpq(
+                connect_info,
+                timeline_id,
+                lease_id.clone(),
+                lsn,
+            ))),
+            PageserverProtocol::Grpc => logging_wrapper(Box::pin(attempt_one_grpc(
+                connect_info,
+                timeline_id,
+                lease_id.clone(),
+                lsn,
+            ))),
+        };
+        futs.push(fut);
+    }
+    let mut errors = 0;
+    let mut nearest_expiration = None;
+    while let Some(res) = futs.next().await {
+        match res {
+            Ok(Some(expiration)) => {
+                let nearest_expiration = nearest_expiration.get_or_insert(expiration);
+                *nearest_expiration = std::cmp::min(*nearest_expiration, expiration);
+            }
+            Ok(None) | Err(()) => {
+                // the logging wrapper does the logging
+                errors += 1;
+            }
+        }
+    }
+    if errors > 0 {
+        return Err(anyhow::anyhow!(
+            "failed to advance standby_horizon for {errors} shards, check logs for details"
+        ));
+    }
+    match nearest_expiration {
+        Some(nearest_expiration) => Ok(ObtainedLease {
+            lsn,
+            nearest_expiration,
+        }),
+        None => Err(anyhow::anyhow!("pageserver connstrings list is empty")), // this probably can't happen
+    }
+}
+
+async fn attempt_one_libpq(
+    connect_info: ConnectInfo,
+    timeline_id: TimelineId,
+    lease_id: String,
+    lsn: Lsn,
+) -> anyhow::Result<Option<SystemTime>> {
+    let ConnectInfo {
+        tenant_shard_id,
+        connstring,
+        auth,
+    } = connect_info;
+    let mut config = tokio_postgres::Config::from_str(&connstring)?;
+    if let Some(auth) = auth {
+        config.password(auth);
+    }
+    let (client, conn) = config.connect(postgres::NoTls).await?;
+    tokio::spawn(conn);
+    let cmd = format!("lease standby_horizon {tenant_shard_id} {timeline_id} {lease_id} {lsn}");
+    let res = client.simple_query(&cmd).await?;
+    let msg = match res.first() {
+        Some(msg) => msg,
+        None => anyhow::bail!("empty response"),
+    };
+    let row = match msg {
+        SimpleQueryMessage::Row(row) => row,
+        _ => anyhow::bail!("expected row message type"),
+    };
+
+    // Note: this will be None if a lease is explicitly not granted.
+    let Some(expiration) = row.get("expiration") else {
+        return Ok(None);
+    };
+
+    let expiration =
+        SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(u64::from_str(expiration)?));
+    Ok(expiration)
+}
+
+async fn attempt_one_grpc(
+    connect_info: ConnectInfo,
+    timeline_id: TimelineId,
+    lease_id: String,
+    lsn: Lsn,
+) -> anyhow::Result<Option<SystemTime>> {
+    let ConnectInfo {
+        tenant_shard_id,
+        connstring,
+        auth,
+    } = connect_info;
+    let mut client = pageserver_page_api::Client::connect(
+        connstring.to_string(),
+        tenant_shard_id.tenant_id,
+        timeline_id,
+        tenant_shard_id.to_index(),
+        auth.map(String::from),
+        None,
+    )
+    .await?;
+
+    let req = pageserver_page_api::LeaseStandbyHorizonRequest { lease_id, lsn };
+    match client.lease_standby_horizon(req).await {
+        Ok(expires) => Ok(Some(expires)),
+        // Lease couldn't be acquired
+        Err(err) if err.code() == tonic::Code::FailedPrecondition => Ok(None),
+        Err(err) => Err(err.into()),
+    }
+}
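
Reviewer note: the renewal loop in `bg_task` wakes 60 seconds before the lease expires, and the `.max(valid_duration / 2)` floor keeps short leases from being renewed in a hot loop (for a 61-second lease, "60 seconds before expiry" would mean renewing every second). Worked numbers, with the arithmetic extracted into a hypothetical `sleep_for` helper:

    use std::time::Duration;

    // Same arithmetic as bg_task's sleep_duration, extracted for illustration.
    fn sleep_for(valid: Duration) -> Duration {
        valid.saturating_sub(Duration::from_secs(60)).max(valid / 2)
    }

    fn main() {
        // 10-minute lease: wake 60 seconds before expiry.
        assert_eq!(sleep_for(Duration::from_secs(600)), Duration::from_secs(540));
        // 90-second lease: subtracting 60s would give 30s, so the
        // half-validity floor kicks in and we sleep 45s instead.
        assert_eq!(sleep_for(Duration::from_secs(90)), Duration::from_secs(45));
        // Already-expired lease: renew immediately.
        assert_eq!(sleep_for(Duration::ZERO), Duration::ZERO);
    }
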
diff --git a/pageserver/page_api/proto/page_service.proto b/pageserver/page_api/proto/page_service.proto
index 1d6c230916..fc20eef503 100644
--- a/pageserver/page_api/proto/page_service.proto
+++ b/pageserver/page_api/proto/page_service.proto
@@ -70,6 +70,18 @@ service PageService {
   // Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't garbage
   // collect the LSN until the lease expires. Must be acquired on all relevant shards.
   rpc LeaseLsn (LeaseLsnRequest) returns (LeaseLsnResponse);
+
+  // Upserts a standby_horizon lease. RO replicas rely on this type of lease.
+  // In slightly more detail: RO replicas always lag to some degree behind the
+  // primary, and request pages at their respective apply LSN. The standby horizon mechanism
+  // ensures that the Pageserver does not garbage-collect old page versions in
+  // the interval between `min(valid standby horizon leases)` and the most recent page version.
+  //
+  // Each RO replica calls this method continuously as it applies more WAL.
+  // It identifies its lease through an opaque "lease_id" across these requests.
+  // The response contains the lease expiration time.
+  // Status `FailedPrecondition` is returned if the lease cannot be granted.
+  rpc LeaseStandbyHorizon(LeaseStandbyHorizonRequest) returns (LeaseStandbyHorizonResponse);
 }

 // The LSN a request should read at.
@@ -272,3 +284,16 @@ message LeaseLsnResponse {
   // The lease expiration time.
   google.protobuf.Timestamp expires = 1;
 }
+
+// Request for the LeaseStandbyHorizon rpc.
+// The lease_id identifies the lease in subsequent requests.
+// The lsn must be monotonic; the request will fail if it is not.
+message LeaseStandbyHorizonRequest {
+  string lease_id = 1;
+  uint64 lsn = 2;
+}
+
+// Response for the success case of the LeaseStandbyHorizon rpc.
+message LeaseStandbyHorizonResponse {
+  google.protobuf.Timestamp expiration = 1;
+}
diff --git a/pageserver/page_api/src/client.rs b/pageserver/page_api/src/client.rs
index 6523d00d3d..757e8b284c 100644
--- a/pageserver/page_api/src/client.rs
+++ b/pageserver/page_api/src/client.rs
@@ -143,6 +143,12 @@ impl Client {
         let resp = self.inner.lease_lsn(req).await?.into_inner();
         Ok(resp.try_into()?)
     }
+
+    pub async fn lease_standby_horizon(&mut self, req: LeaseStandbyHorizonRequest) -> tonic::Result<LeaseStandbyHorizonResponse> {
+        let req = proto::LeaseStandbyHorizonRequest::from(req);
+        let resp = self.inner.lease_standby_horizon(req).await?.into_inner();
+        Ok(resp.try_into()?)
+    }
 }

 /// Adds authentication metadata to gRPC requests.
diff --git a/pageserver/page_api/src/model.rs b/pageserver/page_api/src/model.rs
index 4497fc6fc7..477003af16 100644
--- a/pageserver/page_api/src/model.rs
+++ b/pageserver/page_api/src/model.rs
@@ -755,3 +755,64 @@ impl From<LeaseLsnResponse> for proto::LeaseLsnResponse {
         }
     }
 }
+
+pub struct LeaseStandbyHorizonRequest {
+    pub lease_id: String,
+    pub lsn: Lsn,
+}
+
+impl TryFrom<proto::LeaseStandbyHorizonRequest> for LeaseStandbyHorizonRequest {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::LeaseStandbyHorizonRequest) -> Result<Self, Self::Error> {
+        if pb.lsn == 0 {
+            return Err(ProtocolError::Missing("lsn"));
+        }
+        if pb.lease_id.is_empty() {
+            return Err(ProtocolError::Invalid("lease_id", pb.lease_id));
+        }
+        Ok(Self {
+            lease_id: pb.lease_id,
+            lsn: Lsn(pb.lsn),
+        })
+    }
+}
+
+impl From<LeaseStandbyHorizonRequest> for proto::LeaseStandbyHorizonRequest {
+    fn from(request: LeaseStandbyHorizonRequest) -> Self {
+        Self {
+            lease_id: request.lease_id,
+            lsn: request.lsn.0,
+        }
+    }
+}
+
+/// Lease expiration time. If the lease could not be granted because the LSN has already been
+/// garbage collected, a FailedPrecondition status will be returned instead.
+pub type LeaseStandbyHorizonResponse = SystemTime;
+
+impl TryFrom<proto::LeaseStandbyHorizonResponse> for LeaseStandbyHorizonResponse {
+    type Error = ProtocolError;
+
+    fn try_from(pb: proto::LeaseStandbyHorizonResponse) -> Result<Self, Self::Error> {
+        let expiration = pb.expiration.ok_or(ProtocolError::Missing("expiration"))?;
+        UNIX_EPOCH
+            .checked_add(Duration::new(
+                expiration.seconds as u64,
+                expiration.nanos as u32,
+            ))
+            .ok_or_else(|| ProtocolError::invalid("expiration", expiration))
+    }
+}
+
+impl From<LeaseStandbyHorizonResponse> for proto::LeaseStandbyHorizonResponse {
+    fn from(response: LeaseStandbyHorizonResponse) -> Self {
+        let expiration = response.duration_since(UNIX_EPOCH).unwrap_or_default();
+        Self {
+            expiration: Some(prost_types::Timestamp {
+                seconds: expiration.as_secs() as i64,
+                nanos: expiration.subsec_nanos() as i32,
+            }),
+        }
+    }
+}
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index eb89e166b2..93ce315edb 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -2327,6 +2327,7 @@ pub(crate) enum ComputeCommandKind {
     Basebackup,
     Fullbackup,
     LeaseLsn,
+    LeaseStandbyHorizon,
 }

 pub(crate) struct ComputeCommandCounters {
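
Reviewer note: putting the new model types and client method together, a compute-side caller looks roughly like this (a sketch mirroring `attempt_one_grpc` in `ro_replica.rs` above; connection setup and the lease_id value are assumed). Note that a rejected lease surfaces as a `FailedPrecondition` status rather than in the response body:

    use pageserver_page_api as page_api;
    use tracing::{info, warn};
    use utils::lsn::Lsn;

    async fn renew_once(client: &mut page_api::Client, lsn: Lsn) -> anyhow::Result<()> {
        let req = page_api::LeaseStandbyHorizonRequest {
            lease_id: "v1-example-compute-1234".to_string(), // opaque, stable per process
            lsn,
        };
        match client.lease_standby_horizon(req).await {
            Ok(expires) => info!(?expires, "standby_horizon lease renewed"),
            Err(err) if err.code() == tonic::Code::FailedPrecondition => {
                warn!("lease not granted, LSN may already be garbage-collected");
            }
            Err(err) => return Err(err.into()),
        }
        Ok(())
    }
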
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 6b614deac8..05e55e6092 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -76,6 +76,7 @@ use crate::pgdatadir_mapping::{LsnRange, Version};
 use crate::span::{
     debug_assert_current_span_has_tenant_and_timeline_id,
     debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
+    debug_assert_current_span_has_tenant_id,
 };
 use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
 use crate::tenant::mgr::{
@@ -2218,7 +2219,7 @@ impl PageServerHandler {
             valid_until_str.as_deref().unwrap_or("")
         );

-        let bytes = valid_until_str.as_ref().map(|x| x.as_bytes());
+        let bytes: Option<&[u8]> = valid_until_str.as_ref().map(|x| x.as_bytes());

         pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
             b"valid_until",
@@ -2228,6 +2229,56 @@ impl PageServerHandler {
         Ok(())
     }

+    #[instrument(skip_all, fields(shard_id, %lsn))]
+    async fn handle_lease_standby_horizon<IO>(
+        &mut self,
+        pgb: &mut PostgresBackend<IO>,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        lease_id: String,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<(), QueryError>
+    where
+        IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
+    {
+        debug_assert_current_span_has_tenant_id();
+
+        let timeline = self
+            .timeline_handles
+            .as_mut()
+            .unwrap()
+            .get(
+                tenant_shard_id.tenant_id,
+                timeline_id,
+                ShardSelector::Known(tenant_shard_id.to_index()),
+            )
+            .await?;
+        set_tracing_field_shard_id(&timeline);
+
+        let result: Option<SystemTime> = timeline
+            .lease_standby_horizon(lease_id, lsn, ctx)
+            .inspect_err(|e| {
+                warn!("{e}");
+            })
+            .ok();
+
+        // Encode the optional expiration as unix-epoch millis in a text cell; None becomes NULL.
+        let bytes = result.map(|t| {
+            t.duration_since(SystemTime::UNIX_EPOCH)
+                .expect("we wouldn't allow a lease at epoch, system time would be horribly off")
+                .as_millis()
+                .to_string()
+                .into_bytes()
+        });
+        pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col(
+            b"expiration",
+        )]))?
+        .write_message_noflush(&BeMessage::DataRow(&[bytes.as_deref()]))?;
+
+        Ok(())
+    }
+
     #[instrument(skip_all, fields(shard_id))]
     async fn handle_get_rel_exists_request(
         timeline: &Timeline,
@@ -2718,6 +2769,14 @@ struct LeaseLsnCmd {
     lsn: Lsn,
 }

+#[derive(Debug, Clone, Eq, PartialEq)]
+struct LeaseStandbyHorizonCmd {
+    tenant_shard_id: TenantShardId,
+    timeline_id: TimelineId,
+    lease_id: String,
+    lsn: Lsn,
+}
+
 #[derive(Debug, Clone, Eq, PartialEq)]
 enum PageServiceCmd {
     Set,
@@ -2725,6 +2784,7 @@ enum PageServiceCmd {
     BaseBackup(BaseBackupCmd),
     FullBackup(FullBackupCmd),
     LeaseLsn(LeaseLsnCmd),
+    LeaseStandbyHorizon(LeaseStandbyHorizonCmd),
 }

 impl PageStreamCmd {
@@ -2874,6 +2934,31 @@ impl LeaseLsnCmd {
     }
 }

+impl LeaseStandbyHorizonCmd {
+    fn parse(query: &str) -> anyhow::Result<Self> {
+        let parameters = query.split_whitespace().collect_vec();
+        if parameters.len() != 4 {
+            bail!(
+                "invalid number of parameters for lease standby_horizon command: {}",
+                query
+            );
+        }
+        let tenant_shard_id = TenantShardId::from_str(parameters[0])
+            .with_context(|| format!("Failed to parse tenant id from {}", parameters[0]))?;
+        let timeline_id = TimelineId::from_str(parameters[1])
+            .with_context(|| format!("Failed to parse timeline id from {}", parameters[1]))?;
+        let lease_id = parameters[2].to_string();
+        let standby_horizon = Lsn::from_str(parameters[3])
+            .with_context(|| format!("Failed to parse lsn from {}", parameters[3]))?;
+        Ok(Self {
+            tenant_shard_id,
+            timeline_id,
+            lease_id,
+            lsn: standby_horizon,
+        })
+    }
+}
+
 impl PageServiceCmd {
     fn parse(query: &str) -> anyhow::Result<Self> {
         let query = query.trim();
@@ -2898,6 +2983,10 @@ impl PageServiceCmd {
             let cmd2 = cmd2.to_ascii_lowercase();
             if cmd2 == "lsn" {
                 Ok(Self::LeaseLsn(LeaseLsnCmd::parse(other)?))
+            } else if cmd2 == "standby_horizon" {
+                Ok(Self::LeaseStandbyHorizon(LeaseStandbyHorizonCmd::parse(
+                    other,
+                )?))
             } else {
                 bail!("invalid lease command: {cmd}");
             }
@@ -3161,6 +3250,45 @@ where
                     }
                 };
             }
+            PageServiceCmd::LeaseStandbyHorizon(LeaseStandbyHorizonCmd {
+                tenant_shard_id,
+                timeline_id,
+                lease_id,
+                lsn,
+            }) => {
+                tracing::Span::current()
+                    .record("tenant_id", field::display(tenant_shard_id))
+                    .record("timeline_id", field::display(timeline_id));
+
+                self.check_permission(Some(tenant_shard_id.tenant_id))?;
+
+                COMPUTE_COMMANDS_COUNTERS
+                    .for_command(ComputeCommandKind::LeaseStandbyHorizon)
+                    .inc();
+
+                match self
+                    .handle_lease_standby_horizon(
+                        pgb,
+                        tenant_shard_id,
+                        timeline_id,
+                        lease_id,
+                        lsn,
+                        &ctx,
+                    )
+                    .await
+                {
+                    Ok(()) => {
+                        pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?
+                    }
+                    Err(e) => {
+                        error!("error obtaining standby_horizon lease for {lsn}: {e:?}");
+                        pgb.write_message_noflush(&BeMessage::ErrorResponse(
+                            &e.to_string(),
+                            Some(e.pg_error_code()),
+                        ))?
+                    }
+                };
+            }
         }

         Ok(())
     }
@@ -3801,6 +3929,28 @@ impl proto::PageService for GrpcPageServiceHandler {

         Ok(tonic::Response::new(expires.into()))
     }
+
+    #[instrument(skip_all, fields(lease_id, lsn))]
+    async fn lease_standby_horizon(
+        &self,
+        req: tonic::Request<proto::LeaseStandbyHorizonRequest>,
+    ) -> Result<tonic::Response<proto::LeaseStandbyHorizonResponse>, tonic::Status> {
+        let timeline = self.get_request_timeline(&req).await?;
+        let ctx = self.ctx.with_scope_timeline(&timeline);
+
+        // Validate and convert the request, and decorate the span.
+        let page_api::LeaseStandbyHorizonRequest { lease_id, lsn } = req.into_inner().try_into()?;
+
+        span_record!(lease_id=%lease_id, lsn=%lsn);
+
+        // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted.
+        let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) {
+            Ok(expiration) => expiration,
+            Err(err) => return Err(tonic::Status::failed_precondition(format!("{err:#}"))),
+        };
+
+        Ok(tonic::Response::new(expiration.into()))
+    }
 }

 /// gRPC middleware layer that handles observability concerns:
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 6088f40669..47eb20cf61 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1860,6 +1860,15 @@ impl Timeline {
         Ok(lease)
     }

+    pub(crate) fn lease_standby_horizon(
+        &self,
+        lease_id: String,
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<SystemTime> {
+        todo!()
+    }
+
     /// Freeze the current open in-memory layer. It will be written to disk on next iteration.
     /// Returns the flush request ID which can be awaited with wait_flush_completion().
     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
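
Reviewer note: `Timeline::lease_standby_horizon` is left as `todo!()` in this diff. To make the intended semantics concrete, here is a rough sketch of the kind of bookkeeping it will need, with hypothetical names (`StandbyHorizonLeases`, `LEASE_LENGTH`); the real implementation must also hook into GC the way the existing LSN leases do, and decide how leases survive restarts:

    use std::collections::HashMap;
    use std::sync::Mutex;
    use std::time::{Duration, SystemTime};
    use utils::lsn::Lsn;

    const LEASE_LENGTH: Duration = Duration::from_secs(10 * 60); // hypothetical

    #[derive(Default)]
    struct StandbyHorizonLeases {
        // lease_id -> (horizon LSN, expiration)
        leases: Mutex<HashMap<String, (Lsn, SystemTime)>>,
    }

    impl StandbyHorizonLeases {
        /// Upsert a lease; reject LSNs that move backwards or are already GC'd away.
        fn upsert(&self, lease_id: String, lsn: Lsn, gc_cutoff: Lsn) -> anyhow::Result<SystemTime> {
            anyhow::ensure!(lsn >= gc_cutoff, "lsn {lsn} is below the GC cutoff {gc_cutoff}");
            let mut leases = self.leases.lock().unwrap();
            if let Some((prev, _)) = leases.get(&lease_id) {
                anyhow::ensure!(lsn >= *prev, "lsn {lsn} moves backwards from {prev}");
            }
            let expiration = SystemTime::now() + LEASE_LENGTH;
            leases.insert(lease_id, (lsn, expiration));
            Ok(expiration)
        }
    }

Under this reading, GC would then clamp its cutoff to `min` over the non-expired entries, which matches the "interval between min(valid standby horizon leases) and the most recent page version" wording in the proto comment.
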