diff --git a/compute_tools/src/metrics.rs b/compute_tools/src/metrics.rs
index fa00476fd2..e37d6120ac 100644
--- a/compute_tools/src/metrics.rs
+++ b/compute_tools/src/metrics.rs
@@ -1,8 +1,8 @@
-use metrics::core::{AtomicF64, Collector, GenericGauge};
+use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
 use metrics::proto::MetricFamily;
 use metrics::{
-    IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
-    register_int_gauge_vec, register_uint_gauge_vec,
+    IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
+    register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
 };
 use once_cell::sync::Lazy;
@@ -81,6 +81,22 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<UIntGaugeVec> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
+pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
+    register_gauge!(
+        "compute_pg_current_downtime_ms",
+        "Non-cumulative duration of Postgres downtime in ms; resets after successful check",
+    )
+    .expect("failed to define a metric")
+});
+
+pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
+    register_int_counter!(
+        "compute_pg_downtime_ms_total",
+        "Cumulative duration of Postgres downtime in ms",
+    )
+    .expect("failed to define a metric")
+});
+
 pub fn collect() -> Vec<MetricFamily> {
     let mut metrics = COMPUTE_CTL_UP.collect();
     metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -88,5 +104,7 @@ pub fn collect() -> Vec<MetricFamily> {
     metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
     metrics.extend(DB_MIGRATION_FAILED.collect());
     metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
+    metrics.extend(PG_CURR_DOWNTIME_MS.collect());
+    metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
     metrics
 }
diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs
index 83318538cd..5a07eec833 100644
--- a/compute_tools/src/monitor.rs
+++ b/compute_tools/src/monitor.rs
@@ -6,197 +6,294 @@ use chrono::{DateTime, Utc};
 use compute_api::responses::ComputeStatus;
 use compute_api::spec::ComputeFeature;
 use postgres::{Client, NoTls};
-use tracing::{debug, error, info, warn};
+use tracing::{Level, error, info, instrument, span};
 
 use crate::compute::ComputeNode;
+use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
 
 const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
 
-// Spin in a loop and figure out the last activity time in the Postgres.
-// Then update it in the shared state. This function never errors out.
-// NB: the only expected panic is at `Mutex` unwrap(), all other errors
-// should be handled gracefully.
-fn watch_compute_activity(compute: &ComputeNode) {
-    // Suppose that `connstr` doesn't change
-    let connstr = compute.params.connstr.clone();
-    let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
+struct ComputeMonitor {
+    compute: Arc<ComputeNode>,
 
-    // During startup and configuration we connect to every Postgres database,
-    // but we don't want to count this as some user activity. So wait until
-    // the compute fully started before monitoring activity.
-    wait_for_postgres_start(compute);
+    /// The last moment when Postgres had some activity
+    /// that should prevent the compute from being suspended.
+    last_active: Option<DateTime<Utc>>,
 
-    // Define `client` outside of the loop to reuse existing connection if it's active.
-    let mut client = conf.connect(NoTls);
+    /// The moment when we last tried to check Postgres.
+    last_checked: DateTime<Utc>,
+    /// The last moment we did a successful Postgres check.
+    last_up: DateTime<Utc>,
 
-    let mut sleep = false;
-    let mut prev_active_time: Option<f64> = None;
-    let mut prev_sessions: Option<i64> = None;
+    /// Only used for internal statistics change tracking
+    /// between monitor runs and can be outdated.
+    active_time: Option<f64>,
+    /// Only used for internal statistics change tracking
+    /// between monitor runs and can be outdated.
+    sessions: Option<i64>,
 
-    if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
-        info!("starting experimental activity monitor for {}", connstr);
-    } else {
-        info!("starting activity monitor for {}", connstr);
+    /// Use the experimental statistics-based activity monitor. It's no longer
+    /// 'experimental' per se, as it's enabled for everyone, but we still
+    /// keep the flag as an option to turn it off in some cases if it
+    /// misbehaves.
+    experimental: bool,
+}
+
+impl ComputeMonitor {
+    fn report_down(&self) {
+        let now = Utc::now();
+
+        // Calculate and report the current downtime
+        // (since the last time Postgres was up).
+        let downtime = now.signed_duration_since(self.last_up);
+        PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
+
+        // Calculate and update the total downtime
+        // (cumulative duration of Postgres downtime in ms).
+        let inc = now
+            .signed_duration_since(self.last_checked)
+            .num_milliseconds();
+        PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
     }
 
-    loop {
-        // We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
-        // But skip the first sleep, so we can connect to Postgres immediately.
-        if sleep {
-            // Should be outside of the mutex lock to allow others to read while we sleep.
-            thread::sleep(MONITOR_CHECK_INTERVAL);
-        } else {
-            sleep = true;
-        }
+    fn report_up(&mut self) {
+        self.last_up = Utc::now();
+        PG_CURR_DOWNTIME_MS.set(0.0);
     }
 
-        match &mut client {
-            Ok(cli) => {
-                if cli.is_closed() {
-                    info!("connection to Postgres is closed, trying to reconnect");
+    fn downtime_info(&self) -> String {
+        format!(
+            "total_ms: {}, current_ms: {}, last_up: {}",
+            PG_TOTAL_DOWNTIME_MS.get(),
+            PG_CURR_DOWNTIME_MS.get(),
+            self.last_up
+        )
     }
 
-                    // Connection is closed, reconnect and try again.
-                    client = conf.connect(NoTls);
-                    continue;
-                }
+    /// Spin in a loop and figure out the last activity time in Postgres.
+    /// Then update it in the shared state. This function never errors out.
+    /// NB: the only expected panic is at `Mutex` unwrap(), all other errors
+    /// should be handled gracefully.
+    #[instrument(skip_all)]
+    pub fn run(&mut self) {
+        // Assume that `connstr` doesn't change.
+        let connstr = self.compute.params.connstr.clone();
+        let conf = self
+            .compute
+            .get_conn_conf(Some("compute_ctl:compute_monitor"));
 
-                // This is a new logic, only enable if the feature flag is set.
-                // TODO: remove this once we are sure that it works OR drop it altogether.
-                if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
-                    // First, check if the total active time or sessions across all databases has changed.
-                    // If it did, it means that user executed some queries. In theory, it can even go down if
-                    // some databases were dropped, but it's still a user activity.
-                    match get_database_stats(cli) {
-                        Ok((active_time, sessions)) => {
-                            let mut detected_activity = false;
+        // During startup and configuration we connect to every Postgres database,
+        // but we don't want to count this as some user activity. So wait until
+        // the compute fully started before monitoring activity.
+        wait_for_postgres_start(&self.compute);
 
-                            prev_active_time = match prev_active_time {
-                                Some(prev_active_time) => {
-                                    if active_time != prev_active_time {
-                                        detected_activity = true;
-                                    }
-                                    Some(active_time)
-                                }
-                                None => Some(active_time),
-                            };
-                            prev_sessions = match prev_sessions {
-                                Some(prev_sessions) => {
-                                    if sessions != prev_sessions {
-                                        detected_activity = true;
-                                    }
-                                    Some(sessions)
-                                }
-                                None => Some(sessions),
-                            };
+        // Define `client` outside of the loop to reuse existing connection if it's active.
+        let mut client = conf.connect(NoTls);
 
-                            if detected_activity {
-                                // Update the last active time and continue, we don't need to
-                                // check backends state change.
-                                compute.update_last_active(Some(Utc::now()));
-                                continue;
-                            }
-                        }
-                        Err(e) => {
-                            error!("could not get database statistics: {}", e);
-                            continue;
-                        }
-                    }
-                }
+        info!("starting compute monitor for {}", connstr);
 
-                // Second, if database statistics is the same, check all backends state change,
-                // maybe there is some with more recent activity. `get_backends_state_change()`
-                // can return None or stale timestamp, so it's `compute.update_last_active()`
-                // responsibility to check if the new timestamp is more recent than the current one.
-                // This helps us to discover new sessions, that did nothing yet.
-                match get_backends_state_change(cli) {
-                    Ok(last_active) => {
-                        compute.update_last_active(last_active);
-                    }
-                    Err(e) => {
-                        error!("could not get backends state change: {}", e);
-                    }
-                }
-
-                // Finally, if there are existing (logical) walsenders, do not suspend.
-                //
-                // walproposer doesn't currently show up in pg_stat_replication,
-                // but protect if it will be
-                let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
-                match cli.query_one(ws_count_query, &[]) {
-                    Ok(r) => match r.try_get::<&str, i64>("count") {
-                        Ok(num_ws) => {
-                            if num_ws > 0 {
-                                compute.update_last_active(Some(Utc::now()));
-                                continue;
-                            }
-                        }
-                        Err(e) => {
-                            warn!("failed to parse walsenders count: {:?}", e);
-                            continue;
-                        }
-                    },
-                    Err(e) => {
-                        warn!("failed to get list of walsenders: {:?}", e);
-                        continue;
-                    }
-                }
-                //
-                // Don't suspend compute if there is an active logical replication subscription
-                //
-                // `where pid is not null` – to filter out read only computes and subscription on branches
-                //
-                let logical_subscriptions_query =
-                    "select count(*) from pg_stat_subscription where pid is not null;";
-                match cli.query_one(logical_subscriptions_query, &[]) {
-                    Ok(row) => match row.try_get::<&str, i64>("count") {
-                        Ok(num_subscribers) => {
-                            if num_subscribers > 0 {
-                                compute.update_last_active(Some(Utc::now()));
-                                continue;
-                            }
-                        }
-                        Err(e) => {
-                            warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
-                            continue;
-                        }
-                    },
-                    Err(e) => {
-                        warn!(
-                            "failed to get list of active logical replication subscriptions: {:?}",
-                            e
+        loop {
+            match &mut client {
+                Ok(cli) => {
+                    if cli.is_closed() {
+                        info!(
+                            downtime_info = self.downtime_info(),
+                            "connection to Postgres is closed, trying to reconnect"
                         );
-                        continue;
-                    }
-                }
-                //
-                // Do not suspend compute if autovacuum is running
-                //
-                let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
-                match cli.query_one(autovacuum_count_query, &[]) {
-                    Ok(r) => match r.try_get::<&str, i64>("count") {
-                        Ok(num_workers) => {
-                            if num_workers > 0 {
-                                compute.update_last_active(Some(Utc::now()));
-                                continue;
+                        self.report_down();
+
+                        // Connection is closed, reconnect and try again.
+                        client = conf.connect(NoTls);
+                    } else {
+                        match self.check(cli) {
+                            Ok(_) => {
+                                self.report_up();
+                                self.compute.update_last_active(self.last_active);
+                            }
+                            Err(e) => {
+                                // Although we have many places where we can return errors in `check()`,
+                                // normally it shouldn't happen. I.e., we will likely return an error if
+                                // the connection got broken, a query timed out, Postgres returned invalid data, etc.
+                                // In all such cases it's suspicious, so let's report this as downtime.
+                                self.report_down();
+                                error!(
+                                    downtime_info = self.downtime_info(),
+                                    "could not check Postgres: {}", e
+                                );
+
+                                // Reconnect to Postgres just in case. During tests, I noticed
+                                // that queries in `check()` can fail with `connection closed`,
+                                // but `cli.is_closed()` above doesn't detect it. Even if the old
+                                // connection is still alive, it will be dropped when we reassign
+                                // `client` to a new connection.
+                                client = conf.connect(NoTls);
+                            }
+                        }
+                    }
+                }
-                        Err(e) => {
-                            warn!("failed to parse autovacuum workers count: {:?}", e);
-                            continue;
-                        }
-                    },
-                    Err(e) => {
-                        warn!("failed to get list of autovacuum workers: {:?}", e);
-                        continue;
-                    }
-                }
-            }
-            Err(e) => {
-                debug!("could not connect to Postgres: {}, retrying", e);
+                Err(e) => {
+                    info!(
+                        downtime_info = self.downtime_info(),
+                        "could not connect to Postgres: {}, retrying", e
+                    );
+                    self.report_down();
 
-                // Establish a new connection and try again.
-                client = conf.connect(NoTls);
+                    // Establish a new connection and try again.
+                    client = conf.connect(NoTls);
+                }
+            }
+
+            // Reset the `last_checked` timestamp and sleep before the next iteration.
+            self.last_checked = Utc::now();
+            thread::sleep(MONITOR_CHECK_INTERVAL);
+        }
+    }
+
+    #[instrument(skip_all)]
+    fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
+        // This is new logic; only enable it if the feature flag is set.
+        // TODO: remove this once we are sure that it works OR drop it altogether.
+        if self.experimental {
+            // Check if the total active time or sessions across all databases has changed.
+            // If it did, it means that the user executed some queries. In theory, it can even go down if
+            // some databases were dropped, but it's still user activity.
+            match get_database_stats(cli) {
+                Ok((active_time, sessions)) => {
+                    let mut detected_activity = false;
+
+                    if let Some(prev_active_time) = self.active_time {
+                        if active_time != prev_active_time {
+                            detected_activity = true;
+                        }
+                    }
+                    self.active_time = Some(active_time);
+
+                    if let Some(prev_sessions) = self.sessions {
+                        if sessions != prev_sessions {
+                            detected_activity = true;
+                        }
+                    }
+                    self.sessions = Some(sessions);
+
+                    if detected_activity {
+                        // Update the last active time and return, we don't need to
+                        // check the backends state change.
+                        self.last_active = Some(Utc::now());
+                        return Ok(());
+                    }
+                }
+                Err(e) => {
+                    return Err(anyhow::anyhow!("could not get database statistics: {}", e));
+                }
             }
         }
+
+        // If database statistics are the same, check all backends for state changes.
+        // Maybe there are some with more recent activity. `get_backends_state_change()`
+        // can return None or a stale timestamp, so it's `compute.update_last_active()`'s
+        // responsibility to check if the new timestamp is more recent than the current one.
+        // This helps us to discover new sessions that have not done anything yet.
+        match get_backends_state_change(cli) {
+            Ok(last_active) => match (last_active, self.last_active) {
+                (Some(last_active), Some(prev_last_active)) => {
+                    if last_active > prev_last_active {
+                        self.last_active = Some(last_active);
+                        return Ok(());
+                    }
+                }
+                (Some(last_active), None) => {
+                    self.last_active = Some(last_active);
+                    return Ok(());
+                }
+                _ => {}
+            },
+            Err(e) => {
+                return Err(anyhow::anyhow!(
+                    "could not get backends state change: {}",
+                    e
+                ));
+            }
+        }
+
+        // If there are existing (logical) walsenders, do not suspend.
+        //
+        // N.B. walproposer doesn't currently show up in pg_stat_replication,
+        // but protect if it ever does.
+        const WS_COUNT_QUERY: &str =
+            "select count(*) from pg_stat_replication where application_name != 'walproposer';";
+        match cli.query_one(WS_COUNT_QUERY, &[]) {
+            Ok(r) => match r.try_get::<&str, i64>("count") {
+                Ok(num_ws) => {
+                    if num_ws > 0 {
+                        self.last_active = Some(Utc::now());
+                        return Ok(());
+                    }
+                }
+                Err(e) => {
+                    let err: anyhow::Error = e.into();
+                    return Err(err.context("failed to parse walsenders count"));
+                }
+            },
+            Err(e) => {
+                return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
+            }
+        }
+
+        // Don't suspend compute if there is an active logical replication subscription.
+        //
+        // `where pid is not null` – to filter out read-only computes and subscriptions on branches
+        const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
+            "select count(*) from pg_stat_subscription where pid is not null;";
+        match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
+            Ok(row) => match row.try_get::<&str, i64>("count") {
+                Ok(num_subscribers) => {
+                    if num_subscribers > 0 {
+                        self.last_active = Some(Utc::now());
+                        return Ok(());
+                    }
+                }
+                Err(e) => {
+                    return Err(anyhow::anyhow!(
+                        "failed to parse 'pg_stat_subscription' count: {}",
+                        e
+                    ));
+                }
+            },
+            Err(e) => {
+                return Err(anyhow::anyhow!(
+                    "failed to get list of active logical replication subscriptions: {}",
+                    e
+                ));
+            }
+        }
+
+        // Do not suspend compute if autovacuum is running.
+        const AUTOVACUUM_COUNT_QUERY: &str =
+            "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
+        match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
+            Ok(r) => match r.try_get::<&str, i64>("count") {
+                Ok(num_workers) => {
+                    if num_workers > 0 {
+                        self.last_active = Some(Utc::now());
+                        return Ok(());
+                    }
+                }
+                Err(e) => {
+                    return Err(anyhow::anyhow!(
+                        "failed to parse autovacuum workers count: {}",
+                        e
+                    ));
+                }
+            },
+            Err(e) => {
+                return Err(anyhow::anyhow!(
+                    "failed to get list of autovacuum workers: {}",
+                    e
+                ));
+            }
+        }
+
+        Ok(())
     }
 }
@@ -315,9 +412,24 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime<Utc>>> {
 pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
     let compute = Arc::clone(compute);
+    let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
+    let now = Utc::now();
+    let mut monitor = ComputeMonitor {
+        compute,
+        last_active: None,
+        last_checked: now,
+        last_up: now,
+        active_time: None,
+        sessions: None,
+        experimental,
+    };
+    let span = span!(Level::INFO, "compute_monitor");
 
     thread::Builder::new()
         .name("compute-monitor".into())
-        .spawn(move || watch_compute_activity(&compute))
+        .spawn(move || {
+            let _enter = span.enter();
+            monitor.run();
+        })
         .expect("cannot launch compute monitor thread")
 }
diff --git a/test_runner/regress/test_compute_monitor.py b/test_runner/regress/test_compute_monitor.py
new file mode 100644
index 0000000000..c0cc9a6e3b
--- /dev/null
+++ b/test_runner/regress/test_compute_monitor.py
@@ -0,0 +1,70 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fixtures.metrics import parse_metrics
+from fixtures.utils import wait_until
+
+if TYPE_CHECKING:
+    from fixtures.neon_fixtures import NeonEnv
+
+
+def test_compute_monitor(neon_simple_env: NeonEnv):
+    """
+    Test that compute_ctl can detect Postgres going down (unresponsive) and
+    reconnect when it comes back online. Also check that the downtime metrics
+    are properly emitted.
+    """
+    TEST_DB = "test_compute_monitor"
+
+    env = neon_simple_env
+    endpoint = env.endpoints.create_start("main")
+
+    # Check that the default 'postgres' database is present
+    with endpoint.cursor() as cursor:
+        cursor.execute("SELECT datname FROM pg_database WHERE datname = 'postgres'")
+        catalog_db = cursor.fetchone()
+        assert catalog_db is not None
+        assert len(catalog_db) == 1
+
+        # Create a new database
+        cursor.execute(f"CREATE DATABASE {TEST_DB}")
+
+    # Drop database 'postgres'
+    with endpoint.cursor(dbname=TEST_DB) as cursor:
+        # Use FORCE to terminate all connections to the database
+        cursor.execute("DROP DATABASE postgres WITH (FORCE)")
+
+    client = endpoint.http_client()
+
+    def check_metrics_down():
+        raw_metrics = client.metrics()
+        metrics = parse_metrics(raw_metrics)
+        compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
+        assert len(compute_pg_current_downtime_ms) == 1
+        assert compute_pg_current_downtime_ms[0].value > 0
+        compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
+        assert len(compute_pg_downtime_ms_total) == 1
+        assert compute_pg_downtime_ms_total[0].value > 0
+
+    wait_until(check_metrics_down)
+
+    # Recreate the 'postgres' database
+    with endpoint.cursor(dbname=TEST_DB) as cursor:
+        cursor.execute("CREATE DATABASE postgres")
+
+    # Current downtime should reset to 0, but not the total downtime
+    def check_metrics_up():
+        raw_metrics = client.metrics()
+        metrics = parse_metrics(raw_metrics)
+        compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
+        assert len(compute_pg_current_downtime_ms) == 1
+        assert compute_pg_current_downtime_ms[0].value == 0
+        compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
+        assert len(compute_pg_downtime_ms_total) == 1
+        assert compute_pg_downtime_ms_total[0].value > 0
+
+    wait_until(check_metrics_up)
+
+    # Just a sanity check that we log the downtime info
+    assert endpoint.log_contains("downtime_info") is not None
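
For reference, the downtime accounting implemented by `report_down()`/`report_up()` can be modeled in isolation: the current-downtime gauge always measures from the last successful check (`last_up`), while the total-downtime counter grows only by the time elapsed since the previous check attempt (`last_checked`), so repeated failures never double-count the same interval. Below is a minimal standalone sketch of that bookkeeping using only `chrono` and plain fields instead of Prometheus metrics; the `DowntimeTracker` name and the folding of the `last_checked` update into the methods are illustrative assumptions, not part of the patch (the patch refreshes `last_checked` once per loop iteration in `run()`).

use chrono::{DateTime, Duration, Utc};

struct DowntimeTracker {
    last_checked: DateTime<Utc>, // when we last attempted a check
    last_up: DateTime<Utc>,      // when a check last succeeded
    current_downtime_ms: f64,    // gauge-like: resets to 0 on success
    total_downtime_ms: u64,      // counter-like: only ever grows
}

impl DowntimeTracker {
    fn new(now: DateTime<Utc>) -> Self {
        Self {
            last_checked: now,
            last_up: now,
            current_downtime_ms: 0.0,
            total_downtime_ms: 0,
        }
    }

    // A check failed: the gauge shows the whole outage so far, while the
    // counter grows only by the time since the previous attempt.
    fn report_down(&mut self, now: DateTime<Utc>) {
        self.current_downtime_ms =
            now.signed_duration_since(self.last_up).num_milliseconds() as f64;
        self.total_downtime_ms +=
            now.signed_duration_since(self.last_checked).num_milliseconds() as u64;
        self.last_checked = now;
    }

    // A check succeeded: the gauge resets, the counter keeps its value.
    fn report_up(&mut self, now: DateTime<Utc>) {
        self.last_up = now;
        self.current_downtime_ms = 0.0;
        self.last_checked = now;
    }
}

fn main() {
    let t0 = Utc::now();
    let mut t = DowntimeTracker::new(t0);

    t.report_down(t0 + Duration::milliseconds(500));
    t.report_down(t0 + Duration::milliseconds(1000));
    assert_eq!(t.current_downtime_ms, 1000.0); // full outage so far
    assert_eq!(t.total_downtime_ms, 1000); // no double counting

    t.report_up(t0 + Duration::milliseconds(1500));
    assert_eq!(t.current_downtime_ms, 0.0); // gauge resets
    assert_eq!(t.total_downtime_ms, 1000); // counter is cumulative
}

This split is also what the new test relies on: after Postgres comes back, `compute_pg_current_downtime_ms` returns to 0 while `compute_pg_downtime_ms_total` keeps its accumulated value.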
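The two new metrics are a plain gauge (`GenericGauge<AtomicF64>`) and an integer counter (`GenericCounter<AtomicU64>`). Neon's `metrics` crate wraps the upstream `prometheus` crate, so the registration helpers in the patch come from that wrapper; the sketch below is a rough equivalent against the upstream `prometheus` crate directly (metric names kept from the patch, everything else is an assumption, and the in-repo wrapper may differ).

use prometheus::{register_gauge, register_int_counter, Encoder, TextEncoder};

fn main() {
    // Gauge: can go up and down; reset to 0 after a successful check.
    let curr = register_gauge!(
        "compute_pg_current_downtime_ms",
        "Non-cumulative duration of Postgres downtime in ms"
    )
    .expect("failed to define a metric");

    // Counter: monotonically increasing cumulative downtime.
    let total = register_int_counter!(
        "compute_pg_downtime_ms_total",
        "Cumulative duration of Postgres downtime in ms"
    )
    .expect("failed to define a metric");

    curr.set(500.0);
    total.inc_by(500);

    // Render the default registry in the Prometheus text exposition format,
    // similar in shape to what a /metrics endpoint would serve.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .expect("failed to encode metrics");
    print!("{}", String::from_utf8(buf).expect("metrics are valid UTF-8"));
}

Modeling the total as a counter rather than a gauge means standard PromQL functions such as rate() and increase() apply cleanly across scrapes, while the gauge gives the instantaneous length of the current outage.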