diff --git a/Cargo.lock b/Cargo.lock index 893932fb9d..40f8b04463 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,6 +1348,7 @@ dependencies = [ "p256 0.13.2", "pageserver_page_api", "postgres", + "postgres-types", "postgres_initdb", "postgres_versioninfo", "regex", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 1a03022d89..c0e67851a4 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -67,6 +67,7 @@ uuid.workspace = true walkdir.workspace = true x509-cert.workspace = true +postgres-types.workspace = true postgres_versioninfo.workspace = true postgres_initdb.workspace = true compute_api.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index ec6e6c1634..b263182e8b 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -43,7 +43,7 @@ use crate::configurator::launch_configurator; use crate::disk_quota::set_disk_quota; use crate::installed_extensions::get_installed_extensions; use crate::logger::startup_context_from_env; -use crate::lsn_lease::launch_lsn_lease_bg_task_for_static; +use crate::lsn_lease::{self, launch_lsn_lease_bg_task_for_static}; use crate::metrics::COMPUTE_CTL_UP; use crate::monitor::launch_monitor; use crate::pg_helpers::*; @@ -131,6 +131,8 @@ pub struct ComputeNode { pub ext_download_progress: RwLock, bool)>>, pub compute_ctl_config: ComputeCtlConfig, + pub(crate) lsn_lease_state: Arc, + /// Handle to the extension stats collection task extension_stats_task: TaskHandle, lfc_offload_task: TaskHandle, @@ -438,6 +440,7 @@ impl ComputeNode { compute_ctl_config: config.compute_ctl_config, extension_stats_task: Mutex::new(None), lfc_offload_task: Mutex::new(None), + lsn_lease_state: Arc::new(lsn_lease::State::default()), }) } diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index bb0828429d..d6ebce38ba 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -15,6 +15,24 @@ use utils::shard::{ShardCount, ShardNumber, TenantShardId}; use crate::compute::ComputeNode; +#[derive(Default)] +pub(crate) struct State { + desired_lease_lsn: tokio::sync::watch::Sender>, +} + +impl State { + pub fn update_desired_lease_lsn(&self, update: Lsn) { + self.desired_lease_lsn.send_if_modified(|value| { + let modified = *value != Some(update); + if let Some(value) = *value && value > update { + warn!(current=%value, new=%update, "desired lease lsn moving backwards, this shouldn't happen"); + } + *value = Some(update); + modified + }); + } +} + /// Spawns a background thread to periodically renew LSN leases for static compute. /// Do nothing if the compute is not in static mode. pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 8a2f6addad..cc3b3adf7f 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -4,9 +4,10 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use compute_api::responses::ComputeStatus; -use compute_api::spec::ComputeFeature; +use compute_api::spec::{ComputeFeature, ComputeMode}; use postgres::{Client, NoTls}; use tracing::{Level, error, info, instrument, span}; +use utils::lsn::Lsn; use crate::compute::ComputeNode; use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS}; @@ -344,6 +345,41 @@ impl ComputeMonitor { } } + let mode: ComputeMode = self + .compute + .state + .lock() + .unwrap() + .pspec + .as_ref() + .expect("we launch ComputeMonitor only after we received a spec") + .spec + .mode; + match mode { + // TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary) + // then we should make sure that lsn_lease_state transitions back to None so we stop renewing it. + ComputeMode::Primary => (), + ComputeMode::Static(lsn) => { + self.compute.lsn_lease_state.update_desired_lease_lsn(lsn); + } + ComputeMode::Replica => { + match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) { + Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") { + Ok(apply_lsn) => { + let apply_lsn = Lsn(apply_lsn.into()); + self.compute.lsn_lease_state.update_desired_lease_lsn(apply_lsn); + } + Err(e) => { + anyhow::bail!("parse apply_lsn: {e}"); + } + }, + Err(e) => { + anyhow::bail!("query apply_lsn: {e}"); + } + } + } + } + Ok(()) } }