diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 78ac423a9b..f5c8613b2e 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -407,8 +407,8 @@ fn get_database_stats(cli: &mut Client) -> anyhow::Result<(f64, i64)> { // like `postgres_exporter` use it to query Postgres statistics. // Use explicit 8 bytes type casts to match Rust types. let stats = cli.query_one( - "SELECT pg_catalog.coalesce(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time, - pg_catalog.coalesce(pg_catalog.sum(sessions), 0)::pg_catalog.bigint AS total_sessions + "SELECT COALESCE(pg_catalog.sum(active_time), 0.0)::pg_catalog.float8 AS total_active_time, + COALESCE(pg_catalog.sum(sessions), 0)::pg_catalog.int8 AS total_sessions FROM pg_catalog.pg_stat_database WHERE datname NOT IN ( 'postgres', diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 814ee2a52f..d4047f1832 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -241,7 +241,7 @@ impl ComputeControlPlane { drop_subscriptions_before_start, grpc, reconfigure_concurrency: 1, - features: vec![], + features: vec![ComputeFeature::ActivityMonitorExperimental], cluster: None, compute_ctl_config: compute_ctl_config.clone(), privileged_role_name: privileged_role_name.clone(), @@ -263,7 +263,7 @@ impl ComputeControlPlane { skip_pg_catalog_updates, drop_subscriptions_before_start, reconfigure_concurrency: 1, - features: vec![], + features: vec![ComputeFeature::ActivityMonitorExperimental], cluster: None, compute_ctl_config, privileged_role_name, diff --git a/test_runner/regress/test_compute_monitor.py b/test_runner/regress/test_compute_monitor.py index c0cc9a6e3b..b8e1a57b1f 100644 --- a/test_runner/regress/test_compute_monitor.py +++ b/test_runner/regress/test_compute_monitor.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time from typing import TYPE_CHECKING from fixtures.metrics import parse_metrics @@ -9,13 +10,13 @@ if TYPE_CHECKING: from fixtures.neon_fixtures import NeonEnv -def test_compute_monitor(neon_simple_env: NeonEnv): +def test_compute_monitor_downtime_calculation(neon_simple_env: NeonEnv): """ Test that compute_ctl can detect Postgres going down (unresponsive) and reconnect when it comes back online. Also check that the downtime metrics are properly emitted. """ - TEST_DB = "test_compute_monitor" + TEST_DB = "test_compute_monitor_downtime_calculation" env = neon_simple_env endpoint = env.endpoints.create_start("main") @@ -68,3 +69,56 @@ def test_compute_monitor(neon_simple_env: NeonEnv): # Just a sanity check that we log the downtime info endpoint.log_contains("downtime_info") + + +def test_compute_monitor_activity(neon_simple_env: NeonEnv): + """ + Test compute monitor correctly detects user activity inside Postgres + and updates last_active timestamp in the /status response. + """ + TEST_DB = "test_compute_monitor_activity_db" + + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + + with endpoint.cursor() as cursor: + # Create a new database because `postgres` DB is excluded + # from activity monitoring. + cursor.execute(f"CREATE DATABASE {TEST_DB}") + + client = endpoint.http_client() + + prev_last_active = None + + def check_last_active(): + nonlocal prev_last_active + + with endpoint.cursor(dbname=TEST_DB) as cursor: + # Execute some dummy query to generate 'activity'. + cursor.execute("SELECT * FROM generate_series(1, 10000)") + + status = client.status() + assert status["last_active"] is not None + prev_last_active = status["last_active"] + + wait_until(check_last_active) + + assert prev_last_active is not None + + # Sleep for everything to settle down. It's not strictly necessary, + # but should still remove any potential noise and/or prevent test from passing + # even if compute monitor is not working. + time.sleep(3) + + with endpoint.cursor(dbname=TEST_DB) as cursor: + cursor.execute("SELECT * FROM generate_series(1, 10000)") + + def check_last_active_updated(): + nonlocal prev_last_active + + status = client.status() + assert status["last_active"] is not None + assert status["last_active"] != prev_last_active + assert status["last_active"] > prev_last_active + + wait_until(check_last_active_updated)