take two: separate propagation path

This commit is contained in:
Christian Schwarz
2025-07-17 15:03:45 +00:00
parent 5c877c95ec
commit c9dbfd737d
6 changed files with 65 additions and 2 deletions

1
Cargo.lock generated
View File

@@ -1348,6 +1348,7 @@ dependencies = [
"p256 0.13.2",
"pageserver_page_api",
"postgres",
"postgres-types",
"postgres_initdb",
"postgres_versioninfo",
"regex",

View File

@@ -67,6 +67,7 @@ uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres-types.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -46,7 +46,6 @@ use crate::logger::startup_context_from_env;
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
use crate::metrics::COMPUTE_CTL_UP;
use crate::monitor::launch_monitor;
use crate::pg_helpers::*;
use crate::pgbouncer::*;
use crate::rsyslog::{
PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
@@ -57,6 +56,7 @@ use crate::swap::resize_swap;
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::tls::watch_cert_for_changes;
use crate::{config, extension_server, local_proxy};
use crate::{pg_helpers::*, ro_replica};
pub static SYNC_SAFEKEEPERS_PID: AtomicU32 = AtomicU32::new(0);
pub static PG_PID: AtomicU32 = AtomicU32::new(0);
@@ -131,6 +131,8 @@ pub struct ComputeNode {
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub compute_ctl_config: ComputeCtlConfig,
pub(crate) ro_replica: Arc<ro_replica::GlobalState>,
/// Handle to the extension stats collection task
extension_stats_task: TaskHandle,
lfc_offload_task: TaskHandle,
@@ -438,6 +440,7 @@ impl ComputeNode {
compute_ctl_config: config.compute_ctl_config,
extension_stats_task: Mutex::new(None),
lfc_offload_task: Mutex::new(None),
ro_replica: Arc::new(ro_replica::GlobalState::default()),
})
}

View File

@@ -23,6 +23,7 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub(crate) mod ro_replica;
pub mod rsyslog;
pub mod spec;
mod spec_apply;

View File

@@ -4,9 +4,10 @@ use std::time::Duration;
use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use compute_api::spec::{ComputeFeature, ComputeMode};
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};
use utils::lsn::Lsn;
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
@@ -344,6 +345,42 @@ impl ComputeMonitor {
}
}
let mode: ComputeMode = self
.compute
.state
.lock()
.unwrap()
.pspec
.as_ref()
.expect("we launch ComputeMonitor only after we received a spec")
.spec
.mode;
match mode {
// TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary)
// then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
ComputeMode::Primary => (),
ComputeMode::Static(_) => (),
ComputeMode::Replica => {
// TODO: instead of apply_lsn, use min inflight request LSN
match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
Ok(apply_lsn) => {
let apply_lsn = Lsn(apply_lsn.into());
self.compute
.ro_replica
.update_min_inflight_request_lsn(apply_lsn);
}
Err(e) => {
anyhow::bail!("parse apply_lsn: {e}");
}
},
Err(e) => {
anyhow::bail!("query apply_lsn: {e}");
}
}
}
}
Ok(())
}
}

View File

@@ -0,0 +1,20 @@
use tracing::warn;
use utils::lsn::Lsn;
#[derive(Default)]
pub(crate) struct GlobalState {
min_inflight_request_lsn: tokio::sync::watch::Sender<Option<Lsn>>,
}
impl GlobalState {
pub fn update_min_inflight_request_lsn(&self, update: Lsn) {
self.min_inflight_request_lsn.send_if_modified(|value| {
let modified = *value != Some(update);
if let Some(value) = *value && value > update {
warn!(current=%value, new=%update, "min inflight request lsn moving backwards, this should not happen, bug in communicator");
}
*value = Some(update);
modified
});
}
}