diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 825a137d0f..c3be1e1dae 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -429,9 +429,11 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState { }; let empty_wal_rate_limiter = crate::bindings::WalRateLimiter { + effective_max_wal_bytes_per_second: crate::bindings::pg_atomic_uint32 { value: 0 }, should_limit: crate::bindings::pg_atomic_uint32 { value: 0 }, sent_bytes: 0, - last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 }, + batch_start_time_us: crate::bindings::pg_atomic_uint64 { value: 0 }, + batch_end_time_us: crate::bindings::pg_atomic_uint64 { value: 0 }, }; crate::bindings::WalproposerShmemState { diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 19d23925a5..5507294c3b 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -389,12 +389,21 @@ typedef struct PageserverFeedback */ typedef struct WalRateLimiter { - /* If the value is 1, PG backends will hit backpressure. */ + /* The effective wal write rate. Could be changed dynamically + based on whether PG has backpressure or not.*/ + pg_atomic_uint32 effective_max_wal_bytes_per_second; + /* If the value is 1, PG backends will hit backpressure until the time has past batch_end_time_us. */ pg_atomic_uint32 should_limit; /* The number of bytes sent in the current second. */ uint64 sent_bytes; - /* The last recorded time in microsecond. */ - pg_atomic_uint64 last_recorded_time_us; + /* The timestamp when the write starts in the current batch. A batch is a time interval (e.g., )that we + track and throttle writes. Most times a batch is 1s, but it could become larger if the PG overwrites the WALs + and we will adjust the batch accordingly to compensate (e.g., if PG writes 10MB at once and max WAL write rate + is 1MB/s, then the current batch will become 10s). */ + pg_atomic_uint64 batch_start_time_us; + /* The timestamp (in the future) that the current batch should end and accept more writes + (after should_limit is set to 1). */ + pg_atomic_uint64 batch_end_time_us; } WalRateLimiter; /* END_HADRON */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index d43d372c2e..874a1590ac 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -68,6 +68,14 @@ int safekeeper_proto_version = 3; char *safekeeper_conninfo_options = ""; /* BEGIN_HADRON */ int databricks_max_wal_mb_per_second = -1; +// during throttling, we will limit the effective WAL write rate to 10KB. +// PG can still push some WAL to SK, but at a very low rate. +int databricks_throttled_max_wal_bytes_per_second = 10 * 1024; +// The max sleep time of a batch. This is to make sure the rate limiter does not +// overshoot too much and block PG for a very long time. +// This is set as 5 minuetes for now. PG can send as much as 10MB of WALs to SK in one batch, +// so this effectively caps the write rate to ~30KB/s in the worst case. +static uint64 kRateLimitMaxBatchUSecs = 300 * USECS_PER_SEC; /* END_HADRON */ /* Set to true in the walproposer bgw. */ @@ -86,6 +94,7 @@ static HotStandbyFeedback agg_hs_feedback; static void nwp_register_gucs(void); static void assign_neon_safekeepers(const char *newval, void *extra); static uint64 backpressure_lag_impl(void); +static uint64 hadron_backpressure_lag_impl(void); static uint64 startup_backpressure_wrap(void); static bool backpressure_throttling_impl(void); static void walprop_register_bgworker(void); @@ -110,9 +119,22 @@ static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk); static void CheckGracefulShutdown(WalProposer *wp); -// HADRON +/* BEGIN_HADRON */ shardno_t get_num_shards(void); +static int positive_mb_to_bytes(int mb) +{ + if (mb <= 0) + { + return mb; + } + else + { + return mb * 1024 * 1024; + } +} +/* END_HADRON */ + static void init_walprop_config(bool syncSafekeepers) { @@ -260,6 +282,16 @@ nwp_register_gucs(void) PGC_SUSET, GUC_UNIT_MB, NULL, NULL, NULL); + + DefineCustomIntVariable( + "databricks.throttled_max_wal_bytes_per_second", + "The maximum WAL bytes per second when PG is being throttled.", + NULL, + &databricks_throttled_max_wal_bytes_per_second, + 10 * 1024, 0, INT_MAX, + PGC_SUSET, + GUC_UNIT_BYTE, + NULL, NULL, NULL); /* END_HADRON */ } @@ -398,19 +430,65 @@ assign_neon_safekeepers(const char *newval, void *extra) pfree(oldval); } -/* Check if we need to suspend inserts because of lagging replication. */ -static uint64 -backpressure_lag_impl(void) +/* BEGIN_HADRON */ +static uint64 hadron_backpressure_lag_impl(void) { struct WalproposerShmemState* state = NULL; + uint64 lag = 0; - /* BEGIN_HADRON */ if(max_cluster_size < 0){ // if max cluster size is not set, then we don't apply backpressure because we're reconfiguring PG return 0; } - /* END_HADRON */ + lag = backpressure_lag_impl(); + state = GetWalpropShmemState(); + if ( state != NULL && databricks_max_wal_mb_per_second != -1 ) + { + int old_limit = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second); + int new_limit = (lag == 0)? positive_mb_to_bytes(databricks_max_wal_mb_per_second) : databricks_throttled_max_wal_bytes_per_second; + if( old_limit != new_limit ) + { + uint64 batch_start_time = pg_atomic_read_u64(&state->wal_rate_limiter.batch_start_time_us); + uint64 batch_end_time = pg_atomic_read_u64(&state->wal_rate_limiter.batch_end_time_us); + // the rate limit has changed, we need to reset the rate limiter's batch end time + pg_atomic_write_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second, new_limit); + pg_atomic_write_u64(&state->wal_rate_limiter.batch_end_time_us, Min(batch_start_time + USECS_PER_SEC, batch_end_time)); + } + if( new_limit == -1 ) + { + return 0; + } + + if (pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == true) + { + TimestampTz now = GetCurrentTimestamp(); + struct WalRateLimiter *limiter = &state->wal_rate_limiter; + uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us); + if ( now >= batch_end_time ) + { + /* + * The backend has past the batch end time and it's time to push more WALs. + * If the backends are pushing WALs too fast, the wal proposer will rate limit them again. + */ + uint32 expected = true; + pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false); + return 0; + } + return Max(lag, 1); + } + // rate limiter decides to not throttle, then return 0. + return 0; + } + + return lag; +} +/* END_HADRON */ + +/* Check if we need to suspend inserts because of lagging replication. */ +static uint64 +backpressure_lag_impl(void) +{ if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) { XLogRecPtr writePtr; @@ -444,30 +522,6 @@ backpressure_lag_impl(void) return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); } } - - /* BEGIN_HADRON */ - if (databricks_max_wal_mb_per_second == -1) { - return 0; - } - - state = GetWalpropShmemState(); - if (state != NULL && !!pg_atomic_read_u32(&state->wal_rate_limiter.should_limit)) - { - TimestampTz now = GetCurrentTimestamp(); - struct WalRateLimiter *limiter = &state->wal_rate_limiter; - uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us); - if (now - last_recorded_time > USECS_PER_SEC) - { - /* - * The backend has past 1 second since the last recorded time and it's time to push more WALs. - * If the backends are pushing WALs too fast, the wal proposer will rate limit them again. - */ - uint32 expected = true; - pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false); - } - return 1; - } - /* END_HADRON */ return 0; } @@ -482,9 +536,9 @@ startup_backpressure_wrap(void) if (AmStartupProcess() || !IsUnderPostmaster) return 0; - delay_backend_us = &backpressure_lag_impl; + delay_backend_us = &hadron_backpressure_lag_impl; - return backpressure_lag_impl(); + return hadron_backpressure_lag_impl(); } /* @@ -514,8 +568,10 @@ WalproposerShmemInit(void) pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0); /* BEGIN_HADRON */ + pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.effective_max_wal_bytes_per_second, -1); pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); - pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0); + pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_start_time_us, 0); + pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_end_time_us, 0); /* END_HADRON */ } } @@ -530,8 +586,10 @@ WalproposerShmemInit_SyncSafekeeper(void) pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0); pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); /* BEGIN_HADRON */ + pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.effective_max_wal_bytes_per_second, -1); pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); - pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0); + pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_start_time_us, 0); + pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.batch_end_time_us, 0); /* END_HADRON */ } @@ -563,7 +621,7 @@ backpressure_throttling_impl(void) return retry; /* Calculate replicas lag */ - lag = backpressure_lag_impl(); + lag = hadron_backpressure_lag_impl(); if (lag == 0) return retry; @@ -659,7 +717,7 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards SpinLockAcquire(&walprop_shared->mutex); - // Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive + // Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive // a new pageserver feedback. walprop_shared->num_shards = Max(walprop_shared->num_shards, num_shards); @@ -1479,6 +1537,7 @@ XLogBroadcastWalProposer(WalProposer *wp) XLogRecPtr endptr; struct WalproposerShmemState *state = NULL; TimestampTz now = 0; + int effective_max_wal_bytes_per_second = 0; /* Start from the last sent position */ startptr = sentPtr; @@ -1533,22 +1592,36 @@ XLogBroadcastWalProposer(WalProposer *wp) /* BEGIN_HADRON */ state = GetWalpropShmemState(); - if (databricks_max_wal_mb_per_second != -1 && state != NULL) + effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second); + if (effective_max_wal_bytes_per_second != -1 && state != NULL) { - uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024; struct WalRateLimiter *limiter = &state->wal_rate_limiter; - uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us); - if (now - last_recorded_time > USECS_PER_SEC) + uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us); + if ( now >= batch_end_time ) { - /* Reset the rate limiter */ + // Reset the rate limiter to start a new batch limiter->sent_bytes = 0; - pg_atomic_write_u64(&limiter->last_recorded_time_us, now); pg_atomic_write_u32(&limiter->should_limit, false); + pg_atomic_write_u64(&limiter->batch_start_time_us, now); + /* tentatively assign the batch end time as 1s from now. This could result in one of the following cases: + 1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s, + then we will reset the current batch and clear sent_bytes. No throttling happens. + 2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written, + and throttle PG until the batch end time. */ + pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC); } limiter->sent_bytes += (endptr - startptr); - if (limiter->sent_bytes > max_wal_bytes) + if (limiter->sent_bytes > effective_max_wal_bytes_per_second) { + uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us); + uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1); + if (throttle_usecs > kRateLimitMaxBatchUSecs){ + elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs); + throttle_usecs = kRateLimitMaxBatchUSecs; + } + pg_atomic_write_u32(&limiter->should_limit, true); + pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs); } } /* END_HADRON */ @@ -2052,7 +2125,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk) /* Only one main shard sends non-zero currentClusterSize */ if (sk->appendResponse.ps_feedback.currentClusterSize > 0) SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize); - + if (min_feedback.disk_consistent_lsn != standby_apply_lsn) { standby_apply_lsn = min_feedback.disk_consistent_lsn; diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index e7c7abf397..cc7f736239 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -395,23 +395,6 @@ def test_max_wal_rate(neon_simple_env: NeonEnv): tuples = endpoint.safe_psql("SELECT backpressure_throttling_time();") assert tuples[0][0] == 0, "Backpressure throttling detected" - # 0 MB/s max_wal_rate. WAL proposer can still push some WALs but will be super slow. - endpoint.safe_psql_many( - [ - "ALTER SYSTEM SET databricks.max_wal_mb_per_second = 0;", - "SELECT pg_reload_conf();", - ] - ) - - # Write ~10 KB data should hit backpressure. - with endpoint.cursor(dbname=DBNAME) as cur: - cur.execute("SET databricks.max_wal_mb_per_second = 0;") - for _ in range(0, 10): - cur.execute("INSERT INTO usertable SELECT random(), repeat('a', 1000);") - - tuples = endpoint.safe_psql("SELECT backpressure_throttling_time();") - assert tuples[0][0] > 0, "No backpressure throttling detected" - # 1 MB/s max_wal_rate. endpoint.safe_psql_many( [ diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 2252c098c7..c2907d8a4f 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1508,20 +1508,55 @@ def test_sharding_split_failures( env.storage_controller.consistency_check() -@pytest.mark.skip(reason="The backpressure change has not been merged yet.") +# HADRON +def test_create_tenant_after_split(neon_env_builder: NeonEnvBuilder): + """ + Tests creating a tenant and a timeline should fail after a tenant split. + """ + env = neon_env_builder.init_start(initial_tenant_shard_count=4) + + env.storage_controller.allowed_errors.extend( + [ + ".*already exists with a different shard count.*", + ] + ) + + ep = env.endpoints.create_start("main", tenant_id=env.initial_tenant) + ep.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);") + ep.safe_psql("INSERT INTO usertable VALUES (1, 'test1');") + ep.safe_psql("INSERT INTO usertable VALUES (2, 'test2');") + ep.safe_psql("INSERT INTO usertable VALUES (3, 'test3');") + + # Split the tenant + + env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=8) + + with pytest.raises(RuntimeError): + env.create_tenant(env.initial_tenant, env.initial_timeline, shard_count=4) + + # run more queries + ep.safe_psql("SELECT * FROM usertable;") + ep.safe_psql("UPDATE usertable set FIELD0 = 'test4';") + + ep.stop_and_destroy() + + +# HADRON def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder): """ - Test backpressure can ignore new shards during tenant split so that if we abort the split, - PG can continue without being blocked. + Test backpressure works correctly during a shard split, especially after a split is aborted, + PG will not be stuck forever. """ - DBNAME = "regression" - - init_shard_count = 4 + init_shard_count = 1 neon_env_builder.num_pageservers = init_shard_count stripe_size = 32 env = neon_env_builder.init_start( - initial_tenant_shard_count=init_shard_count, initial_tenant_shard_stripe_size=stripe_size + initial_tenant_shard_count=init_shard_count, + initial_tenant_shard_stripe_size=stripe_size, + initial_tenant_conf={ + "checkpoint_distance": 1024 * 1024 * 1024, + }, ) env.storage_controller.allowed_errors.extend( @@ -1537,19 +1572,31 @@ def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder): "main", config_lines=[ "max_replication_write_lag = 1MB", - "databricks.max_wal_mb_per_second = 1", "neon.max_cluster_size = 10GB", + "databricks.max_wal_mb_per_second=100", ], ) - endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created. + endpoint.respec(skip_pg_catalog_updates=False) endpoint.start() - endpoint.safe_psql(f"CREATE DATABASE {DBNAME}") - - endpoint.safe_psql("CREATE TABLE usertable ( YCSB_KEY INT, FIELD0 TEXT);") + # generate 10MB of data + endpoint.safe_psql( + "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;" + ) write_done = Event() - def write_data(write_done): + def get_write_lag(): + res = endpoint.safe_psql( + """ + SELECT + pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag + FROM neon.backpressure_lsns(); + """, + log_query=False, + ) + return res[0][0] + + def write_data(write_done: Event): while not write_done.is_set(): endpoint.safe_psql( "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False @@ -1560,35 +1607,39 @@ def test_back_pressure_during_split(neon_env_builder: NeonEnvBuilder): writer_thread.start() env.storage_controller.configure_failpoints(("shard-split-pre-complete", "return(1)")) + # sleep 10 seconds before re-activating the old shard when aborting the split. + # this is to add some backpressures to PG + env.pageservers[0].http_client().configure_failpoints( + ("attach-before-activate-sleep", "return(10000)"), + ) # split the tenant with pytest.raises(StorageControllerApiException): - env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=16) + env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4) + + def check_tenant_status(): + status = ( + env.pageservers[0].http_client().tenant_status(TenantShardId(env.initial_tenant, 0, 1)) + ) + assert status["state"]["slug"] == "Active" + + wait_until(check_tenant_status) write_done.set() writer_thread.join() + log.info(f"current write lag: {get_write_lag()}") + # writing more data to page servers after split is aborted - for _i in range(5000): - endpoint.safe_psql( - "INSERT INTO usertable SELECT random(), repeat('a', 1000);", log_query=False - ) + with endpoint.cursor() as cur: + for _i in range(1000): + cur.execute("INSERT INTO usertable SELECT random(), repeat('a', 1000);") # wait until write lag becomes 0 def check_write_lag_is_zero(): - res = endpoint.safe_psql( - """ - SELECT - pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag - FROM neon.backpressure_lsns(); - """, - dbname="databricks_system", - log_query=False, - ) - log.info(f"received_lsn_lag = {res[0][0]}") - assert res[0][0] == 0 + res = get_write_lag() + assert res == 0 wait_until(check_write_lag_is_zero) - endpoint.stop_and_destroy() # BEGIN_HADRON @@ -1674,7 +1725,6 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder): # HADRON -@pytest.mark.skip(reason="The backpressure change has not been merged yet.") def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder): """ Tests back pressure knobs are enforced on the per shard basis instead of at the tenant level. @@ -1703,20 +1753,16 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder): "neon.max_cluster_size = 10GB", ], ) - endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created. + endpoint.respec(skip_pg_catalog_updates=False) endpoint.start() - # generate 20MB of data + # generate 10MB of data endpoint.safe_psql( - "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;" + "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;" ) - res = endpoint.safe_psql( - "SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system" - )[0] + res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0] assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}" - endpoint.stop() - # HADRON def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder): @@ -1880,14 +1926,14 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder): shards_info() for _write_iter in range(30): - # approximately 1MB of data - workload.write_rows(8000, upload=False) + # approximately 10MB of data + workload.write_rows(80000, upload=False) update_write_lsn() infos = shards_info() min_lsn = min(Lsn(info["last_record_lsn"]) for info in infos) max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos) diff = max_lsn - min_lsn - assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure" + assert diff < 8 * 1024 * 1024, f"LSN diff={diff}, expected diff < 8MB due to backpressure" def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):