diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 2eaad5c3c0..f064928c17 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -231,9 +231,9 @@ fn main() -> Result<()> { let compute = Arc::new(compute_node); // If this is a pooled VM, prewarm before starting HTTP server and becoming - // available for binding. Prewarming helps postgres start quicker later, + // available for binding. Prewarming helps Postgres start quicker later, // because QEMU will already have it's memory allocated from the host, and - // the necessary binaries will alreaady be cached. + // the necessary binaries will already be cached. if !spec_set { compute.prewarm_postgres()?; } @@ -276,6 +276,11 @@ fn main() -> Result<()> { state.status = ComputeStatus::Init; compute.state_changed.notify_all(); + + info!( + "running compute with features: {:?}", + state.pspec.as_ref().unwrap().spec.features + ); drop(state); // Launch remaining service threads diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c2c1952521..e701a60d77 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -20,7 +20,7 @@ use futures::StreamExt; use postgres::{Client, NoTls}; use tokio; use tokio_postgres; -use tracing::{error, info, instrument, warn}; +use tracing::{debug, error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; @@ -964,6 +964,16 @@ impl ComputeNode { Ok(pg_process) } + /// Update the `last_active` in the shared state, but ensure that it's a more recent one. + pub fn update_last_active(&self, last_active: Option>) { + let mut state = self.state.lock().unwrap(); + // NB: `Some()` is always greater than `None`. + if last_active > state.last_active { + state.last_active = last_active; + debug!("set the last compute activity time to: {:?}", last_active); + } + } + // Look for core dumps and collect backtraces. // // EKS worker nodes have following core dump settings: diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index fd19b7e53f..1a28aab479 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -3,88 +3,118 @@ use std::{thread, time::Duration}; use chrono::{DateTime, Utc}; use postgres::{Client, NoTls}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use crate::compute::ComputeNode; +use compute_api::responses::ComputeStatus; +use compute_api::spec::ComputeFeature; const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500); // Spin in a loop and figure out the last activity time in the Postgres. // Then update it in the shared state. This function never errors out. -// XXX: the only expected panic is at `RwLock` unwrap(). +// NB: the only expected panic is at `Mutex` unwrap(), all other errors +// should be handled gracefully. fn watch_compute_activity(compute: &ComputeNode) { // Suppose that `connstr` doesn't change let connstr = compute.connstr.as_str(); + + // During startup and configuration we connect to every Postgres database, + // but we don't want to count this as some user activity. So wait until + // the compute fully started before monitoring activity. + wait_for_postgres_start(compute); + // Define `client` outside of the loop to reuse existing connection if it's active. let mut client = Client::connect(connstr, NoTls); - info!("watching Postgres activity at {}", connstr); + let mut sleep = false; + let mut prev_active_time: Option = None; + let mut prev_sessions: Option = None; + + if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) { + info!("starting experimental activity monitor for {}", connstr); + } else { + info!("starting activity monitor for {}", connstr); + } loop { - // Should be outside of the write lock to allow others to read while we sleep. - thread::sleep(MONITOR_CHECK_INTERVAL); + // We use `continue` a lot, so it's more convenient to sleep at the top of the loop. + // But skip the first sleep, so we can connect to Postgres immediately. + if sleep { + // Should be outside of the mutex lock to allow others to read while we sleep. + thread::sleep(MONITOR_CHECK_INTERVAL); + } else { + sleep = true; + } match &mut client { Ok(cli) => { if cli.is_closed() { - info!("connection to postgres closed, trying to reconnect"); + info!("connection to Postgres is closed, trying to reconnect"); // Connection is closed, reconnect and try again. client = Client::connect(connstr, NoTls); continue; } - // Get all running client backends except ourself, use RFC3339 DateTime format. - let backends = cli - .query( - "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change - FROM pg_stat_activity - WHERE backend_type = 'client backend' - AND pid != pg_backend_pid() - AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors? - &[], - ); - let mut last_active = compute.state.lock().unwrap().last_active; + // This is a new logic, only enable if the feature flag is set. + // TODO: remove this once we are sure that it works OR drop it altogether. + if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) { + // First, check if the total active time or sessions across all databases has changed. + // If it did, it means that user executed some queries. In theory, it can even go down if + // some databases were dropped, but it's still a user activity. + match get_database_stats(cli) { + Ok((active_time, sessions)) => { + let mut detected_activity = false; - if let Ok(backs) = backends { - let mut idle_backs: Vec> = vec![]; - - for b in backs.into_iter() { - let state: String = match b.try_get("state") { - Ok(state) => state, - Err(_) => continue, - }; - - if state == "idle" { - let change: String = match b.try_get("state_change") { - Ok(state_change) => state_change, - Err(_) => continue, - }; - let change = DateTime::parse_from_rfc3339(&change); - match change { - Ok(t) => idle_backs.push(t.with_timezone(&Utc)), - Err(e) => { - info!("cannot parse backend state_change DateTime: {}", e); - continue; + prev_active_time = match prev_active_time { + Some(prev_active_time) => { + if active_time != prev_active_time { + detected_activity = true; + } + Some(active_time) } - } - } else { - // Found non-idle backend, so the last activity is NOW. - // Save it and exit the for loop. Also clear the idle backend - // `state_change` timestamps array as it doesn't matter now. - last_active = Some(Utc::now()); - idle_backs.clear(); - break; - } - } + None => Some(active_time), + }; + prev_sessions = match prev_sessions { + Some(prev_sessions) => { + if sessions != prev_sessions { + detected_activity = true; + } + Some(sessions) + } + None => Some(sessions), + }; - // Get idle backend `state_change` with the max timestamp. - if let Some(last) = idle_backs.iter().max() { - last_active = Some(*last); + if detected_activity { + // Update the last active time and continue, we don't need to + // check backends state change. + compute.update_last_active(Some(Utc::now())); + continue; + } + } + Err(e) => { + error!("could not get database statistics: {}", e); + continue; + } } } - // If there are existing (logical) walsenders, do not suspend. + // Second, if database statistics is the same, check all backends state change, + // maybe there is some with more recent activity. `get_backends_state_change()` + // can return None or stale timestamp, so it's `compute.update_last_active()` + // responsibility to check if the new timestamp is more recent than the current one. + // This helps us to discover new sessions, that did nothing yet. + match get_backends_state_change(cli) { + Ok(last_active) => { + compute.update_last_active(last_active); + } + Err(e) => { + error!("could not get backends state change: {}", e); + } + } + + // Finally, if there are existing (logical) walsenders, do not suspend. // // walproposer doesn't currently show up in pg_stat_replication, // but protect if it will be @@ -93,11 +123,12 @@ fn watch_compute_activity(compute: &ComputeNode) { Ok(r) => match r.try_get::<&str, i64>("count") { Ok(num_ws) => { if num_ws > 0 { - last_active = Some(Utc::now()); + compute.update_last_active(Some(Utc::now())); + continue; } } Err(e) => { - warn!("failed to parse ws count: {:?}", e); + warn!("failed to parse walsenders count: {:?}", e); continue; } }, @@ -106,17 +137,9 @@ fn watch_compute_activity(compute: &ComputeNode) { continue; } } - - // Update the last activity in the shared state if we got a more recent one. - let mut state = compute.state.lock().unwrap(); - // NB: `Some()` is always greater than `None`. - if last_active > state.last_active { - state.last_active = last_active; - debug!("set the last compute activity time to: {:?}", last_active); - } } Err(e) => { - debug!("cannot connect to postgres: {}, retrying", e); + debug!("could not connect to Postgres: {}, retrying", e); // Establish a new connection and try again. client = Client::connect(connstr, NoTls); @@ -125,12 +148,124 @@ fn watch_compute_activity(compute: &ComputeNode) { } } +// Hang on condition variable waiting until the compute status is `Running`. +fn wait_for_postgres_start(compute: &ComputeNode) { + let mut state = compute.state.lock().unwrap(); + while state.status != ComputeStatus::Running { + info!("compute is not running, waiting before monitoring activity"); + state = compute.state_changed.wait(state).unwrap(); + + if state.status == ComputeStatus::Running { + break; + } + } +} + +// Figure out the total active time and sessions across all non-system databases. +// Returned tuple is `(active_time, sessions)`. +// It can return `0.0` active time or `0` sessions, which means no user databases exist OR +// it was a start with skipped `pg_catalog` updates and user didn't do any queries +// (or open any sessions) yet. +fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> { + // Filter out `postgres` database as `compute_ctl` and other monitoring tools + // like `postgres_exporter` use it to query Postgres statistics. + // Use explicit 8 bytes type casts to match Rust types. + let stats = cli.query_one( + "SELECT coalesce(sum(active_time), 0.0)::float8 AS total_active_time, + coalesce(sum(sessions), 0)::bigint AS total_sessions + FROM pg_stat_database + WHERE datname NOT IN ( + 'postgres', + 'template0', + 'template1' + );", + &[], + ); + let stats = match stats { + Ok(stats) => stats, + Err(e) => { + return Err(anyhow::anyhow!("could not query active_time: {}", e)); + } + }; + + let active_time: f64 = match stats.try_get("total_active_time") { + Ok(active_time) => active_time, + Err(e) => return Err(anyhow::anyhow!("could not get total_active_time: {}", e)), + }; + + let sessions: i64 = match stats.try_get("total_sessions") { + Ok(sessions) => sessions, + Err(e) => return Err(anyhow::anyhow!("could not get total_sessions: {}", e)), + }; + + Ok((active_time, sessions)) +} + +// Figure out the most recent state change time across all client backends. +// If there is currently active backend, timestamp will be `Utc::now()`. +// It can return `None`, which means no client backends exist or we were +// unable to parse the timestamp. +fn get_backends_state_change(cli: &mut Client) -> anyhow::Result>> { + let mut last_active: Option> = None; + // Get all running client backends except ourself, use RFC3339 DateTime format. + let backends = cli.query( + "SELECT state, to_char(state_change, 'YYYY-MM-DD\"T\"HH24:MI:SS.US\"Z\"') AS state_change + FROM pg_stat_activity + WHERE backend_type = 'client backend' + AND pid != pg_backend_pid() + AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors? + &[], + ); + + match backends { + Ok(backs) => { + let mut idle_backs: Vec> = vec![]; + + for b in backs.into_iter() { + let state: String = match b.try_get("state") { + Ok(state) => state, + Err(_) => continue, + }; + + if state == "idle" { + let change: String = match b.try_get("state_change") { + Ok(state_change) => state_change, + Err(_) => continue, + }; + let change = DateTime::parse_from_rfc3339(&change); + match change { + Ok(t) => idle_backs.push(t.with_timezone(&Utc)), + Err(e) => { + info!("cannot parse backend state_change DateTime: {}", e); + continue; + } + } + } else { + // Found non-idle backend, so the last activity is NOW. + // Return immediately, no need to check other backends. + return Ok(Some(Utc::now())); + } + } + + // Get idle backend `state_change` with the max timestamp. + if let Some(last) = idle_backs.iter().max() { + last_active = Some(*last); + } + } + Err(e) => { + return Err(anyhow::anyhow!("could not query backends: {}", e)); + } + } + + Ok(last_active) +} + /// Launch a separate compute monitor thread and return its `JoinHandle`. -pub fn launch_monitor(state: &Arc) -> thread::JoinHandle<()> { - let state = Arc::clone(state); +pub fn launch_monitor(compute: &Arc) -> thread::JoinHandle<()> { + let compute = Arc::clone(compute); thread::Builder::new() .name("compute-monitor".into()) - .spawn(move || watch_compute_activity(&state)) + .spawn(move || watch_compute_activity(&compute)) .expect("cannot launch compute monitor thread") } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 4ff6831272..810b5c90d4 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -82,10 +82,13 @@ pub struct ComputeSpec { #[serde(rename_all = "snake_case")] pub enum ComputeFeature { // XXX: Add more feature flags here. + /// Enable the experimental activity monitor logic, which uses `pg_stat_database` to + /// track short-lived connections as user activity. + ActivityMonitorExperimental, - // This is a special feature flag that is used to represent unknown feature flags. - // Basically all unknown to enum flags are represented as this one. See unit test - // `parse_unknown_features()` for more details. + /// This is a special feature flag that is used to represent unknown feature flags. + /// Basically all unknown to enum flags are represented as this one. See unit test + /// `parse_unknown_features()` for more details. #[serde(other)] UnknownFeature, } @@ -282,4 +285,23 @@ mod tests { assert!(spec.features.contains(&ComputeFeature::UnknownFeature)); assert_eq!(spec.features, vec![ComputeFeature::UnknownFeature; 2]); } + + #[test] + fn parse_known_features() { + // Test that we can properly parse known feature flags. + let file = File::open("tests/cluster_spec.json").unwrap(); + let mut json: serde_json::Value = serde_json::from_reader(file).unwrap(); + let ob = json.as_object_mut().unwrap(); + + // Add known feature flags. + let features = vec!["activity_monitor_experimental"]; + ob.insert("features".into(), features.into()); + + let spec: ComputeSpec = serde_json::from_value(json).unwrap(); + + assert_eq!( + spec.features, + vec![ComputeFeature::ActivityMonitorExperimental] + ); + } }