From 4897921de6c0cd721b14df4982fe4bc5a0bb330b Mon Sep 17 00:00:00 2001
From: William Huang
Date: Fri, 2 May 2025 10:59:22 -0700
Subject: [PATCH] [BRC-3051] Walproposer: Safekeeper quorum health metrics (#930)

Today we have no indication (other than spammy logs in PG that nobody
monitors) when the Walproposer in PG cannot connect to, or collect
votes from, all Safekeepers. This means we have no signal that the
Safekeepers are operating at degraded redundancy. We need these
signals.

Added plumbing in the PG extension so that the `neon_perf_counters`
view exports the following gauge metrics on safekeeper health:

- `num_configured_safekeepers`: The total number of safekeepers
  configured in PG.
- `num_active_safekeepers`: The number of safekeepers that PG is
  actively streaming WAL to.

An alert should be raised whenever `num_active_safekeepers` <
`num_configured_safekeepers`.
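For illustration, an alerting query along these lines could drive that
signal (a sketch, assuming the `neon` extension is installed in the
queried database; the aliases and the `quorum_health` label are
illustrative, not part of this change):

    -- Hypothetical health probe: reports 'degraded' whenever fewer
    -- safekeepers are active than are configured.
    SELECT a.value AS num_active,
           c.value AS num_configured,
           CASE WHEN a.value < c.value THEN 'degraded'
                ELSE 'healthy' END AS quorum_health
    FROM (SELECT value FROM neon_perf_counters
           WHERE metric = 'num_active_safekeepers') AS a,
         (SELECT value FROM neon_perf_counters
           WHERE metric = 'num_configured_safekeepers') AS c;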
The metrics are implemented by adding state to the Walproposer shared
memory that tracks each safekeeper's active status in a simple array.
A safekeeper's status is set to active (1) after the Walproposer
acquires a quorum and starts streaming WAL to that safekeeper, and is
set to inactive (0) when the connection to the safekeeper is shut
down. When collecting the metrics, we scan the status array in
Walproposer shared memory to produce the values for the gauges.
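As an illustration of the intended semantics, a hypothetical deployment
with three configured safekeepers, one of which is unreachable, would
report something like the following (made-up values shown as comments):

    SELECT metric, value
      FROM neon_perf_counters
     WHERE metric IN ('num_active_safekeepers',
                      'num_configured_safekeepers');
    -- metric                     | value
    -- num_active_safekeepers     | 2
    -- num_configured_safekeepers | 3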
Added coverage for the metrics to the integration test
`test_wal_acceptor.py::test_timeline_disk_usage_limit`.
---
 libs/walproposer/src/api_bindings.rs     | 28 ++++++++++++++++++++
 libs/walproposer/src/walproposer.rs      | 15 +++++++++++
 pgxn/neon/neon_perf_counters.c           | 28 ++++++++++++++++++++
 pgxn/neon/walproposer.c                  | 16 +++++++++++-
 pgxn/neon/walproposer.h                  | 26 +++++++++++++++++++
 pgxn/neon/walproposer_pg.c               | 23 +++++++++++++++++
 scripts/neon_grep.txt                    | 33 ++++++++++++++++++++++++
 test_runner/regress/test_wal_acceptor.py | 32 ++++++++++++++++++++---
 8 files changed, 197 insertions(+), 4 deletions(-)

diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs
index 9f88ea6b11..7b09ee8080 100644
--- a/libs/walproposer/src/api_bindings.rs
+++ b/libs/walproposer/src/api_bindings.rs
@@ -341,6 +341,28 @@ extern "C-unwind" fn log_internal(
     }
 }
 
+/* BEGIN_HADRON */
+extern "C" fn reset_safekeeper_statuses_for_metrics(wp: *mut WalProposer, num_safekeepers: u32) {
+    unsafe {
+        let callback_data = (*(*wp).config).callback_data;
+        let api = callback_data as *mut Box<dyn ApiImpl>;
+        (*api).reset_safekeeper_statuses_for_metrics(&mut (*wp), num_safekeepers);
+    }
+}
+
+extern "C" fn update_safekeeper_status_for_metrics(
+    wp: *mut WalProposer,
+    sk_index: u32,
+    status: u8,
+) {
+    unsafe {
+        let callback_data = (*(*wp).config).callback_data;
+        let api = callback_data as *mut Box<dyn ApiImpl>;
+        (*api).update_safekeeper_status_for_metrics(&mut (*wp), sk_index, status);
+    }
+}
+/* END_HADRON */
+
 #[derive(Debug, PartialEq)]
 pub enum Level {
     Debug5,
@@ -414,6 +436,10 @@ pub(crate) fn create_api() -> walproposer_api {
         finish_sync_safekeepers: Some(finish_sync_safekeepers),
         process_safekeeper_feedback: Some(process_safekeeper_feedback),
         log_internal: Some(log_internal),
+        /* BEGIN_HADRON */
+        reset_safekeeper_statuses_for_metrics: Some(reset_safekeeper_statuses_for_metrics),
+        update_safekeeper_status_for_metrics: Some(update_safekeeper_status_for_metrics),
+        /* END_HADRON */
     }
 }
 
@@ -451,6 +477,8 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
         replica_promote: false,
         min_ps_feedback: empty_feedback,
         wal_rate_limiter: empty_wal_rate_limiter,
+        num_safekeepers: 0,
+        safekeeper_status: [0; 32],
     }
 }
 
diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs
index 93bb0d5eb0..8453279c5c 100644
--- a/libs/walproposer/src/walproposer.rs
+++ b/libs/walproposer/src/walproposer.rs
@@ -159,6 +159,21 @@ pub trait ApiImpl {
     fn after_election(&self, _wp: &mut WalProposer) {
         todo!()
     }
+
+    /* BEGIN_HADRON */
+    fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) {
+        // Do nothing for testing purposes.
+    }
+
+    fn update_safekeeper_status_for_metrics(
+        &self,
+        _wp: &mut WalProposer,
+        _sk_index: u32,
+        _status: u8,
+    ) {
+        // Do nothing for testing purposes.
+    }
+    /* END_HADRON */
 }
 
 #[derive(Debug)]
diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c
index fada4cba1e..ed624ea6d6 100644
--- a/pgxn/neon/neon_perf_counters.c
+++ b/pgxn/neon/neon_perf_counters.c
@@ -20,6 +20,7 @@
 #include "neon.h"
 #include "neon_perf_counters.h"
+#include "walproposer.h"
 
 /* BEGIN_HADRON */
 databricks_metrics *databricks_metrics_shared;
@@ -391,6 +392,12 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
     neon_per_backend_counters totals = {0};
     metric_t   *metrics;
 
+    /* BEGIN_HADRON */
+    WalproposerShmemState *wp_shmem;
+    uint32      num_safekeepers;
+    uint32      num_active_safekeepers;
+    /* END_HADRON */
+
     /* We put all the tuples into a tuplestore in one go. */
     InitMaterializedSRF(fcinfo, 0);
 
@@ -437,11 +444,32 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
     // Not ideal but piggyback our databricks counters into the neon perf counters view
     // so that we don't need to introduce neon--1.x+1.sql to add a new view.
     {
+        // Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
+        // the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
+
+        // Read safekeeper status from wal proposer shared memory first.
+        // Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
+        // consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
+        // not be a huge deal.
+        wp_shmem = GetWalpropShmemState();
+        SpinLockAcquire(&wp_shmem->mutex);
+        num_safekeepers = wp_shmem->num_safekeepers;
+        num_active_safekeepers = 0;
+        for (int i = 0; i < num_safekeepers; i++) {
+            if (wp_shmem->safekeeper_status[i] == 1) {
+                num_active_safekeepers++;
+            }
+        }
+        SpinLockRelease(&wp_shmem->mutex);
+    }
+    {
         metric_t databricks_metrics[] = {
             {"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
             {"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
             {"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
             {"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
+            {"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
+            {"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
             {NULL, false, 0, 0},
         };
         for (int i = 0; databricks_metrics[i].name != NULL; i++)
diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index c85a6f4b6f..dd42eaf18e 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -154,7 +154,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
         wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
         wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
         wp->safekeeper[wp->n_safekeepers].wp = wp;
-
+        /* BEGIN_HADRON */
+        wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers;
+        /* END_HADRON */
         {
             Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
             int written = 0;
@@ -183,6 +185,10 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
     if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3)
         wp_log(FATAL, "enabling generations requires protocol version 3");
     wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
+
+    /* BEGIN_HADRON */
+    wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers);
+    /* END_HADRON */
 
     /* Fill the greeting package */
     wp->greetRequest.pam.tag = 'g';
@@ -355,6 +361,10 @@ ShutdownConnection(Safekeeper *sk)
     sk->state = SS_OFFLINE;
     sk->streamingAt = InvalidXLogRecPtr;
 
+    /* BEGIN_HADRON */
+    sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0);
+    /* END_HADRON */
+
     MembershipConfigurationFree(&sk->greetResponse.mconf);
     if (sk->voteResponse.termHistory.entries)
         pfree(sk->voteResponse.termHistory.entries);
@@ -1530,6 +1540,10 @@ StartStreaming(Safekeeper *sk)
     sk->active_state = SS_ACTIVE_SEND;
     sk->streamingAt = sk->startStreamingAt;
 
+    /* BEGIN_HADRON */
+    sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1);
+    /* END_HADRON */
+
     /*
      * Donors can only be in SS_ACTIVE state, so we potentially update the
      * donor when we switch one to SS_ACTIVE.
diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h
index d6cd532bec..ac42c2925d 100644
--- a/pgxn/neon/walproposer.h
+++ b/pgxn/neon/walproposer.h
@@ -432,6 +432,10 @@ typedef struct WalproposerShmemState
     /* BEGIN_HADRON */
     /* The WAL rate limiter */
     WalRateLimiter wal_rate_limiter;
+    /* Number of safekeepers in the config */
+    uint32      num_safekeepers;
+    /* Per-safekeeper status flags: 0=inactive, 1=active */
+    uint8       safekeeper_status[MAX_SAFEKEEPERS];
     /* END_HADRON */
 } WalproposerShmemState;
 
@@ -483,6 +487,11 @@ typedef struct Safekeeper
     char const *host;
     char const *port;
 
+    /* BEGIN_HADRON */
+    /* index of this safekeeper in the WalProposer array */
+    uint32      index;
+    /* END_HADRON */
+
     /*
      * connection string for connecting/reconnecting.
      *
@@ -731,6 +740,23 @@ typedef struct walproposer_api
      * handled by elog().
      */
     void        (*log_internal) (WalProposer *wp, int level, const char *line);
+
+    /*
+     * BEGIN_HADRON
+     * APIs manipulating shared memory state used for Safekeeper quorum health metrics.
+     */
+
+    /*
+     * Reset the safekeeper statuses in shared memory for metric purposes.
+     */
+    void        (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers);
+
+    /*
+     * Update the safekeeper status in shared memory for metric purposes.
+     */
+    void        (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status);
+
+    /* END_HADRON */
 } walproposer_api;
 
 /*
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 5db9b07ca3..0972b2df17 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -2235,6 +2235,27 @@ GetNeonCurrentClusterSize(void)
 }
 
 uint64 GetNeonCurrentClusterSize(void);
+
+/* BEGIN_HADRON */
+static void
+walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
+{
+    WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
+    SpinLockAcquire(&shmem->mutex);
+    shmem->num_safekeepers = num_safekeepers;
+    memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
+    SpinLockRelease(&shmem->mutex);
+}
+
+static void
+walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
+{
+    WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
+    Assert(sk_index < MAX_SAFEKEEPERS);
+    SpinLockAcquire(&shmem->mutex);
+    shmem->safekeeper_status[sk_index] = status;
+    SpinLockRelease(&shmem->mutex);
+}
+/* END_HADRON */
 
 static const walproposer_api walprop_pg = {
     .get_shmem_state = walprop_pg_get_shmem_state,
@@ -2268,4 +2289,6 @@ static const walproposer_api walprop_pg = {
     .finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
     .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
     .log_internal = walprop_pg_log_internal,
+    .reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics,
+    .update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics,
 };
diff --git a/scripts/neon_grep.txt b/scripts/neon_grep.txt
index 2014597dc4..8b323ab920 100644
--- a/scripts/neon_grep.txt
+++ b/scripts/neon_grep.txt
@@ -545,11 +545,22 @@ pgxn/neon/neon_perf_counters.c: metric_t *metrics = neon_perf_counters_to_met
 pgxn/neon/neon_perf_counters.c:PG_FUNCTION_INFO_V1(neon_get_perf_counters);
 pgxn/neon/neon_perf_counters.c:neon_get_perf_counters(PG_FUNCTION_ARGS)
 pgxn/neon/neon_perf_counters.c: neon_per_backend_counters totals = {0};
+pgxn/neon/neon_perf_counters.c: uint32 num_safekeepers;
+pgxn/neon/neon_perf_counters.c: uint32 num_active_safekeepers;
 pgxn/neon/neon_perf_counters.c: for (int procno = 0; procno < NUM_NEON_PERF_COUNTER_SLOTS; procno++)
 pgxn/neon/neon_perf_counters.c: neon_per_backend_counters *counters = &neon_per_backend_counters_shared[procno];
 pgxn/neon/neon_perf_counters.c: metrics = neon_perf_counters_to_metrics(&totals);
 pgxn/neon/neon_perf_counters.c: // Not ideal but piggyback our databricks counters into the neon perf counters view
 pgxn/neon/neon_perf_counters.c: // so that we don't need to introduce neon--1.x+1.sql to add a new view.
+pgxn/neon/neon_perf_counters.c: // Read safekeeper status from wal proposer shared memory first.
+pgxn/neon/neon_perf_counters.c: // Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
+pgxn/neon/neon_perf_counters.c: num_safekeepers = wp_shmem->num_safekeepers;
+pgxn/neon/neon_perf_counters.c: num_active_safekeepers = 0;
+pgxn/neon/neon_perf_counters.c: for (int i = 0; i < num_safekeepers; i++) {
+pgxn/neon/neon_perf_counters.c: if (wp_shmem->safekeeper_status[i] == 1) {
+pgxn/neon/neon_perf_counters.c: num_active_safekeepers++;
+pgxn/neon/neon_perf_counters.c: {"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
+pgxn/neon/neon_perf_counters.c: {"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
 pgxn/neon/neon_perf_counters.h: * neon_perf_counters.h
 pgxn/neon/neon_perf_counters.h: * Performance counters for neon storage requests
 pgxn/neon/neon_perf_counters.h:#ifndef NEON_PERF_COUNTERS_H
@@ -1246,6 +1257,7 @@ pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].port = port;
 pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
 pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
 pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].wp = wp;
+pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers;
 pgxn/neon/walproposer.c: Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
 pgxn/neon/walproposer.c: sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
 pgxn/neon/walproposer.c: wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
@@ -1256,6 +1268,7 @@ pgxn/neon/walproposer.c: wp->n_safekeepers += 1;
 pgxn/neon/walproposer.c: if (wp->n_safekeepers < 1)
 pgxn/neon/walproposer.c: wp_log(FATAL, "safekeepers addresses are not specified");
 pgxn/neon/walproposer.c: wp->quorum = wp->n_safekeepers / 2 + 1;
+pgxn/neon/walproposer.c: wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers);
 pgxn/neon/walproposer.c: if (!wp->config->neon_timeline)
 pgxn/neon/walproposer.c: if (*wp->config->neon_timeline != '\0' &&
 pgxn/neon/walproposer.c: !HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
@@ -1280,6 +1293,7 @@ pgxn/neon/walproposer.c: for (int i = 0; i < wp->n_safekeepers; i++)
 pgxn/neon/walproposer.c: ResetConnection(&wp->safekeeper[i]);
 pgxn/neon/walproposer.c:/* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */
 pgxn/neon/walproposer.c:ShutdownConnection(Safekeeper *sk)
+pgxn/neon/walproposer.c: sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0);
 pgxn/neon/walproposer.c: sk->wp->api.rm_safekeeper_event_set(sk);
 pgxn/neon/walproposer.c:ResetConnection(Safekeeper *sk)
 pgxn/neon/walproposer.c: wp->api.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE);
@@ -1367,6 +1381,7 @@ pgxn/neon/walproposer.c: * one; there it is flush_lsn in case of safekeeper or
 pgxn/neon/walproposer.c: * safekeeper pos as it obviously can't be higher.
 pgxn/neon/walproposer.c: * Start streaming to safekeeper sk, always updates state to SS_ACTIVE and sets
 pgxn/neon/walproposer.c:StartStreaming(Safekeeper *sk)
+pgxn/neon/walproposer.c: sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1);
 pgxn/neon/walproposer.c: * Can be used only for safekeepers in SS_ACTIVE state. State can be changed
 pgxn/neon/walproposer.c:SendMessageToNode(Safekeeper *sk)
 pgxn/neon/walproposer.c: * Note: we always send everything to the safekeeper until WOULDBLOCK or
@@ -1468,12 +1483,17 @@ pgxn/neon/walproposer.h: * Header of request with WAL message sent from proposer
 pgxn/neon/walproposer.h: XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
 pgxn/neon/walproposer.h: * minimal LSN which may be needed for recovery of some safekeeper (end
 pgxn/neon/walproposer.h: /* standby_status_update fields that safekeeper received from pageserver */
+pgxn/neon/walproposer.h: /* Number of safekeepers in the config */
+pgxn/neon/walproposer.h: uint32 num_safekeepers;
+pgxn/neon/walproposer.h: /* Per-safekeeper status flags: 0=inactive, 1=active */
+pgxn/neon/walproposer.h: uint8 safekeeper_status[MAX_SAFEKEEPERS];
 pgxn/neon/walproposer.h: * Report safekeeper state to proposer
 pgxn/neon/walproposer.h: * Current term of the safekeeper; if it is higher than proposer's, the
 pgxn/neon/walproposer.h: /* Safekeeper reports back his awareness about which WAL is committed, as */
 pgxn/neon/walproposer.h: /* and custom neon feedback. */
 pgxn/neon/walproposer.h: * Descriptor of safekeeper
 pgxn/neon/walproposer.h:typedef struct Safekeeper
+pgxn/neon/walproposer.h: /* index of this safekeeper in the WalProposer array */
 pgxn/neon/walproposer.h: * Temporary buffer for the message being sent to the safekeeper.
 pgxn/neon/walproposer.h: AppendRequestHeader appendRequest; /* request for sending to safekeeper */
 pgxn/neon/walproposer.h: SafekeeperState state; /* safekeeper state machine state */
@@ -1515,6 +1535,11 @@ pgxn/neon/walproposer.h: void (*finish_sync_safekeepers) (WalProposer *wp, XLog
 pgxn/neon/walproposer.h: * Called after every AppendResponse from the safekeeper. Used to
 pgxn/neon/walproposer.h: * been commited on the quorum of safekeepers).
 pgxn/neon/walproposer.h: void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);
+pgxn/neon/walproposer.h: * APIs manipulating shared memory state used for Safekeeper quorum health metrics.
+pgxn/neon/walproposer.h: * Reset the safekeeper statuses in shared memory for metric purposes.
+pgxn/neon/walproposer.h: void (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers);
+pgxn/neon/walproposer.h: * Update the safekeeper status in shared memory for metric purposes.
+pgxn/neon/walproposer.h: void (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status);
 pgxn/neon/walproposer.h: char *neon_tenant;
 pgxn/neon/walproposer.h: char *neon_timeline;
 pgxn/neon/walproposer.h: * Comma-separated list of safekeepers, in the following format:
@@ -1701,10 +1726,18 @@ pgxn/neon/walproposer_pg.c: /* flush_lsn - This is what durably stored in safek
 pgxn/neon/walproposer_pg.c:SetNeonCurrentClusterSize(uint64 size)
 pgxn/neon/walproposer_pg.c:GetNeonCurrentClusterSize(void)
 pgxn/neon/walproposer_pg.c:uint64 GetNeonCurrentClusterSize(void);
+pgxn/neon/walproposer_pg.c:walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
+pgxn/neon/walproposer_pg.c: shmem->num_safekeepers = num_safekeepers;
+pgxn/neon/walproposer_pg.c: memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
+pgxn/neon/walproposer_pg.c:walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
+pgxn/neon/walproposer_pg.c: Assert(sk_index < MAX_SAFEKEEPERS);
+pgxn/neon/walproposer_pg.c: shmem->safekeeper_status[sk_index] = status;
 pgxn/neon/walproposer_pg.c: .add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set,
 pgxn/neon/walproposer_pg.c: .rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set,
 pgxn/neon/walproposer_pg.c: .finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
 pgxn/neon/walproposer_pg.c: .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
+pgxn/neon/walproposer_pg.c: .reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics,
+pgxn/neon/walproposer_pg.c: .update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics,
 pgxn/neon/walsender_hooks.c: * Implements XLogReaderRoutine in terms of NeonWALReader. Allows for
 pgxn/neon/walsender_hooks.c: * fetching WAL from safekeepers, which normal xlogreader can't do.
 pgxn/neon/walsender_hooks.c:#include "neon.h"
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index c691087259..c478604834 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -2757,18 +2757,32 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
     remote_storage_kind = s3_storage()
     neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
 
-    # Set a very small disk usage limit (1KB)
-    neon_env_builder.safekeeper_extra_opts = ["--max-timeline-disk-usage-bytes=1024"]
-
     env = neon_env_builder.init_start()
 
     # Create a timeline and endpoint
     env.create_branch("test_timeline_disk_usage_limit")
     endpoint = env.endpoints.create_start("test_timeline_disk_usage_limit")
 
+    # Install the neon extension in the test database. We need it to query perf counter metrics.
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("CREATE EXTENSION IF NOT EXISTS neon")
+            # Sanity-check safekeeper connection status in neon_perf_counters in the happy case.
+            cur.execute(
+                "SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
+            )
+            assert cur.fetchone() == (1,), "Expected 1 active safekeeper"
+            cur.execute(
+                "SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
+            )
+            assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
+
     # Get the safekeeper
     sk = env.safekeepers[0]
 
+    # Restart the safekeeper with a very small disk usage limit (1KB)
+    sk.stop().start(["--max-timeline-disk-usage-bytes=1024"])
+
     # Inject a failpoint to stop WAL backup
     with sk.http_client() as http_cli:
         http_cli.configure_failpoints([("backup-lsn-range-pausable", "pause")])
@@ -2794,6 +2808,18 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
     wait_until(error_logged)
     log.info("Found expected error message in compute log, resuming.")
 
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            # Confirm that neon_perf_counters also indicates that there are no active safekeepers
+            cur.execute(
+                "SELECT value FROM neon_perf_counters WHERE metric = 'num_active_safekeepers'"
+            )
+            assert cur.fetchone() == (0,), "Expected 0 active safekeepers"
+            cur.execute(
+                "SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
+            )
+            assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
+
     # Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
     # implemented didn't work as expected.
     time.sleep(2)