Set an upper limit on PG backpressure throttling (#12675)

## Problem
A tenant split test revealed another bug in PG backpressure throttling: in
some cases the pageserver (PS) may never report its progress back to the
safekeeper (SK). This was observed, for example, when a tenant shard split is
aborted and the old shard has to re-establish its SK connection and re-ingest
WAL from a much older LSN. In that case, PG may get stuck forever.

## Summary of changes
As a general precaution, since the PS feedback mechanism may not always be
reliable, this PR uses the previously introduced WAL write rate limiting
mechanism to slow writes down instead of pausing them completely. The idea is
to introduce a new `databricks_effective_max_wal_bytes_per_second`, which is
set to `databricks_max_wal_mb_per_second` when there is no PS backpressure
and to `10KB` when there is backpressure. This way, PG can still write to the
SK, though at a very low speed.
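
The effective limit is chosen whenever the wal proposer recomputes the
backpressure lag. Below is a minimal, self-contained C sketch of that
decision; it mirrors the logic in this PR, but the names
(`effective_max_wal_bytes_per_second_for`, `has_backpressure`) are
illustrative, and the real code stores the value in shared memory with
atomics:

```
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Mirrors the default of databricks.throttled_max_wal_bytes_per_second in this PR. */
#define THROTTLED_MAX_WAL_BYTES_PER_SECOND (10 * 1024)

/* Pick the effective WAL write limit in bytes/second.
 * max_wal_mb_per_second == -1 means rate limiting is disabled entirely. */
static int64_t
effective_max_wal_bytes_per_second_for(int max_wal_mb_per_second, bool has_backpressure)
{
    if (max_wal_mb_per_second == -1)
        return -1;                                  /* no limit configured */
    if (has_backpressure)
        return THROTTLED_MAX_WAL_BYTES_PER_SECOND;  /* crawl instead of stopping completely */
    return (int64_t) max_wal_mb_per_second * 1024 * 1024;
}

int main(void)
{
    /* e.g. databricks.max_wal_mb_per_second = 100 */
    printf("%lld\n", (long long) effective_max_wal_bytes_per_second_for(100, false)); /* 104857600 */
    printf("%lld\n", (long long) effective_max_wal_bytes_per_second_for(100, true));  /* 10240 */
    return 0;
}
```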

The PR also fixes a problem where the current WAL rate limiting mechanism is
too coarse grained and cannot enforce limits below 1MB/s: it always resets
the rate limiter after 1 second, even when PG has already written more than
the allowed amount in the past second. The fix is to introduce a
`batch_end_time_us` field that records the expected end time of the current
batch. For example, if PG writes 10MB of data in a single batch and the max
WAL write rate is set to `1MB/s`, then `batch_end_time_us` will be set to 10
seconds in the future.
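
In other words, the batch length grows in proportion to how far the writer
overshot the limit. The following standalone sketch shows the arithmetic,
assuming the same 5-minute cap that the patch applies via
`kRateLimitMaxBatchUSecs`; `batch_length_usecs` is an illustrative name, not
the patch's identifier:

```
#include <stdint.h>
#include <stdio.h>

#define USECS_PER_SEC 1000000ULL

/* Given the bytes sent since the batch started and the effective limit,
 * compute how long the current batch should last (in microseconds).
 * Writers stay throttled until the batch end time passes. */
static uint64_t
batch_length_usecs(uint64_t sent_bytes, uint64_t max_bytes_per_second, uint64_t max_batch_usecs)
{
    uint64_t usecs = USECS_PER_SEC * sent_bytes / (max_bytes_per_second ? max_bytes_per_second : 1);
    if (usecs < USECS_PER_SEC)
        usecs = USECS_PER_SEC;   /* a batch is at least one second */
    if (usecs > max_batch_usecs)
        usecs = max_batch_usecs; /* don't block PG for too long */
    return usecs;
}

int main(void)
{
    /* 10MB written against a 1MB/s limit -> the batch ends 10 seconds after it started. */
    uint64_t len = batch_length_usecs(10 << 20, 1 << 20, 300 * USECS_PER_SEC);
    printf("%llu\n", (unsigned long long) (len / USECS_PER_SEC));
    return 0;
}
```

Running the sketch prints 10, matching the 10MB-at-1MB/s example above.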

## How is this tested?
Tweaked the existing test, and also did manual testing on dev: I set
`max_replication_flush_lag` to 1GB and loaded 500GB of pgbench tables. PG is
expected to be throttled periodically because the PS accumulates 4GB of data
before flushing.

Results:
When PG is throttled:
```
9500000 of 3300000000 tuples (0%) done (elapsed 10.36 s, remaining 3587.62 s)
9600000 of 3300000000 tuples (0%) done (elapsed 124.07 s, remaining 42523.59 s)
9700000 of 3300000000 tuples (0%) done (elapsed 255.79 s, remaining 86763.97 s)
9800000 of 3300000000 tuples (0%) done (elapsed 315.89 s, remaining 106056.52 s)
9900000 of 3300000000 tuples (0%) done (elapsed 412.75 s, remaining 137170.58 s)
```

When the PS has just flushed:
```
18100000 of 3300000000 tuples (0%) done (elapsed 433.80 s, remaining 78655.96 s)
18200000 of 3300000000 tuples (0%) done (elapsed 433.85 s, remaining 78231.71 s)
18300000 of 3300000000 tuples (0%) done (elapsed 433.90 s, remaining 77810.62 s)
18400000 of 3300000000 tuples (0%) done (elapsed 433.96 s, remaining 77395.86 s)
18500000 of 3300000000 tuples (0%) done (elapsed 434.03 s, remaining 76987.27 s)
18600000 of 3300000000 tuples (0%) done (elapsed 434.08 s, remaining 76579.59 s)
18700000 of 3300000000 tuples (0%) done (elapsed 434.13 s, remaining 76177.12 s)
18800000 of 3300000000 tuples (0%) done (elapsed 434.19 s, remaining 75779.45 s)
18900000 of 3300000000 tuples (0%) done (elapsed 434.84 s, remaining 75489.40 s)
19000000 of 3300000000 tuples (0%) done (elapsed 434.89 s, remaining 75097.90 s)
19100000 of 3300000000 tuples (0%) done (elapsed 434.94 s, remaining 74712.56 s)
19200000 of 3300000000 tuples (0%) done (elapsed 498.93 s, remaining 85254.20 s)
19300000 of 3300000000 tuples (0%) done (elapsed 498.97 s, remaining 84817.95 s)
19400000 of 3300000000 tuples (0%) done (elapsed 623.80 s, remaining 105486.76 s)
19500000 of 3300000000 tuples (0%) done (elapsed 745.86 s, remaining 125476.51 s)
```

Co-authored-by: Chen Luo <chen.luo@databricks.com>
Authored by Tristan Partin on 2025-07-23 17:37:27 -05:00, committed via GitHub.
Commit 9b2e6f862a (parent 12e87d7a9f), 5 changed files with 220 additions and 107 deletions.


```
@@ -429,9 +429,11 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
     };
     let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
+        effective_max_wal_bytes_per_second: crate::bindings::pg_atomic_uint32 { value: 0 },
         should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
         sent_bytes: 0,
-        last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
+        batch_start_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
+        batch_end_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
     };
     crate::bindings::WalproposerShmemState {
```


```
@@ -389,12 +389,21 @@ typedef struct PageserverFeedback
  */
 typedef struct WalRateLimiter
 {
-    /* If the value is 1, PG backends will hit backpressure. */
+    /* The effective WAL write rate. Could be changed dynamically
+       based on whether PG has backpressure or not. */
+    pg_atomic_uint32 effective_max_wal_bytes_per_second;
+    /* If the value is 1, PG backends will hit backpressure until the time has passed batch_end_time_us. */
     pg_atomic_uint32 should_limit;
     /* The number of bytes sent in the current second. */
     uint64 sent_bytes;
-    /* The last recorded time in microsecond. */
-    pg_atomic_uint64 last_recorded_time_us;
+    /* The timestamp when the write starts in the current batch. A batch is a time interval over which we
+       track and throttle writes. Most of the time a batch is 1s, but it can become larger if PG overshoots
+       the limit, and we adjust the batch accordingly to compensate (e.g., if PG writes 10MB at once and the
+       max WAL write rate is 1MB/s, then the current batch becomes 10s). */
+    pg_atomic_uint64 batch_start_time_us;
+    /* The timestamp (in the future) at which the current batch should end and accept more writes again
+       (after should_limit has been set to 1). */
+    pg_atomic_uint64 batch_end_time_us;
 } WalRateLimiter;
 /* END_HADRON */
```


```
@@ -68,6 +68,14 @@ int safekeeper_proto_version = 3;
 char *safekeeper_conninfo_options = "";
 /* BEGIN_HADRON */
 int databricks_max_wal_mb_per_second = -1;
+// During throttling, we will limit the effective WAL write rate to 10KB.
+// PG can still push some WAL to SK, but at a very low rate.
+int databricks_throttled_max_wal_bytes_per_second = 10 * 1024;
+// The max sleep time of a batch. This is to make sure the rate limiter does not
+// overshoot too much and block PG for a very long time.
+// This is set to 5 minutes for now. PG can send as much as 10MB of WAL to SK in one batch,
+// so this effectively caps the write rate to ~30KB/s in the worst case.
+static uint64 kRateLimitMaxBatchUSecs = 300 * USECS_PER_SEC;
 /* END_HADRON */
 
 /* Set to true in the walproposer bgw. */
@@ -86,6 +94,7 @@ static HotStandbyFeedback agg_hs_feedback;
 static void nwp_register_gucs(void);
 static void assign_neon_safekeepers(const char *newval, void *extra);
 static uint64 backpressure_lag_impl(void);
+static uint64 hadron_backpressure_lag_impl(void);
 static uint64 startup_backpressure_wrap(void);
 static bool backpressure_throttling_impl(void);
 static void walprop_register_bgworker(void);
@@ -110,9 +119,22 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);
 static void CheckGracefulShutdown(WalProposer *wp);
 
-// HADRON
+/* BEGIN_HADRON */
 shardno_t get_num_shards(void);
+
+static int positive_mb_to_bytes(int mb)
+{
+    if (mb <= 0)
+    {
+        return mb;
+    }
+    else
+    {
+        return mb * 1024 * 1024;
+    }
+}
+/* END_HADRON */
 
 static void
 init_walprop_config(bool syncSafekeepers)
 {
@@ -260,6 +282,16 @@ nwp_register_gucs(void)
             PGC_SUSET,
             GUC_UNIT_MB,
             NULL, NULL, NULL);
+
+    DefineCustomIntVariable(
+            "databricks.throttled_max_wal_bytes_per_second",
+            "The maximum WAL bytes per second when PG is being throttled.",
+            NULL,
+            &databricks_throttled_max_wal_bytes_per_second,
+            10 * 1024, 0, INT_MAX,
+            PGC_SUSET,
+            GUC_UNIT_BYTE,
+            NULL, NULL, NULL);
     /* END_HADRON */
 }
@@ -398,19 +430,65 @@ assign_neon_safekeepers(const char *newval, void *extra)
     pfree(oldval);
 }
 
-/* Check if we need to suspend inserts because of lagging replication. */
-static uint64
-backpressure_lag_impl(void)
+/* BEGIN_HADRON */
+static uint64 hadron_backpressure_lag_impl(void)
 {
     struct WalproposerShmemState* state = NULL;
+    uint64 lag = 0;
 
-    /* BEGIN_HADRON */
     if(max_cluster_size < 0){
         // if max cluster size is not set, then we don't apply backpressure because we're reconfiguring PG
         return 0;
     }
-    /* END_HADRON */
+
+    lag = backpressure_lag_impl();
+    state = GetWalpropShmemState();
+    if ( state != NULL && databricks_max_wal_mb_per_second != -1 )
+    {
+        int old_limit = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
+        int new_limit = (lag == 0)? positive_mb_to_bytes(databricks_max_wal_mb_per_second) : databricks_throttled_max_wal_bytes_per_second;
+        if( old_limit != new_limit )
+        {
+            uint64 batch_start_time = pg_atomic_read_u64(&state->wal_rate_limiter.batch_start_time_us);
+            uint64 batch_end_time = pg_atomic_read_u64(&state->wal_rate_limiter.batch_end_time_us);
+            // the rate limit has changed, we need to reset the rate limiter's batch end time
+            pg_atomic_write_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second, new_limit);
+            pg_atomic_write_u64(&state->wal_rate_limiter.batch_end_time_us, Min(batch_start_time + USECS_PER_SEC, batch_end_time));
+        }
+
+        if( new_limit == -1 )
+        {
+            return 0;
+        }
+
+        if (pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == true)
+        {
+            TimestampTz now = GetCurrentTimestamp();
+            struct WalRateLimiter *limiter = &state->wal_rate_limiter;
+            uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
+            if ( now >= batch_end_time )
+            {
+                /*
+                 * The backend has passed the batch end time and it's time to push more WALs.
+                 * If the backends are pushing WALs too fast, the wal proposer will rate limit them again.
+                 */
+                uint32 expected = true;
+                pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false);
+                return 0;
+            }
+            return Max(lag, 1);
+        }
+        // rate limiter decides to not throttle, then return 0.
+        return 0;
+    }
+    return lag;
+}
+/* END_HADRON */
+
+/* Check if we need to suspend inserts because of lagging replication. */
+static uint64
+backpressure_lag_impl(void)
+{
     if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0)
     {
         XLogRecPtr writePtr;
@@ -444,30 +522,6 @@ backpressure_lag_impl(void)
             return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
         }
     }
-    /* BEGIN_HADRON */
-    if (databricks_max_wal_mb_per_second == -1) {
-        return 0;
-    }
-
-    state = GetWalpropShmemState();
-    if (state != NULL && !!pg_atomic_read_u32(&state->wal_rate_limiter.should_limit))
-    {
-        TimestampTz now = GetCurrentTimestamp();
-        struct WalRateLimiter *limiter = &state->wal_rate_limiter;
-        uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
-        if (now - last_recorded_time > USECS_PER_SEC)
-        {
-            /*
-             * The backend has past 1 second since the last recorded time and it's time to push more WALs.
-             * If the backends are pushing WALs too fast, the wal proposer will rate limit them again.
-             */
-            uint32 expected = true;
-            pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false);
-        }
-        return 1;
-    }
-    /* END_HADRON */
 
     return 0;
 }
@@ -482,9 +536,9 @@ startup_backpressure_wrap(void)
     if (AmStartupProcess() || !IsUnderPostmaster)
         return 0;
 
-    delay_backend_us = &backpressure_lag_impl;
+    delay_backend_us = &hadron_backpressure_lag_impl;
 
-    return backpressure_lag_impl();
+    return hadron_backpressure_lag_impl();
 }
 
 /*
@@ -514,8 +568,10 @@ WalproposerShmemInit(void)
         pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
         pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
         /* BEGIN_HADRON */
+        pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.effective_max_wal_bytes_per_second, -1);
         pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
-        pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
+        pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_start_time_us, 0);
+        pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_end_time_us, 0);
         /* END_HADRON */
     }
 }
@@ -530,8 +586,10 @@ WalproposerShmemInit_SyncSafekeeper(void)
     pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
     pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
     /* BEGIN_HADRON */
+    pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.effective_max_wal_bytes_per_second, -1);
     pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
-    pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
+    pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_start_time_us, 0);
+    pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_end_time_us, 0);
     /* END_HADRON */
 }
@@ -563,7 +621,7 @@ backpressure_throttling_impl(void)
         return retry;
 
     /* Calculate replicas lag */
-    lag = backpressure_lag_impl();
+    lag = hadron_backpressure_lag_impl();
     if (lag == 0)
         return retry;
@@ -659,7 +717,7 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards
     SpinLockAcquire(&walprop_shared->mutex);
     // Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
     // a new pageserver feedback.
     walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards);
@@ -1479,6 +1537,7 @@ XLogBroadcastWalProposer(WalProposer *wp)
     XLogRecPtr endptr;
     struct WalproposerShmemState *state = NULL;
     TimestampTz now = 0;
+    int effective_max_wal_bytes_per_second = 0;
 
     /* Start from the last sent position */
     startptr = sentPtr;
@@ -1533,22 +1592,36 @@ XLogBroadcastWalProposer(WalProposer *wp)
     /* BEGIN_HADRON */
     state = GetWalpropShmemState();
-    if (databricks_max_wal_mb_per_second != -1 && state != NULL)
+    effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
+    if (effective_max_wal_bytes_per_second != -1 && state != NULL)
     {
-        uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024;
         struct WalRateLimiter *limiter = &state->wal_rate_limiter;
-        uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
-        if (now - last_recorded_time > USECS_PER_SEC)
+        uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
+        if ( now >= batch_end_time )
        {
-            /* Reset the rate limiter */
+            // Reset the rate limiter to start a new batch
             limiter->sent_bytes = 0;
-            pg_atomic_write_u64(&limiter->last_recorded_time_us, now);
             pg_atomic_write_u32(&limiter->should_limit, false);
+            pg_atomic_write_u64(&limiter->batch_start_time_us, now);
+            /* Tentatively assign the batch end time as 1s from now. This could result in one of the following cases:
+               1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s,
+                  then we will reset the current batch and clear sent_bytes. No throttling happens.
+               2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written,
+                  and throttle PG until the batch end time. */
+            pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC);
         }
         limiter->sent_bytes += (endptr - startptr);
-        if (limiter->sent_bytes > max_wal_bytes)
+        if (limiter->sent_bytes > effective_max_wal_bytes_per_second)
         {
+            uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us);
+            uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1);
+            if (throttle_usecs > kRateLimitMaxBatchUSecs){
+                elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs);
+                throttle_usecs = kRateLimitMaxBatchUSecs;
+            }
             pg_atomic_write_u32(&limiter->should_limit, true);
+            pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs);
         }
     }
     /* END_HADRON */
@@ -2052,7 +2125,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
     /* Only one main shard sends non-zero currentClusterSize */
     if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
         SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
 
     if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
     {
         standby_apply_lsn = min_feedback.disk_consistent_lsn;
```


```
@@ -395,23 +395,6 @@ def test_max_wal_rate(neon_simple_env: NeonEnv):
     tuples = endpoint.safe_psql("SELECT backpressure_throttling_time();")
     assert tuples[0][0] == 0, "Backpressure throttling detected"
 
-    # 0 MB/s max_wal_rate. WAL proposer can still push some WALs but will be super slow.
-    endpoint.safe_psql_many(
-        [
-            "ALTER SYSTEM SET databricks.max_wal_mb_per_second = 0;",
-            "SELECT pg_reload_conf();",
-        ]
-    )
-    # Write ~10 KB data should hit backpressure.
-    with endpoint.cursor(dbname=DBNAME) as cur:
-        cur.execute("SET databricks.max_wal_mb_per_second = 0;")
-        for _ in range(0, 10):
-            cur.execute("INSERT INTO usertable SELECT random(), repeat('a', 1000);")
-    tuples = endpoint.safe_psql("SELECT backpressure_throttling_time();")
-    assert tuples[0][0] > 0, "No backpressure throttling detected"
-
     # 1 MB/s max_wal_rate.
     endpoint.safe_psql_many(
         [
```


```
@@ -1508,20 +1508,55 @@ def test_sharding_split_failures(
     env.storage_controller.consistency_check()
 
-@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
+# HADRON
+def test_create_tenant_after_split(neon_env_builder: NeonEnvBuilder):
+    """
+    Tests creating a tenant and a timeline should fail after a tenant split.
+    """
+    env = neon_env_builder.init_start(initial_tenant_shard_count=4)
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*already exists with a different shard count.*",
+        ]
+    )
+
+    ep = env.endpoints.create_start("main", tenant_id=env.initial_tenant)
+    ep.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
+    ep.safe_psql("INSERT INTO usertable VALUES (1, 'test1');")
+    ep.safe_psql("INSERT INTO usertable VALUES (2, 'test2');")
+    ep.safe_psql("INSERT INTO usertable VALUES (3, 'test3');")
+
+    # Split the tenant
+    env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=8)
+
+    with pytest.raises(RuntimeError):
+        env.create_tenant(env.initial_tenant, env.initial_timeline, shard_count=4)
+
+    # run more queries
+    ep.safe_psql("SELECT * FROM usertable;")
+    ep.safe_psql("UPDATE usertable set FIELD0 = 'test4';")
+
+    ep.stop_and_destroy()
+
+
+# HADRON
 def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder):
     """
-    Test backpressure can ignore new shards during tenant split so that if we abort the split,
-    PG can continue without being blocked.
+    Test backpressure works correctly during a shard split, especially after a split is aborted,
+    PG will not be stuck forever.
     """
-    DBNAME = "regression"
-    init_shard_count = 4
+    init_shard_count = 1
     neon_env_builder.num_pageservers = init_shard_count
     stripe_size = 32
     env = neon_env_builder.init_start(
-        initial_tenant_shard_count=init_shard_count, initial_tenant_shard_stripe_size=stripe_size
+        initial_tenant_shard_count=init_shard_count,
+        initial_tenant_shard_stripe_size=stripe_size,
+        initial_tenant_conf={
+            "checkpoint_distance": 1024 * 1024 * 1024,
+        },
     )
 
     env.storage_controller.allowed_errors.extend(
@@ -1537,19 +1572,31 @@ def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder):
         "main",
         config_lines=[
             "max_replication_write_lag = 1MB",
-            "databricks.max_wal_mb_per_second = 1",
             "neon.max_cluster_size = 10GB",
+            "databricks.max_wal_mb_per_second=100",
         ],
     )
-    endpoint.respec(skip_pg_catalog_updates=False)  # Needed for databricks_system to get created.
+    endpoint.respec(skip_pg_catalog_updates=False)
    endpoint.start()
 
-    endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
-    endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);")
+    # generate 10MB of data
+    endpoint.safe_psql(
+        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
+    )
 
     write_done = Event()
 
-    def write_data(write_done):
+    def get_write_lag():
+        res = endpoint.safe_psql(
+            """
+            SELECT
+                pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
+            FROM neon.backpressure_lsns();
+            """,
+            log_query=False,
+        )
+        return res[0][0]
+
+    def write_data(write_done: Event):
         while not write_done.is_set():
             endpoint.safe_psql(
                 "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
@@ -1560,35 +1607,39 @@ def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder):
     writer_thread.start()
 
     env.storage_controller.configure_failpoints(("shard-split-pre-complete", "return(1)"))
+    # sleep 10 seconds before re-activating the old shard when aborting the split.
+    # this is to add some backpressures to PG
+    env.pageservers[0].http_client().configure_failpoints(
+        ("attach-before-activate-sleep", "return(10000)"),
+    )
 
     # split the tenant
     with pytest.raises(StorageControllerApiException):
-        env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=16)
+        env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4)
+
+    def check_tenant_status():
+        status = (
+            env.pageservers[0].http_client().tenant_status(TenantShardId(env.initial_tenant, 0, 1))
+        )
+        assert status["state"]["slug"] == "Active"
+
+    wait_until(check_tenant_status)
 
     write_done.set()
     writer_thread.join()
 
+    log.info(f"current write lag: {get_write_lag()}")
+
     # writing more data to page servers after split is aborted
-    for _i in range(5000):
-        endpoint.safe_psql(
-            "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False
-        )
+    with endpoint.cursor() as cur:
+        for _i in range(1000):
+            cur.execute("INSERT INTO usertable SELECT random(), repeat('a', 1000);")
 
     # wait until write lag becomes 0
     def check_write_lag_is_zero():
-        res = endpoint.safe_psql(
-            """
-            SELECT
-                pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
-            FROM neon.backpressure_lsns();
-            """,
-            dbname="databricks_system",
-            log_query=False,
-        )
-        log.info(f"received_lsn_lag = {res[0][0]}")
-        assert res[0][0] == 0
+        res = get_write_lag()
+        assert res == 0
 
     wait_until(check_write_lag_is_zero)
+
+    endpoint.stop_and_destroy()
 
 # BEGIN_HADRON
@@ -1674,7 +1725,6 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder):
 
 # HADRON
-@pytest.mark.skip(reason="The backpressure change has not been merged yet.")
 def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
     """
     Tests back pressure knobs are enforced on the per shard basis instead of at the tenant level.
@@ -1703,20 +1753,16 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
             "neon.max_cluster_size = 10GB",
         ],
     )
-    endpoint.respec(skip_pg_catalog_updates=False)  # Needed for databricks_system to get created.
+    endpoint.respec(skip_pg_catalog_updates=False)
     endpoint.start()
 
-    # generate 20MB of data
+    # generate 10MB of data
     endpoint.safe_psql(
-        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
+        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
     )
 
-    res = endpoint.safe_psql(
-        "SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system"
-    )[0]
+    res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0]
     assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"
-    endpoint.stop()
 
 # HADRON
 def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder):
@@ -1880,14 +1926,14 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
     shards_info()
 
     for _write_iter in range(30):
-        # approximately 1MB of data
-        workload.write_rows(8000, upload=False)
+        # approximately 10MB of data
+        workload.write_rows(80000, upload=False)
         update_write_lsn()
         infos = shards_info()
         min_lsn = min(Lsn(info["last_record_lsn"]) for info in infos)
         max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
         diff = max_lsn - min_lsn
-        assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
+        assert diff < 8 * 1024 * 1024, f"LSN diff={diff}, expected diff < 8MB due to backpressure"
 
 def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
```