mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
Compare commits
36 Commits
bodobolero
...
thesuhas/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bf6f7808c0 | ||
|
|
2b9f9ca877 | ||
|
|
8579272f65 | ||
|
|
f3561c4601 | ||
|
|
815e4a0986 | ||
|
|
4095ccce66 | ||
|
|
c8b02ed3f1 | ||
|
|
591fc820c9 | ||
|
|
904e63c576 | ||
|
|
78aa0342b0 | ||
|
|
912fcf5fcf | ||
|
|
2790461441 | ||
|
|
1a0e100627 | ||
|
|
323b75cfdc | ||
|
|
3e49e071e5 | ||
|
|
d524e5ebd6 | ||
|
|
9bd0045869 | ||
|
|
2caf3e7dd2 | ||
|
|
1805f055ad | ||
|
|
2de46df059 | ||
|
|
e4e1a50ea1 | ||
|
|
18cadae0f1 | ||
|
|
4c0efd73a3 | ||
|
|
fae734b15c | ||
|
|
4897921de6 | ||
|
|
5510b2b750 | ||
|
|
81a684c308 | ||
|
|
f8ffacbfef | ||
|
|
f7dd2108bb | ||
|
|
043408a88d | ||
|
|
1e91a3b63e | ||
|
|
4f38ffcd29 | ||
|
|
3635fdce4e | ||
|
|
a371e40550 | ||
|
|
39c76e2226 | ||
|
|
df12db2cec |
@@ -485,6 +485,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
wal_rate_limiter: empty_wal_rate_limiter,
|
||||
num_safekeepers: 0,
|
||||
safekeeper_status: [0; 32],
|
||||
safekeeper_commit_lsn: [0; 32],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -395,6 +395,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
|
||||
WalproposerShmemState *wp_shmem;
|
||||
uint32 num_safekeepers;
|
||||
uint32 num_active_safekeepers;
|
||||
XLogRecPtr max_active_safekeeper_commit_lag;
|
||||
/* END_HADRON */
|
||||
|
||||
/* We put all the tuples into a tuplestore in one go. */
|
||||
@@ -443,25 +444,43 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
|
||||
// Not ideal but piggyback our databricks counters into the neon perf counters view
|
||||
// so that we don't need to introduce neon--1.x+1.sql to add a new view.
|
||||
{
|
||||
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
|
||||
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
|
||||
// Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define
|
||||
// the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025.
|
||||
|
||||
// Read safekeeper status from wal proposer shared memory first.
|
||||
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
|
||||
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
|
||||
// not be a huge deal.
|
||||
wp_shmem = GetWalpropShmemState();
|
||||
SpinLockAcquire(&wp_shmem->mutex);
|
||||
num_safekeepers = wp_shmem->num_safekeepers;
|
||||
num_active_safekeepers = 0;
|
||||
for (int i = 0; i < num_safekeepers; i++) {
|
||||
if (wp_shmem->safekeeper_status[i] == 1) {
|
||||
num_active_safekeepers++;
|
||||
// Read safekeeper status from wal proposer shared memory first.
|
||||
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
|
||||
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
|
||||
// not be a huge deal.
|
||||
XLogRecPtr min_commit_lsn = InvalidXLogRecPtr;
|
||||
XLogRecPtr max_commit_lsn = InvalidXLogRecPtr;
|
||||
XLogRecPtr lsn;
|
||||
|
||||
wp_shmem = GetWalpropShmemState();
|
||||
SpinLockAcquire(&wp_shmem->mutex);
|
||||
|
||||
num_safekeepers = wp_shmem->num_safekeepers;
|
||||
num_active_safekeepers = 0;
|
||||
for (int i = 0; i < num_safekeepers; i++) {
|
||||
if (wp_shmem->safekeeper_status[i] == 1) {
|
||||
num_active_safekeepers++;
|
||||
// Only track the commit LSN lag among active safekeepers.
|
||||
// If there are inactive safekeepers we will raise another alert so this lag value
|
||||
// is less critical.
|
||||
lsn = wp_shmem->safekeeper_commit_lsn[i];
|
||||
if (XLogRecPtrIsInvalid(min_commit_lsn) || lsn < min_commit_lsn) {
|
||||
min_commit_lsn = lsn;
|
||||
}
|
||||
if (XLogRecPtrIsInvalid(max_commit_lsn) || lsn > max_commit_lsn) {
|
||||
max_commit_lsn = lsn;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Calculate max commit LSN lag across active safekeepers
|
||||
max_active_safekeeper_commit_lag = (XLogRecPtrIsInvalid(min_commit_lsn) ? 0 : max_commit_lsn - min_commit_lsn);
|
||||
|
||||
SpinLockRelease(&wp_shmem->mutex);
|
||||
}
|
||||
SpinLockRelease(&wp_shmem->mutex);
|
||||
}
|
||||
{
|
||||
{
|
||||
metric_t databricks_metrics[] = {
|
||||
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
|
||||
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
|
||||
@@ -469,6 +488,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
|
||||
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
|
||||
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
|
||||
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
|
||||
{"max_active_safekeeper_commit_lag", false, 0.0, (double) max_active_safekeeper_commit_lag},
|
||||
{NULL, false, 0, 0},
|
||||
};
|
||||
for (int i = 0; databricks_metrics[i].name != NULL; i++)
|
||||
@@ -479,7 +499,6 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
|
||||
}
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
pfree(metrics);
|
||||
|
||||
return (Datum) 0;
|
||||
|
||||
@@ -436,6 +436,8 @@ typedef struct WalproposerShmemState
|
||||
uint32 num_safekeepers;
|
||||
/* Per-safekeeper status flags: 0=inactive, 1=active */
|
||||
uint8 safekeeper_status[MAX_SAFEKEEPERS];
|
||||
/* Per-safekeeper commit LSN for metrics */
|
||||
XLogRecPtr safekeeper_commit_lsn[MAX_SAFEKEEPERS];
|
||||
/* END_HADRON */
|
||||
} WalproposerShmemState;
|
||||
|
||||
|
||||
@@ -2132,6 +2132,18 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
|
||||
if (wp->config->syncSafekeepers)
|
||||
return;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
if (lakebase_mode) {
|
||||
// Record safekeeper commit LSN in shared memory for lag monitoring
|
||||
{
|
||||
WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
|
||||
Assert(sk->index < MAX_SAFEKEEPERS);
|
||||
SpinLockAcquire(&shmem->mutex);
|
||||
shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
|
||||
SpinLockRelease(&shmem->mutex);
|
||||
}
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/* handle fresh ps_feedback */
|
||||
if (sk->appendResponse.ps_feedback.present)
|
||||
@@ -2269,6 +2281,7 @@ walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_saf
|
||||
SpinLockAcquire(&shmem->mutex);
|
||||
shmem->num_safekeepers = num_safekeepers;
|
||||
memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
|
||||
memset(shmem->safekeeper_commit_lsn, 0, sizeof(shmem->safekeeper_commit_lsn));
|
||||
SpinLockRelease(&shmem->mutex);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::pausable_failpoint;
|
||||
|
||||
use crate::GlobalTimelines;
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
@@ -598,6 +599,8 @@ impl WalAcceptor {
|
||||
// Note that a flush can still happen on segment bounds, which will result
|
||||
// in an AppendResponse.
|
||||
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
|
||||
// allow tests to pause AppendRequest processing to simulate lag
|
||||
pausable_failpoint!("sk-acceptor-pausable");
|
||||
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
dirty = true;
|
||||
}
|
||||
|
||||
@@ -373,6 +373,8 @@ def test_max_wal_rate(neon_simple_env: NeonEnv):
|
||||
config_lines=[
|
||||
# we need this option because default max_cluster_size < 0 will disable throttling completely
|
||||
"neon.max_cluster_size=10GB",
|
||||
# enable lakebase mode for WAL Rate Limiting
|
||||
"neon.lakebase_mode=true",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -2781,6 +2781,11 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
|
||||
# Check that max_active_safekeeper_commit_lag metric exists and is zero with single safekeeper
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
|
||||
)
|
||||
assert cur.fetchone() == (0,), "Expected zero commit lag with one safekeeper"
|
||||
|
||||
# Get the safekeeper
|
||||
sk = env.safekeepers[0]
|
||||
@@ -2824,6 +2829,11 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
|
||||
)
|
||||
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
|
||||
# Check that max_active_safekeeper_commit_lag metric exists and is zero with no active safekeepers
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
|
||||
)
|
||||
assert cur.fetchone() == (0,), "Expected zero commit lag with no active safekeepers"
|
||||
|
||||
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
|
||||
# implemented didn't work as expected.
|
||||
@@ -2938,3 +2948,79 @@ def test_global_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("select count(*) from t2")
|
||||
assert cur.fetchone() == (3000,)
|
||||
|
||||
|
||||
def test_max_active_safekeeper_commit_lag(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
This test validates the `max_active_safekeeper_commit_lag` metric. The
|
||||
strategy is to intentionally create a scenario where one safekeeper falls
|
||||
behind (by pausing it with a failpoint), observe that the metric correctly
|
||||
reports this lag, and then confirm that the metric returns to zero after the
|
||||
lagging safekeeper catches up (once the failpoint is removed).
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 2
|
||||
env = neon_env_builder.init_start()
|
||||
# Create branch and start endpoint
|
||||
env.create_branch("test_commit_lsn_lag_failpoint")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_commit_lsn_lag_failpoint", config_lines=["neon.lakebase_mode=true"]
|
||||
)
|
||||
# Enable neon extension and table
|
||||
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon")
|
||||
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
|
||||
# Identify the lagging safekeeper and configure failpoint to pause
|
||||
lagging_sk = env.safekeepers[1]
|
||||
with lagging_sk.http_client() as http_cli:
|
||||
http_cli.configure_failpoints(("sk-acceptor-pausable", "pause"))
|
||||
|
||||
# Note: Insert could hang because the failpoint above causes the safekeepers to lose quorum.
|
||||
def run_hanging_insert():
|
||||
endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,500), 'payload'")
|
||||
|
||||
# Start the insert in a background thread
|
||||
bg_thread = threading.Thread(target=run_hanging_insert)
|
||||
bg_thread.start()
|
||||
|
||||
# Wait for the lag metric to become positive
|
||||
def lag_is_positive():
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
|
||||
)
|
||||
row = cur.fetchone()
|
||||
assert row is not None, "max_active_safekeeper_commit_lag metric not found"
|
||||
lag = row[0]
|
||||
log.info(f"Current commit lag: {lag}")
|
||||
if lag == 0.0:
|
||||
raise Exception("Commit lag is still zero, trying again...")
|
||||
|
||||
# Confirm that we can observe a positive lag value
|
||||
wait_until(lag_is_positive)
|
||||
|
||||
# Unpause the failpoint so that the safekeepers sync back up. This should also unstuck the hanging insert.
|
||||
with lagging_sk.http_client() as http_cli:
|
||||
http_cli.configure_failpoints(("sk-acceptor-pausable", "off"))
|
||||
|
||||
# Wait for the hanging insert to complete
|
||||
bg_thread.join(timeout=30)
|
||||
assert not bg_thread.is_alive(), "Hanging insert did not complete within timeout"
|
||||
log.info("Hanging insert is unstuck successfully")
|
||||
|
||||
def lag_is_zero():
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
|
||||
)
|
||||
row = cur.fetchone()
|
||||
assert row is not None, (
|
||||
"max_active_safekeeper_commit_lag metric not found in lag_is_zero"
|
||||
)
|
||||
lag = row[0]
|
||||
log.info(f"Current commit lag: {lag}")
|
||||
return lag == 0.0
|
||||
|
||||
# Confirm that the lag eventually returns to zero
|
||||
wait_until(lag_is_zero)
|
||||
|
||||
Reference in New Issue
Block a user