From 842a5091d5db4c23aeb29aea070c37ad06b12d63 Mon Sep 17 00:00:00 2001 From: Suhas Thalanki <54014218+thesuhas@users.noreply.github.com> Date: Wed, 30 Jul 2025 11:14:59 -0400 Subject: [PATCH] [BRC-3051] Walproposer: Safekeeper quorum health metrics (#930) (#12750) Today we don't have any indications (other than spammy logs in PG that nobody monitors) if the Walproposer in PG cannot connect to/get votes from all Safekeepers. This means we don't have signals indicating that the Safekeepers are operating at degraded redundancy. We need these signals. Added plumbing in PG extension so that the `neon_perf_counters` view exports the following gauge metrics on safekeeper health: - `num_configured_safekeepers`: The total number of safekeepers configured in PG. - `num_active_safekeepers`: The number of safekeepers that PG is actively streaming WAL to. An alert should be raised whenever `num_active_safekeepers` < `num_configured_safekeepers`. The metrics are implemented by adding additional state to the Walproposer shared memory keeping track of the active statuses of safekeepers using a simple array. The status of the safekeeper is set to active (1) after the Walproposer acquires a quorum and starts streaming data to the safekeeper, and is set to inactive (0) when the connection with a safekeeper is shut down. We scan the safekeeper status array in Walproposer shared memory when collecting the metrics to produce results for the gauges. Added coverage for the metrics to integration test `test_wal_acceptor.py::test_timeline_disk_usage_limit`. ## Problem ## Summary of changes --------- Co-authored-by: William Huang --- libs/walproposer/src/api_bindings.rs | 34 ++++++++++++++++++++++++ libs/walproposer/src/walproposer.rs | 15 +++++++++++ pgxn/neon/neon_perf_counters.c | 27 +++++++++++++++++++ pgxn/neon/walproposer.c | 16 ++++++++++- pgxn/neon/walproposer.h | 26 ++++++++++++++++++ pgxn/neon/walproposer_pg.c | 23 ++++++++++++++++ test_runner/regress/test_wal_acceptor.py | 33 ++++++++++++++++++++--- 7 files changed, 170 insertions(+), 4 deletions(-) diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 9f88ea6b11..9c90beb379 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -341,6 +341,34 @@ extern "C-unwind" fn log_internal( } } +/* BEGIN_HADRON */ +extern "C" fn reset_safekeeper_statuses_for_metrics(wp: *mut WalProposer, num_safekeepers: u32) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + if api.is_null() { + return; + } + (*api).reset_safekeeper_statuses_for_metrics(&mut (*wp), num_safekeepers); + } +} + +extern "C" fn update_safekeeper_status_for_metrics( + wp: *mut WalProposer, + sk_index: u32, + status: u8, +) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + if api.is_null() { + return; + } + (*api).update_safekeeper_status_for_metrics(&mut (*wp), sk_index, status); + } +} +/* END_HADRON */ + #[derive(Debug, PartialEq)] pub enum Level { Debug5, @@ -414,6 +442,10 @@ pub(crate) fn create_api() -> walproposer_api { finish_sync_safekeepers: Some(finish_sync_safekeepers), process_safekeeper_feedback: Some(process_safekeeper_feedback), log_internal: Some(log_internal), + /* BEGIN_HADRON */ + reset_safekeeper_statuses_for_metrics: Some(reset_safekeeper_statuses_for_metrics), + update_safekeeper_status_for_metrics: Some(update_safekeeper_status_for_metrics), + /* END_HADRON */ } } @@ -451,6 +483,8 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState { replica_promote: false, min_ps_feedback: empty_feedback, wal_rate_limiter: empty_wal_rate_limiter, + num_safekeepers: 0, + safekeeper_status: [0; 32], } } diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs index 93bb0d5eb0..8453279c5c 100644 --- a/libs/walproposer/src/walproposer.rs +++ b/libs/walproposer/src/walproposer.rs @@ -159,6 +159,21 @@ pub trait ApiImpl { fn after_election(&self, _wp: &mut WalProposer) { todo!() } + + /* BEGIN_HADRON */ + fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) { + // Do nothing for testing purposes. + } + + fn update_safekeeper_status_for_metrics( + &self, + _wp: &mut WalProposer, + _sk_index: u32, + _status: u8, + ) { + // Do nothing for testing purposes. + } + /* END_HADRON */ } #[derive(Debug)] diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index fada4cba1e..4527084514 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -391,6 +391,12 @@ neon_get_perf_counters(PG_FUNCTION_ARGS) neon_per_backend_counters totals = {0}; metric_t *metrics; + /* BEGIN_HADRON */ + WalproposerShmemState *wp_shmem; + uint32 num_safekeepers; + uint32 num_active_safekeepers; + /* END_HADRON */ + /* We put all the tuples into a tuplestore in one go. */ InitMaterializedSRF(fcinfo, 0); @@ -437,11 +443,32 @@ neon_get_perf_counters(PG_FUNCTION_ARGS) // Not ideal but piggyback our databricks counters into the neon perf counters view // so that we don't need to introduce neon--1.x+1.sql to add a new view. { + // Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define + // the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025. + + // Read safekeeper status from wal proposer shared memory first. + // Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is + // consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should + // not be a huge deal. + wp_shmem = GetWalpropShmemState(); + SpinLockAcquire(&wp_shmem->mutex); + num_safekeepers = wp_shmem->num_safekeepers; + num_active_safekeepers = 0; + for (int i = 0; i < num_safekeepers; i++) { + if (wp_shmem->safekeeper_status[i] == 1) { + num_active_safekeepers++; + } + } + SpinLockRelease(&wp_shmem->mutex); + } + { metric_t databricks_metrics[] = { {"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)}, {"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)}, {"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)}, {"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)}, + {"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers}, + {"num_configured_safekeepers", false, 0.0, (double) num_safekeepers}, {NULL, false, 0, 0}, }; for (int i = 0; databricks_metrics[i].name != NULL; i++) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index c85a6f4b6f..dd42eaf18e 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -154,7 +154,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE; wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND; wp->safekeeper[wp->n_safekeepers].wp = wp; - + /* BEGIN_HADRON */ + wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers; + /* END_HADRON */ { Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers]; int written = 0; @@ -183,6 +185,10 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3) wp_log(FATAL, "enabling generations requires protocol version 3"); wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version); + + /* BEGIN_HADRON */ + wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers); + /* END_HADRON */ /* Fill the greeting package */ wp->greetRequest.pam.tag = 'g'; @@ -355,6 +361,10 @@ ShutdownConnection(Safekeeper *sk) sk->state = SS_OFFLINE; sk->streamingAt = InvalidXLogRecPtr; + /* BEGIN_HADRON */ + sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0); + /* END_HADRON */ + MembershipConfigurationFree(&sk->greetResponse.mconf); if (sk->voteResponse.termHistory.entries) pfree(sk->voteResponse.termHistory.entries); @@ -1530,6 +1540,10 @@ StartStreaming(Safekeeper *sk) sk->active_state = SS_ACTIVE_SEND; sk->streamingAt = sk->startStreamingAt; + /* BEGIN_HADRON */ + sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1); + /* END_HADRON */ + /* * Donors can only be in SS_ACTIVE state, so we potentially update the * donor when we switch one to SS_ACTIVE. diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index d6cd532bec..ac42c2925d 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -432,6 +432,10 @@ typedef struct WalproposerShmemState /* BEGIN_HADRON */ /* The WAL rate limiter */ WalRateLimiter wal_rate_limiter; + /* Number of safekeepers in the config */ + uint32 num_safekeepers; + /* Per-safekeeper status flags: 0=inactive, 1=active */ + uint8 safekeeper_status[MAX_SAFEKEEPERS]; /* END_HADRON */ } WalproposerShmemState; @@ -483,6 +487,11 @@ typedef struct Safekeeper char const *host; char const *port; + /* BEGIN_HADRON */ + /* index of this safekeeper in the WalProposer array */ + uint32 index; + /* END_HADRON */ + /* * connection string for connecting/reconnecting. * @@ -731,6 +740,23 @@ typedef struct walproposer_api * handled by elog(). */ void (*log_internal) (WalProposer *wp, int level, const char *line); + + /* + * BEGIN_HADRON + * APIs manipulating shared memory state used for Safekeeper quorum health metrics. + */ + + /* + * Reset the safekeeper statuses in shared memory for metric purposes. + */ + void (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers); + + /* + * Update the safekeeper status in shared memory for metric purposes. + */ + void (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status); + + /* END_HADRON */ } walproposer_api; /* diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index da86c5d498..47b5ec523f 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -2261,6 +2261,27 @@ GetNeonCurrentClusterSize(void) } uint64 GetNeonCurrentClusterSize(void); +/* BEGIN_HADRON */ +static void +walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers) +{ + WalproposerShmemState* shmem = wp->api.get_shmem_state(wp); + SpinLockAcquire(&shmem->mutex); + shmem->num_safekeepers = num_safekeepers; + memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status)); + SpinLockRelease(&shmem->mutex); +} + +static void +walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status) +{ + WalproposerShmemState* shmem = wp->api.get_shmem_state(wp); + Assert(sk_index < MAX_SAFEKEEPERS); + SpinLockAcquire(&shmem->mutex); + shmem->safekeeper_status[sk_index] = status; + SpinLockRelease(&shmem->mutex); +} +/* END_HADRON */ static const walproposer_api walprop_pg = { .get_shmem_state = walprop_pg_get_shmem_state, @@ -2294,4 +2315,6 @@ static const walproposer_api walprop_pg = { .finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers, .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback, .log_internal = walprop_pg_log_internal, + .reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics, + .update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics, }; diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index c691087259..33d308fb5a 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -2742,6 +2742,7 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde wait_until(unevicted) +@pytest.mark.skip(reason="Lakebase mode") def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder): """ Test that the timeline disk usage circuit breaker works as expected. We test that: @@ -2757,18 +2758,32 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder): remote_storage_kind = s3_storage() neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind) - # Set a very small disk usage limit (1KB) - neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"] - env = neon_env_builder.init_start() # Create a timeline and endpoint env.create_branch("test_timeline_disk_usage_limit") endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit") + # Install the neon extension in the test database. We need it to query perf counter metrics. + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + cur.execute("CREATE EXTENSION IF NOT EXISTS neon") + # Sanity-check safekeeper connection status in neon_perf_counters in the happy case. + cur.execute( + "SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'" + ) + assert cur.fetchone() == (1,), "Expected 1 active safekeeper" + cur.execute( + "SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'" + ) + assert cur.fetchone() == (1,), "Expected 1 configured safekeeper" + # Get the safekeeper sk = env.safekeepers[0] + # Restart the safekeeper with a very small disk usage limit (1KB) + sk.stop().start(["--max-timeline-disk-usage-bytes=1024"]) + # Inject a failpoint to stop WAL backup with sk.http_client() as http_cli: http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")]) @@ -2794,6 +2809,18 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder): wait_until(error_logged) log.info("Found expected error message in compute log, resuming.") + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + # Confirm that neon_perf_counters also indicates that there are no active safekeepers + cur.execute( + "SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'" + ) + assert cur.fetchone() == (0,), "Expected 0 active safekeepers" + cur.execute( + "SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'" + ) + assert cur.fetchone() == (1,), "Expected 1 configured safekeeper" + # Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we # implemented didn't work as expected. time.sleep(2)