Patch summary (free text ahead of the first file header):

  * compute_tools gains a dependency on `postgres-types` (Cargo.toml and Cargo.lock).
  * A new crate-private module `ro_replica` defines `GlobalState`, which wraps a
    `tokio::sync::watch::Sender`. `update_min_inflight_request_lsn` stores the new
    LSN through `send_if_modified`, reports the channel as modified only when the
    stored value actually differs, and logs a warning when the previous value was
    greater than the new one.
  * `ComputeNode` gains a `ro_replica` field, initialised with
    `ro_replica::GlobalState::default()`.
  * `ComputeMonitor` reads `spec.mode`; for `ComputeMode::Replica` it runs
    `SELECT pg_last_wal_replay_lsn() as apply_lsn` and passes the result to
    `update_min_inflight_request_lsn`. A query or column-decoding failure returns
    an error via `anyhow::bail!`. `Primary` and `Static(_)` do nothing.

NOTE(review): this text was recovered from a copy in which line breaks were
collapsed and some `<...>` generic arguments were stripped. Line breaks, blank
context lines and indentation were rebuilt from the hunk line counts and rustfmt
conventions. Three generic argument lists were reconstructed:
`Arc<ro_replica::GlobalState>` and `Sender<Option<Lsn>>` follow from the code in
this patch; `RwLock<HashMap<String, (DateTime<Utc>, bool)>>` is a context line
and is presumed - confirm against the upstream commit before applying.

diff --git a/Cargo.lock b/Cargo.lock
index 893932fb9d..40f8b04463 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1348,6 +1348,7 @@ dependencies = [
  "p256 0.13.2",
  "pageserver_page_api",
  "postgres",
+ "postgres-types",
  "postgres_initdb",
  "postgres_versioninfo",
  "regex",
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index 1a03022d89..c0e67851a4 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -67,6 +67,7 @@ uuid.workspace = true
 walkdir.workspace = true
 x509-cert.workspace = true
 
+postgres-types.workspace = true
 postgres_versioninfo.workspace = true
 postgres_initdb.workspace = true
 compute_api.workspace = true
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index ec6e6c1634..8ccb625ff9 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -46,7 +46,6 @@ use crate::logger::startup_context_from_env;
 use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
 use crate::metrics::COMPUTE_CTL_UP;
 use crate::monitor::launch_monitor;
-use crate::pg_helpers::*;
 use crate::pgbouncer::*;
 use crate::rsyslog::{
     PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
@@ -57,6 +56,7 @@ use crate::swap::resize_swap;
 use crate::sync_sk::{check_if_synced, ping_safekeeper};
 use crate::tls::watch_cert_for_changes;
 use crate::{config, extension_server, local_proxy};
+use crate::{pg_helpers::*, ro_replica};
 
 pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
 pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -131,6 +131,8 @@ pub struct ComputeNode {
     pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
     pub compute_ctl_config: ComputeCtlConfig,
 
+    pub(crate) ro_replica: Arc<ro_replica::GlobalState>,
+
     /// Handle to the extension stats collection task
     extension_stats_task: TaskHandle,
     lfc_offload_task: TaskHandle,
@@ -438,6 +440,7 @@ impl ComputeNode {
             compute_ctl_config: config.compute_ctl_config,
             extension_stats_task: Mutex::new(None),
             lfc_offload_task: Mutex::new(None),
+            ro_replica: Arc::new(ro_replica::GlobalState::default()),
         })
     }
 
diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs
index 3899a1ca76..1874bdea9a 100644
--- a/compute_tools/src/lib.rs
+++ b/compute_tools/src/lib.rs
@@ -23,6 +23,7 @@ pub mod monitor;
 pub mod params;
 pub mod pg_helpers;
 pub mod pgbouncer;
+pub(crate) mod ro_replica;
 pub mod rsyslog;
 pub mod spec;
 mod spec_apply;
diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs
index 8a2f6addad..390158078a 100644
--- a/compute_tools/src/monitor.rs
+++ b/compute_tools/src/monitor.rs
@@ -4,9 +4,10 @@ use std::time::Duration;
 
 use chrono::{DateTime, Utc};
 use compute_api::responses::ComputeStatus;
-use compute_api::spec::ComputeFeature;
+use compute_api::spec::{ComputeFeature, ComputeMode};
 use postgres::{Client, NoTls};
 use tracing::{Level, error, info, instrument, span};
+use utils::lsn::Lsn;
 
 use crate::compute::ComputeNode;
 use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
@@ -344,6 +345,42 @@ impl ComputeMonitor {
             }
         }
 
+        let mode: ComputeMode = self
+            .compute
+            .state
+            .lock()
+            .unwrap()
+            .pspec
+            .as_ref()
+            .expect("we launch ComputeMonitor only after we received a spec")
+            .spec
+            .mode;
+        match mode {
+            // TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary)
+            // then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
+            ComputeMode::Primary => (),
+            ComputeMode::Static(_) => (),
+            ComputeMode::Replica => {
+                // TODO: instead of apply_lsn, use min inflight request LSN
+                match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
+                    Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
+                        Ok(apply_lsn) => {
+                            let apply_lsn = Lsn(apply_lsn.into());
+                            self.compute
+                                .ro_replica
+                                .update_min_inflight_request_lsn(apply_lsn);
+                        }
+                        Err(e) => {
+                            anyhow::bail!("parse apply_lsn: {e}");
+                        }
+                    },
+                    Err(e) => {
+                        anyhow::bail!("query apply_lsn: {e}");
+                    }
+                }
+            }
+        }
+
         Ok(())
     }
 }
diff --git a/compute_tools/src/ro_replica.rs b/compute_tools/src/ro_replica.rs
new file mode 100644
index 0000000000..d221d0251d
--- /dev/null
+++ b/compute_tools/src/ro_replica.rs
@@ -0,0 +1,20 @@
+use tracing::warn;
+use utils::lsn::Lsn;
+
+#[derive(Default)]
+pub(crate) struct GlobalState {
+    min_inflight_request_lsn: tokio::sync::watch::Sender<Option<Lsn>>,
+}
+
+impl GlobalState {
+    pub fn update_min_inflight_request_lsn(&self, update: Lsn) {
+        self.min_inflight_request_lsn.send_if_modified(|value| {
+            let modified = *value != Some(update);
+            if let Some(value) = *value && value > update {
+                warn!(current=%value, new=%update, "min inflight request lsn moving backwards, this should not happen, bug in communicator");
+            }
+            *value = Some(update);
+            modified
+        });
+    }
+}