Compare commits

...

36 Commits

Author SHA1 Message Date
Suhas Thalanki
bf6f7808c0 python fmt 2025-07-31 10:45:24 -04:00
Suhas Thalanki
2b9f9ca877 added lakebasemode 2025-07-31 10:42:10 -04:00
Suhas Thalanki
8579272f65 Merge branch 'main' into thesuhas/brc-3082 2025-07-31 10:36:56 -04:00
Suhas Thalanki
f3561c4601 removed lakebase_mode condition for wal rate limiting 2025-07-30 14:39:26 -04:00
Suhas Thalanki
815e4a0986 enable lakebase mode 2025-07-30 13:55:54 -04:00
Suhas Thalanki
4095ccce66 Merge branch 'main' into thesuhas/brc-3082 2025-07-30 11:29:23 -04:00
Suhas Thalanki
c8b02ed3f1 added lakebase_mode wrappers 2025-07-30 11:12:56 -04:00
Suhas Thalanki
591fc820c9 Merge branch 'thesuhas/brc-3051' into thesuhas/brc-3082 2025-07-30 10:36:04 -04:00
Suhas Thalanki
904e63c576 add nullptr checks 2025-07-29 17:48:43 -04:00
Suhas Thalanki
78aa0342b0 applied changes from neon #12126 2025-07-29 17:45:06 -04:00
Suhas Thalanki
912fcf5fcf Merge branch 'main' into thesuhas/brc-3051 2025-07-29 17:34:21 -04:00
Suhas Thalanki
2790461441 Delete scripts/neon_grep.txt 2025-07-29 14:36:22 -04:00
Suhas Thalanki
1a0e100627 reset pg 16 2025-07-29 14:35:46 -04:00
Suhas Thalanki
323b75cfdc Merge branch 'thesuhas/brc-3051' of github.com:neondatabase/neon into thesuhas/brc-3051 2025-07-29 14:33:55 -04:00
Suhas Thalanki
3e49e071e5 reset pg16 2025-07-29 14:33:40 -04:00
Suhas Thalanki
d524e5ebd6 Delete scripts/neon_grep.txt 2025-07-29 11:01:55 -04:00
Suhas Thalanki
9bd0045869 remove hadron k8s 2025-07-25 16:59:41 -04:00
Suhas Thalanki
2caf3e7dd2 Merge branch 'thesuhas/brc-3051' into thesuhas/brc-3082 2025-07-25 16:00:50 -04:00
Suhas Thalanki
1805f055ad fixed perfcounters 2025-07-25 15:32:52 -04:00
Suhas Thalanki
2de46df059 skipping test for lakebase 2025-07-25 15:30:15 -04:00
Suhas Thalanki
e4e1a50ea1 removed kind test 2025-07-25 15:28:13 -04:00
Suhas Thalanki
18cadae0f1 python fmt 2025-07-25 15:22:25 -04:00
Suhas Thalanki
4c0efd73a3 fixed syntax issues 2025-07-25 15:18:27 -04:00
William Huang
fae734b15c [BRC-3082] Monitor commit LSN lag among active SKs (#1002)
Commit
e69c3d632b
added metrics (used for alerting) to indicate whether Safekeepers are
operating with a degraded quorum due to some of them being down.
However, even if all SKs are active/reachable, we probably still want to
raise an alert if some of them are really slow or otherwise lagging
behind, as it is technically still a "degraded quorum" situation.

Added a new field `max_active_safekeeper_commit_lag` to the
`neon_perf_counters` view that reports the lag between the most advanced
and most lagging commit LSNs among active Safekeepers.

Commit LSNs are received from `AppendResponse` messages from SKs and
recorded in the `WalProposer`'s shared memory state.

Note that this lag is calculated among active SKs only to keep this
alert clean. If there are inactive SKs the previous metric on active SKs
will capture that instead.

Note: @chen-luo_data pointed out during the PR review that we can
probably get the benefits of this metric with PromQL query like `max
(safekeeper_commit_lsn) by (tenant_id, timeline_id) -
min(safekeeper_commit_lsn) by (tenant_id, timeline_id)` on existing
metrics exported by SKs.

Given that this code is already ready, @haoyu-huang_data suggested that
I just check in this change anyway, as the reliability of prometheus
metrics (and especially the aggregation operators when the result set
cardinality is high) is somewhat questionable based on our prior
experience.

Added integration test
`test_wal_acceptor.py::test_max_active_safekeeper_commit_lag`.
2025-07-25 15:08:49 -04:00
William Huang
4897921de6 [BRC-3051] Walproposer: Safekeeper quorum health metrics (#930)
Today we don't have any indications (other than spammy logs in PG that
nobody monitors) if the Walproposer in PG cannot connect to/get votes
from all Safekeepers. This means we don't have signals indicating that
the Safekeepers are operating at degraded redundancy. We need these
signals.

Added plumbing in PG extension so that the `neon_perf_counters` view
exports the following gauge metrics on safekeeper health:
- `num_configured_safekeepers`: The total number of safekeepers
configured in PG.
- `num_active_safekeepers`: The number of safekeepers that PG is
actively streaming WAL to.

An alert should be raised whenever `num_active_safekeepers` <
`num_configured_safekeepers`.

The metrics are implemented by adding additional state to the
Walproposer shared memory keeping track of the active statuses of
safekeepers using a simple array. The status of the safekeeper is set to
active (1) after the Walproposer acquires a quorum and starts streaming
data to the safekeeper, and is set to inactive (0) when the connection
with a safekeeper is shut down. We scan the safekeeper status array in
Walproposer shared memory when collecting the metrics to produce results
for the gauges.

Added coverage for the metrics to integration test
`test_wal_acceptor.py::test_timeline_disk_usage_limit`.
2025-07-25 14:50:59 -04:00
Suhas Thalanki
5510b2b750 python fmt 2025-07-25 13:26:30 -04:00
Suhas Thalanki
81a684c308 cargo fmt 2025-07-25 13:22:25 -04:00
Suhas Thalanki
f8ffacbfef skipping lakebase test 2025-07-25 13:21:36 -04:00
William Huang
f7dd2108bb [BRC-2905] Feed back PS-detected data corruption signals to SK and PG walproposer (#895)
Data corruptions are typically detected on the pageserver side when it
replays WAL records. However, since PS doesn't synchronously replay WAL
records as they are being ingested through safekeepers, we need some
extra plumbing to feed information about pageserver-detected corruptions
during compaction (and/or WAL redo in general) back to SK and PG for
proper action.

We don't yet know what actions PG/SK should take upon receiving the
signal, but we should have the detection and feedback in place.

Add an extra `corruption_detected` field to the `PageserverFeedback`
message that is sent from PS -> SK -> PG. It's a boolean value that is
set to true when PS detects a "critical error" that signals data
corruption, and it's sent in all `PageserverFeedback` messages. Upon
receiving this signal, the safekeeper raises a
`safekeeper_ps_corruption_detected` gauge metric (value set to 1). The
safekeeper then forwards this signal to PG where a
`ps_corruption_detected` gauge metric (value also set to 1) is raised in
the `neon_perf_counters` view.

Added an integration test in
`test_compaction.py::test_ps_corruption_detection_feedback` that
confirms that the safekeeper and PG can receive the data corruption
signal in the `PageserverFeedback` message in a simulated data
corruption.
2025-07-25 12:10:03 -04:00
Suhas Thalanki
043408a88d fixed PR, removed spillover content from other PRs 2025-07-24 17:58:09 -04:00
Suhas Thalanki
1e91a3b63e fixed failing test 2025-07-24 17:25:24 -04:00
Suhas Thalanki
4f38ffcd29 updated pg 2025-07-24 16:08:25 -04:00
Suhas Thalanki
3635fdce4e updated pg 2025-07-24 15:31:23 -04:00
Suhas Thalanki
a371e40550 updated pg 2025-07-24 15:10:36 -04:00
Suhas Thalanki
39c76e2226 fixed syntax issues 2025-07-24 14:51:51 -04:00
Suhas Thalanki
df12db2cec migrated changes from hadron 2025-07-24 13:41:29 -04:00
7 changed files with 143 additions and 17 deletions

View File

@@ -485,6 +485,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
wal_rate_limiter: empty_wal_rate_limiter,
num_safekeepers: 0,
safekeeper_status: [0; 32],
safekeeper_commit_lsn: [0; 32],
}
}

View File

@@ -395,6 +395,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
WalproposerShmemState *wp_shmem;
uint32 num_safekeepers;
uint32 num_active_safekeepers;
XLogRecPtr max_active_safekeeper_commit_lag;
/* END_HADRON */
/* We put all the tuples into a tuplestore in one go. */
@@ -443,25 +444,43 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
// Not ideal but piggyback our databricks counters into the neon perf counters view
// so that we don't need to introduce neon--1.x+1.sql to add a new view.
{
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
// Read safekeeper status from wal proposer shared memory first.
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
// not be a huge deal.
wp_shmem = GetWalpropShmemState();
SpinLockAcquire(&wp_shmem->mutex);
num_safekeepers = wp_shmem->num_safekeepers;
num_active_safekeepers = 0;
for (int i = 0; i < num_safekeepers; i++) {
if (wp_shmem->safekeeper_status[i] == 1) {
num_active_safekeepers++;
// Read safekeeper status from wal proposer shared memory first.
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
// not be a huge deal.
XLogRecPtr min_commit_lsn = InvalidXLogRecPtr;
XLogRecPtr max_commit_lsn = InvalidXLogRecPtr;
XLogRecPtr lsn;
wp_shmem = GetWalpropShmemState();
SpinLockAcquire(&wp_shmem->mutex);
num_safekeepers = wp_shmem->num_safekeepers;
num_active_safekeepers = 0;
for (int i = 0; i < num_safekeepers; i++) {
if (wp_shmem->safekeeper_status[i] == 1) {
num_active_safekeepers++;
// Only track the commit LSN lag among active safekeepers.
// If there are inactive safekeepers we will raise another alert so this lag value
// is less critical.
lsn = wp_shmem->safekeeper_commit_lsn[i];
if (XLogRecPtrIsInvalid(min_commit_lsn) || lsn < min_commit_lsn) {
min_commit_lsn = lsn;
}
if (XLogRecPtrIsInvalid(max_commit_lsn) || lsn > max_commit_lsn) {
max_commit_lsn = lsn;
}
}
}
// Calculate max commit LSN lag across active safekeepers
max_active_safekeeper_commit_lag = (XLogRecPtrIsInvalid(min_commit_lsn) ? 0 : max_commit_lsn - min_commit_lsn);
SpinLockRelease(&wp_shmem->mutex);
}
SpinLockRelease(&wp_shmem->mutex);
}
{
{
metric_t databricks_metrics[] = {
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
@@ -469,6 +488,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
{"max_active_safekeeper_commit_lag", false, 0.0, (double) max_active_safekeeper_commit_lag},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)
@@ -479,7 +499,6 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
}
/* END_HADRON */
}
pfree(metrics);
return (Datum) 0;

View File

@@ -436,6 +436,8 @@ typedef struct WalproposerShmemState
uint32 num_safekeepers;
/* Per-safekeeper status flags: 0=inactive, 1=active */
uint8 safekeeper_status[MAX_SAFEKEEPERS];
/* Per-safekeeper commit LSN for metrics */
XLogRecPtr safekeeper_commit_lsn[MAX_SAFEKEEPERS];
/* END_HADRON */
} WalproposerShmemState;

View File

@@ -2132,6 +2132,18 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
if (wp->config->syncSafekeepers)
return;
/* BEGIN_HADRON */
if (lakebase_mode) {
// Record safekeeper commit LSN in shared memory for lag monitoring
{
WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
Assert(sk->index < MAX_SAFEKEEPERS);
SpinLockAcquire(&shmem->mutex);
shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
SpinLockRelease(&shmem->mutex);
}
}
/* END_HADRON */
/* handle fresh ps_feedback */
if (sk->appendResponse.ps_feedback.present)
@@ -2269,6 +2281,7 @@ walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_saf
SpinLockAcquire(&shmem->mutex);
shmem->num_safekeepers = num_safekeepers;
memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
memset(shmem->safekeeper_commit_lsn, 0, sizeof(shmem->safekeeper_commit_lsn));
SpinLockRelease(&shmem->mutex);
}

View File

@@ -24,6 +24,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::pausable_failpoint;
use crate::GlobalTimelines;
use crate::handler::SafekeeperPostgresHandler;
@@ -598,6 +599,8 @@ impl WalAcceptor {
// Note that a flush can still happen on segment bounds, which will result
// in an AppendResponse.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
// allow tests to pause AppendRequest processing to simulate lag
pausable_failpoint!("sk-acceptor-pausable");
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
}

View File

@@ -373,6 +373,8 @@ def test_max_wal_rate(neon_simple_env: NeonEnv):
config_lines=[
# we need this option because default max_cluster_size < 0 will disable throttling completely
"neon.max_cluster_size=10GB",
# enable lakebase mode for WAL Rate Limiting
"neon.lakebase_mode=true",
],
)

View File

@@ -2781,6 +2781,11 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Check that max_active_safekeeper_commit_lag metric exists and is zero with single safekeeper
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
)
assert cur.fetchone() == (0,), "Expected zero commit lag with one safekeeper"
# Get the safekeeper
sk = env.safekeepers[0]
@@ -2824,6 +2829,11 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Check that max_active_safekeeper_commit_lag metric exists and is zero with no active safekeepers
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
)
assert cur.fetchone() == (0,), "Expected zero commit lag with no active safekeepers"
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
# implemented didn't work as expected.
@@ -2938,3 +2948,79 @@ def test_global_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
with conn.cursor() as cur:
cur.execute("select count(*) from t2")
assert cur.fetchone() == (3000,)
def test_max_active_safekeeper_commit_lag(neon_env_builder: NeonEnvBuilder):
"""
This test validates the `max_active_safekeeper_commit_lag` metric. The
strategy is to intentionally create a scenario where one safekeeper falls
behind (by pausing it with a failpoint), observe that the metric correctly
reports this lag, and then confirm that the metric returns to zero after the
lagging safekeeper catches up (once the failpoint is removed).
"""
neon_env_builder.num_safekeepers = 2
env = neon_env_builder.init_start()
# Create branch and start endpoint
env.create_branch("test_commit_lsn_lag_failpoint")
endpoint = env.endpoints.create_start(
"test_commit_lsn_lag_failpoint", config_lines=["neon.lakebase_mode=true"]
)
# Enable neon extension and table
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
# Identify the lagging safekeeper and configure failpoint to pause
lagging_sk = env.safekeepers[1]
with lagging_sk.http_client() as http_cli:
http_cli.configure_failpoints(("sk-acceptor-pausable", "pause"))
# Note: Insert could hang because the failpoint above causes the safekeepers to lose quorum.
def run_hanging_insert():
endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,500), 'payload'")
# Start the insert in a background thread
bg_thread = threading.Thread(target=run_hanging_insert)
bg_thread.start()
# Wait for the lag metric to become positive
def lag_is_positive():
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
)
row = cur.fetchone()
assert row is not None, "max_active_safekeeper_commit_lag metric not found"
lag = row[0]
log.info(f"Current commit lag: {lag}")
if lag == 0.0:
raise Exception("Commit lag is still zero, trying again...")
# Confirm that we can observe a positive lag value
wait_until(lag_is_positive)
# Unpause the failpoint so that the safekeepers sync back up. This should also unstuck the hanging insert.
with lagging_sk.http_client() as http_cli:
http_cli.configure_failpoints(("sk-acceptor-pausable", "off"))
# Wait for the hanging insert to complete
bg_thread.join(timeout=30)
assert not bg_thread.is_alive(), "Hanging insert did not complete within timeout"
log.info("Hanging insert is unstuck successfully")
def lag_is_zero():
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
)
row = cur.fetchone()
assert row is not None, (
"max_active_safekeeper_commit_lag metric not found in lag_is_zero"
)
lag = row[0]
log.info(f"Current commit lag: {lag}")
return lag == 0.0
# Confirm that the lag eventually returns to zero
wait_until(lag_is_zero)