diff --git a/Cargo.lock b/Cargo.lock index 1c8a8b0c0f..2db012ef56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4037,6 +4037,7 @@ version = "0.2.4" source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2" dependencies = [ "bytes", + "chrono", "fallible-iterator", "postgres-protocol", ] diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 8f96530a9d..50522444c8 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -17,7 +17,7 @@ nix.workspace = true notify.workspace = true num_cpus.workspace = true opentelemetry.workspace = true -postgres.workspace = true +postgres = { workspace = true, features = ["with-chrono-0_4"] } regex.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 872a3f7750..2a338f4ce4 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -165,6 +165,34 @@ fn watch_compute_activity(compute: &ComputeNode) { continue; } } + // + // Don't suspend compute if there is activity in physical replication + // + let physical_replication_query = + "select last_msg_receipt_time from pg_stat_wal_receiver;"; + match cli.query_opt(physical_replication_query, &[]) + { + Ok(Some(row)) => match row.try_get::<&str, DateTime>("last_msg_receipt_time") + { + Ok(last_msg_receipt_time) => + { + compute.update_last_active(Some(last_msg_receipt_time)); + continue; + } + Err(e) => + { + warn!("failed to parse `pg_stat_wal_receiver` `last_msg_receipt_time`: {:?}", e); + continue; + } + }, + Ok(None) => { /* fall through */ }, + Err(e) => + { + warn!("Failed to query `pg_stat_wal_receiver`: {:?}", e); + continue; + } + } + // // Do not suspend compute if autovacuum is running //