From 9eb60807d883db2a0573d1d3ec4be564f7265617 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 9 Jul 2025 23:39:49 +0000 Subject: [PATCH] switch lease renewal to use lsn from the state fed by compute monitor --- compute_tools/src/compute.rs | 4 +-- compute_tools/src/lsn_lease.rs | 50 ++++++++++++++++++++++------------ 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index b263182e8b..0479a44c73 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -43,7 +43,7 @@ use crate::configurator::launch_configurator; use crate::disk_quota::set_disk_quota; use crate::installed_extensions::get_installed_extensions; use crate::logger::startup_context_from_env; -use crate::lsn_lease::{self, launch_lsn_lease_bg_task_for_static}; +use crate::lsn_lease::{self, launch_lsn_lease_bg_task}; use crate::metrics::COMPUTE_CTL_UP; use crate::monitor::launch_monitor; use crate::pg_helpers::*; @@ -491,7 +491,7 @@ impl ComputeNode { this.wait_spec()? }; - launch_lsn_lease_bg_task_for_static(&this); + launch_lsn_lease_bg_task(&this); // We have a spec, start the compute let mut delay_exit = false; diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index d6ebce38ba..ed956d9134 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -4,10 +4,11 @@ use std::thread; use std::time::{Duration, SystemTime}; use anyhow::{Result, bail}; -use compute_api::spec::{ComputeMode, PageserverProtocol}; +use compute_api::spec::PageserverProtocol; use itertools::Itertools as _; use pageserver_page_api as page_api; use postgres::{NoTls, SimpleQueryMessage}; +use tokio::sync::watch; use tracing::{info, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -33,23 +34,19 @@ impl State { } } -/// Spawns a background thread to periodically renew LSN leases for static compute. -/// Do nothing if the compute is not in static mode. -pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { - let (tenant_id, timeline_id, lsn) = { +/// Spawns a background thread to periodically renew LSN leases. +pub fn launch_lsn_lease_bg_task(compute: &Arc) { + let (tenant_id, timeline_id) = { let state = compute.state.lock().unwrap(); let spec = state.pspec.as_ref().expect("Spec must be set"); - match spec.spec.mode { - ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn), - _ => return, - } + (spec.tenant_id, spec.timeline_id) }; let compute = compute.clone(); - let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id, %lsn); + let span = tracing::info_span!("lsn_lease_bg_task", %tenant_id, %timeline_id); thread::spawn(move || { let _entered = span.entered(); - if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id, lsn) { + if let Err(e) = lsn_lease_bg_task(compute, tenant_id, timeline_id) { // TODO: might need stronger error feedback than logging an warning. warn!("Exited with error: {e}"); } @@ -61,10 +58,22 @@ fn lsn_lease_bg_task( compute: Arc, tenant_id: TenantId, timeline_id: TimelineId, - lsn: Lsn, ) -> Result<()> { + let mut rx = compute.lsn_lease_state.desired_lease_lsn.subscribe(); loop { - let valid_until = acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, lsn)?; + let Some(valid_until) = + acquire_lsn_lease_with_retry(&compute, tenant_id, timeline_id, &rx)? + else { + info!(current=?*rx.borrow(), "sleeping until desired_lease_lsn changes to Some()"); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(rx.wait_for(|current| current.is_some())) + .expect("the Sender is kept alive in ComputeNode, which we hold a reference to"); + info!(current=?*rx.borrow(), "waking up from sleep"); + continue; + }; let valid_duration = valid_until .duration_since(SystemTime::now()) .unwrap_or(Duration::ZERO); @@ -83,13 +92,15 @@ fn lsn_lease_bg_task( } /// Acquires lsn lease in a retry loop. Returns the expiration time if a lease is granted. -/// Returns an error if a lease is explicitly not granted. Otherwise, we keep sending requests. +/// Returns an error if a lease is explicitly not granted. +/// Returns None if the rx is None. +/// Otherwise, we keep sending requests. fn acquire_lsn_lease_with_retry( compute: &Arc, tenant_id: TenantId, timeline_id: TimelineId, - lsn: Lsn, -) -> Result { + rx: &watch::Receiver>, +) -> Result> { let mut attempts = 0usize; let mut retry_period_ms: f64 = 500.0; const MAX_RETRY_PERIOD_MS: f64 = 60.0 * 1000.0; @@ -105,11 +116,16 @@ fn acquire_lsn_lease_with_retry( ) }; + // immediately copy the value so the borrow is short-lived, it's a best practice + let Some(lsn) = *rx.borrow() else { + return Ok(None); + }; + let result = try_acquire_lsn_lease(&connstrings, auth.as_deref(), tenant_id, timeline_id, lsn); match result { Ok(Some(res)) => { - return Ok(res); + return Ok(Some(res)); } Ok(None) => { bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");