diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index 52a669daa6..16e59c3112 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -5,15 +5,14 @@ use std::time::{Duration, SystemTime}; use anyhow::{Result, bail}; use compute_api::spec::{ComputeMode, PageserverProtocol}; -use itertools::Itertools as _; use pageserver_page_api as page_api; use postgres::{NoTls, SimpleQueryMessage}; use tracing::{info, warn}; -use utils::id::{TenantId, TimelineId}; +use utils::id::TimelineId; use utils::lsn::Lsn; -use utils::shard::{ShardCount, ShardNumber, TenantShardId}; +use utils::shard::TenantShardId; -use crate::compute::{ComputeNode, ParsedSpec}; +use crate::compute::ComputeNode; use crate::pageserver_client::{ConnectInfo, pageserver_connstrings_for_connect}; /// Spawns a background thread to periodically renew LSN leases for static compute. @@ -32,7 +31,7 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn); thread::spawn(move || { let _entered = span.entered(); - if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) { + if let Err(e) = lsn_lease_bg_task(compute, timeline_id, lsn) { // TODO: might need stronger error feedback than logging an warning. warn!("Exited with error: {e}"); } @@ -40,14 +39,9 @@ pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { } /// Renews lsn lease periodically so static compute are not affected by GC. -fn lsn_lease_bg_task( - compute: Arc, - tenant_id: TenantId, - timeline_id: TimelineId, - lsn: Lsn, -) -> Result<()> { +fn lsn_lease_bg_task(compute: Arc, timeline_id: TimelineId, lsn: Lsn) -> Result<()> { loop { - let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?; + let valid_until = acquire_lsn_lease_with_retry(&compute, timeline_id, lsn)?; let valid_duration = valid_until .duration_since(SystemTime::now()) .unwrap_or(Duration::ZERO); @@ -69,7 +63,6 @@ fn lsn_lease_bg_task( /// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests. fn acquire_lsn_lease_with_retry( compute: &Arc, - tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn, ) -> Result { diff --git a/compute_tools/src/pageserver_client.rs b/compute_tools/src/pageserver_client.rs index 27a49575ea..e00c8232ce 100644 --- a/compute_tools/src/pageserver_client.rs +++ b/compute_tools/src/pageserver_client.rs @@ -1,8 +1,5 @@ use itertools::Itertools; -use utils::{ - id::TenantId, - shard::{ShardCount, ShardIndex, ShardNumber, TenantShardId}, -}; +use utils::shard::{ShardCount, ShardNumber, TenantShardId}; use crate::compute::ParsedSpec; @@ -12,9 +9,7 @@ pub struct ConnectInfo { pub auth: Option, } -pub fn pageserver_connstrings_for_connect( - pspec: &ParsedSpec, -) -> Vec { +pub fn pageserver_connstrings_for_connect(pspec: &ParsedSpec) -> Vec { let connstrings = &pspec.pageserver_connstr; let auth = pspec.storage_auth_token.clone(); diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs index 1a56ff0d92..2cc463d321 100644 --- a/compute_tools/src/ro_replica.rs +++ b/compute_tools/src/ro_replica.rs @@ -2,11 +2,9 @@ use std::{ pin::Pin, str::FromStr, sync::Arc, - time::{Duration, Instant, SystemTime}, + time::{Duration, SystemTime}, }; -use anyhow::Context; -use chrono::Utc; use compute_api::spec::PageserverProtocol; use futures::{StreamExt, stream::FuturesUnordered}; use postgres::SimpleQueryMessage; @@ -47,12 +45,6 @@ pub fn spawn_bg_task(compute: Arc) { }); } -#[derive(Clone)] -struct Reservation { - horizon: Lsn, - expiration: SystemTime, -} - #[instrument(name = "standby_horizon_lease", skip_all, fields(lease_id))] async fn bg_task(compute: Arc) { // Use a lease_id that is globally unique to this process to maximize attribution precision & log correlation. @@ -67,7 +59,8 @@ async fn bg_task(compute: Arc) { min_inflight_request_lsn_changed.mark_changed(); // it could have been set already min_inflight_request_lsn_changed .wait_for(|value| value.is_some()) - .await; + .await + .expect("we never drop the sender"); // React to connstring changes. Sadly there is no async API for this yet. let (connstr_watch_tx, mut connstr_watch_rx) = tokio::sync::watch::channel(None); @@ -118,7 +111,7 @@ async fn bg_task(compute: Arc) { } _ = async { // debounce; TODO make this lower in tests - tokio::time::sleep(Duration::from_secs(10)); + tokio::time::sleep(Duration::from_secs(10)).await; // every 10 GiB; TODO make this tighter in tests? let max_horizon_lag = 10 * (1<<30); min_inflight_request_lsn_changed.wait_for(|x| x.unwrap().0 > obtained.lsn.0 + max_horizon_lag).await @@ -227,9 +220,9 @@ async fn attempt(lease_id: String, compute: &Arc) -> anyhow::Result )); } match nearest_expiration { - Some(v) => Ok(ObtainedLease { + Some(nearest_expiration) => Ok(ObtainedLease { lsn, - nearest_expiration: nearest_expiration.expect("we either errors+=1 or set it"), + nearest_expiration, }), None => Err(anyhow::anyhow!("pageservers connstrings is empty")), // this probably can't happen } @@ -250,7 +243,7 @@ async fn attempt_one_libpq( if let Some(auth) = auth { config.password(auth); } - let (mut client, conn) = config.connect(postgres::NoTls).await?; + let (client, conn) = config.connect(postgres::NoTls).await?; tokio::spawn(conn); let cmd = format!("lease standby_horizon {tenant_shard_id} {timeline_id} {lease_id} {lsn} "); let res = client.simple_query(&cmd).await?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 087e85738f..05e55e6092 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2256,7 +2256,12 @@ impl PageServerHandler { .await?; set_tracing_field_shard_id(&timeline); - let result: Option = todo!(); + let result: Option = timeline + .lease_standby_horizon(lease_id, lsn, ctx) + .inspect_err(|e| { + warn!("{e}"); + }) + .ok(); // Encode result as Option let bytes = result.map(|t| { @@ -3926,8 +3931,25 @@ impl proto::PageService for GrpcPageServiceHandler { } #[instrument(skip_all, fields(lease_id, lsn))] - async fn lease_standby_horizon(&self, req: tonic::Request) -> Result, tonic::Status> { - todo!() + async fn lease_standby_horizon( + &self, + req: tonic::Request, + ) -> Result, tonic::Status> { + let timeline = self.get_request_timeline(&req).await?; + let ctx = self.ctx.with_scope_timeline(&timeline); + + // Validate and convert the request, and decorate the span. + let page_api::LeaseStandbyHorizonRequest { lease_id, lsn } = req.into_inner().try_into()?; + + span_record!(lease_id=%lease_id, lsn=%lsn); + + // Attempt to acquire a lease. Return FailedPrecondition if the lease could not be granted. + let expiration = match timeline.lease_standby_horizon(lease_id, lsn, &ctx) { + Ok(expiration) => expiration, + Err(err) => return Err(tonic::Status::failed_precondition(format!("{err:#}"))), + }; + + Ok(tonic::Response::new(expiration.into())) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4320f3b142..2af7b766fe 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1858,6 +1858,15 @@ impl Timeline { Ok(lease) } + pub(crate) fn lease_standby_horizon( + &self, + lease_id: String, + lsn: Lsn, + ctx: &RequestContext, + ) -> anyhow::Result { + todo!() + } + /// Freeze the current open in-memory layer. It will be written to disk on next iteration. /// Returns the flush request ID which can be awaited with wait_flush_completion(). #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]