[BRC-3051] Walproposer: Safekeeper quorum health metrics (#930)

Today we don't have any indications (other than spammy logs in PG that
nobody monitors) if the Walproposer in PG cannot connect to/get votes
from all Safekeepers. This means we don't have signals indicating that
the Safekeepers are operating at degraded redundancy. We need these
signals.

Added plumbing in PG extension so that the `neon_perf_counters` view
exports the following gauge metrics on safekeeper health:
- `num_configured_safekeepers`: The total number of safekeepers
configured in PG.
- `num_active_safekeepers`: The number of safekeepers that PG is
actively streaming WAL to.

An alert should be raised whenever `num_active_safekeepers` <
`num_configured_safekeepers`.

The metrics are implemented by adding additional state to the
Walproposer shared memory keeping track of the active statuses of
safekeepers using a simple array. The status of the safekeeper is set to
active (1) after the Walproposer acquires a quorum and starts streaming
data to the safekeeper, and is set to inactive (0) when the connection
with a safekeeper is shut down. We scan the safekeeper status array in
Walproposer shared memory when collecting the metrics to produce results
for the gauges.

Added coverage for the metrics to integration test
`test_wal_acceptor.py::test_timeline_disk_usage_limit`.
This commit is contained in:
William Huang
2025-05-02 10:59:22 -07:00
committed by Suhas Thalanki
parent 5510b2b750
commit 4897921de6
8 changed files with 197 additions and 4 deletions

View File

@@ -545,11 +545,22 @@ pgxn/neon/neon_perf_counters.c: metric_t *metrics = neon_perf_counters_to_met
pgxn/neon/neon_perf_counters.c:PG_FUNCTION_INFO_V1(neon_get_perf_counters);
pgxn/neon/neon_perf_counters.c:neon_get_perf_counters(PG_FUNCTION_ARGS)
pgxn/neon/neon_perf_counters.c: neon_per_backend_counters totals = {0};
pgxn/neon/neon_perf_counters.c: uint32 num_safekeepers;
pgxn/neon/neon_perf_counters.c: uint32 num_active_safekeepers;
pgxn/neon/neon_perf_counters.c: for (int procno = 0; procno < NUM_NEON_PERF_COUNTER_SLOTS; procno++)
pgxn/neon/neon_perf_counters.c: neon_per_backend_counters *counters = &neon_per_backend_counters_shared[procno];
pgxn/neon/neon_perf_counters.c: metrics = neon_perf_counters_to_metrics(&totals);
pgxn/neon/neon_perf_counters.c: // Not ideal but piggyback our databricks counters into the neon perf counters view
pgxn/neon/neon_perf_counters.c: // so that we don't need to introduce neon--1.x+1.sql to add a new view.
pgxn/neon/neon_perf_counters.c: // Read safekeeper status from wal proposer shared memory first.
pgxn/neon/neon_perf_counters.c: // Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
pgxn/neon/neon_perf_counters.c: num_safekeepers = wp_shmem->num_safekeepers;
pgxn/neon/neon_perf_counters.c: num_active_safekeepers = 0;
pgxn/neon/neon_perf_counters.c: for (int i = 0; i < num_safekeepers; i++) {
pgxn/neon/neon_perf_counters.c: if (wp_shmem->safekeeper_status[i] == 1) {
pgxn/neon/neon_perf_counters.c: num_active_safekeepers++;
pgxn/neon/neon_perf_counters.c: {"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
pgxn/neon/neon_perf_counters.c: {"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
pgxn/neon/neon_perf_counters.h: * neon_perf_counters.h
pgxn/neon/neon_perf_counters.h: * Performance counters for neon storage requests
pgxn/neon/neon_perf_counters.h:#ifndef NEON_PERF_COUNTERS_H
@@ -1246,6 +1257,7 @@ pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].port = port;
pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE;
pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].active_state = SS_ACTIVE_SEND;
pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].wp = wp;
pgxn/neon/walproposer.c: wp->safekeeper[wp->n_safekeepers].index = wp->n_safekeepers;
pgxn/neon/walproposer.c: Safekeeper *sk = &wp->safekeeper[wp->n_safekeepers];
pgxn/neon/walproposer.c: sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant);
pgxn/neon/walproposer.c: wp_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port);
@@ -1256,6 +1268,7 @@ pgxn/neon/walproposer.c: wp->n_safekeepers += 1;
pgxn/neon/walproposer.c: if (wp->n_safekeepers < 1)
pgxn/neon/walproposer.c: wp_log(FATAL, "safekeepers addresses are not specified");
pgxn/neon/walproposer.c: wp->quorum = wp->n_safekeepers / 2 + 1;
pgxn/neon/walproposer.c: wp->api.reset_safekeeper_statuses_for_metrics(wp, wp->n_safekeepers);
pgxn/neon/walproposer.c: if (!wp->config->neon_timeline)
pgxn/neon/walproposer.c: if (*wp->config->neon_timeline != '\0' &&
pgxn/neon/walproposer.c: !HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16))
@@ -1280,6 +1293,7 @@ pgxn/neon/walproposer.c: for (int i = 0; i < wp->n_safekeepers; i++)
pgxn/neon/walproposer.c: ResetConnection(&wp->safekeeper[i]);
pgxn/neon/walproposer.c:/* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */
pgxn/neon/walproposer.c:ShutdownConnection(Safekeeper *sk)
pgxn/neon/walproposer.c: sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 0);
pgxn/neon/walproposer.c: sk->wp->api.rm_safekeeper_event_set(sk);
pgxn/neon/walproposer.c:ResetConnection(Safekeeper *sk)
pgxn/neon/walproposer.c: wp->api.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE);
@@ -1367,6 +1381,7 @@ pgxn/neon/walproposer.c: * one; there it is flush_lsn in case of safekeeper or
pgxn/neon/walproposer.c: * safekeeper pos as it obviously can't be higher.
pgxn/neon/walproposer.c: * Start streaming to safekeeper sk, always updates state to SS_ACTIVE and sets
pgxn/neon/walproposer.c:StartStreaming(Safekeeper *sk)
pgxn/neon/walproposer.c: sk->wp->api.update_safekeeper_status_for_metrics(sk->wp, sk->index, 1);
pgxn/neon/walproposer.c: * Can be used only for safekeepers in SS_ACTIVE state. State can be changed
pgxn/neon/walproposer.c:SendMessageToNode(Safekeeper *sk)
pgxn/neon/walproposer.c: * Note: we always send everything to the safekeeper until WOULDBLOCK or
@@ -1468,12 +1483,17 @@ pgxn/neon/walproposer.h: * Header of request with WAL message sent from proposer
pgxn/neon/walproposer.h: XLogRecPtr commitLsn; /* LSN committed by quorum of safekeepers */
pgxn/neon/walproposer.h: * minimal LSN which may be needed for recovery of some safekeeper (end
pgxn/neon/walproposer.h: /* standby_status_update fields that safekeeper received from pageserver */
pgxn/neon/walproposer.h: /* Number of safekeepers in the config */
pgxn/neon/walproposer.h: uint32 num_safekeepers;
pgxn/neon/walproposer.h: /* Per-safekeeper status flags: 0=inactive, 1=active */
pgxn/neon/walproposer.h: uint8 safekeeper_status[MAX_SAFEKEEPERS];
pgxn/neon/walproposer.h: * Report safekeeper state to proposer
pgxn/neon/walproposer.h: * Current term of the safekeeper; if it is higher than proposer's, the
pgxn/neon/walproposer.h: /* Safekeeper reports back his awareness about which WAL is committed, as */
pgxn/neon/walproposer.h: /* and custom neon feedback. */
pgxn/neon/walproposer.h: * Descriptor of safekeeper
pgxn/neon/walproposer.h:typedef struct Safekeeper
pgxn/neon/walproposer.h: /* index of this safekeeper in the WalProposer array */
pgxn/neon/walproposer.h: * Temporary buffer for the message being sent to the safekeeper.
pgxn/neon/walproposer.h: AppendRequestHeader appendRequest; /* request for sending to safekeeper */
pgxn/neon/walproposer.h: SafekeeperState state; /* safekeeper state machine state */
@@ -1515,6 +1535,11 @@ pgxn/neon/walproposer.h: void (*finish_sync_safekeepers) (WalProposer *wp, XLog
pgxn/neon/walproposer.h: * Called after every AppendResponse from the safekeeper. Used to
pgxn/neon/walproposer.h: * been commited on the quorum of safekeepers).
pgxn/neon/walproposer.h: void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);
pgxn/neon/walproposer.h: * APIs manipulating shared memory state used for Safekeeper quorum health metrics.
pgxn/neon/walproposer.h: * Reset the safekeeper statuses in shared memory for metric purposes.
pgxn/neon/walproposer.h: void (*reset_safekeeper_statuses_for_metrics) (WalProposer *wp, uint32 num_safekeepers);
pgxn/neon/walproposer.h: * Update the safekeeper status in shared memory for metric purposes.
pgxn/neon/walproposer.h: void (*update_safekeeper_status_for_metrics) (WalProposer *wp, uint32 sk_index, uint8 status);
pgxn/neon/walproposer.h: char *neon_tenant;
pgxn/neon/walproposer.h: char *neon_timeline;
pgxn/neon/walproposer.h: * Comma-separated list of safekeepers, in the following format:
@@ -1701,10 +1726,18 @@ pgxn/neon/walproposer_pg.c: /* flush_lsn - This is what durably stored in safek
pgxn/neon/walproposer_pg.c:SetNeonCurrentClusterSize(uint64 size)
pgxn/neon/walproposer_pg.c:GetNeonCurrentClusterSize(void)
pgxn/neon/walproposer_pg.c:uint64 GetNeonCurrentClusterSize(void);
pgxn/neon/walproposer_pg.c:walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
pgxn/neon/walproposer_pg.c: shmem->num_safekeepers = num_safekeepers;
pgxn/neon/walproposer_pg.c: memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
pgxn/neon/walproposer_pg.c:walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
pgxn/neon/walproposer_pg.c: Assert(sk_index < MAX_SAFEKEEPERS);
pgxn/neon/walproposer_pg.c: shmem->safekeeper_status[sk_index] = status;
pgxn/neon/walproposer_pg.c: .add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set,
pgxn/neon/walproposer_pg.c: .rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set,
pgxn/neon/walproposer_pg.c: .finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers,
pgxn/neon/walproposer_pg.c: .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback,
pgxn/neon/walproposer_pg.c: .reset_safekeeper_statuses_for_metrics = walprop_pg_reset_safekeeper_statuses_for_metrics,
pgxn/neon/walproposer_pg.c: .update_safekeeper_status_for_metrics = walprop_pg_update_safekeeper_status_for_metrics,
pgxn/neon/walsender_hooks.c: * Implements XLogReaderRoutine in terms of NeonWALReader. Allows for
pgxn/neon/walsender_hooks.c: * fetching WAL from safekeepers, which normal xlogreader can't do.
pgxn/neon/walsender_hooks.c:#include "neon.h"