From 11527b9df7a3ed7dcd3faaddbb64bd90f1739f31 Mon Sep 17 00:00:00 2001
From: Tristan Partin
Date: Thu, 24 Jul 2025 13:41:29 -0500
Subject: [PATCH] [BRC-2951] Enforce PG backpressure parameters at the shard level (#12694)

## Problem

Currently, PG backpressure parameters are enforced globally. With tenant
splitting, this makes it hard to balance small and large tenants. For large
tenants with more shards, we need to raise the lag limits, because each shard
receives only total/shard_count of the data; but raising the global limits
would be suboptimal for small tenants with fewer shards.

## Summary of changes

This PR enforces these parameters at the shard level, i.e., PG computes the
actual lag limit by multiplying the configured limit by the shard count.

## How is this tested?

Added a regression test.
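As an illustration of the computation (a minimal standalone sketch, not part
of the patch): the stub LSN values, shard count, and main() wrapper below are
invented for the example, while the identifier names mirror the ones used in
walproposer_pg.c.

```c
/*
 * Illustrative sketch only -- not part of the patch. The stub values are
 * made up; in the extension itself, num_shards comes from
 * Max(1, get_num_shards()) and the limits are the max_replication_*_lag
 * GUCs (in MB).
 */
#include <stdint.h>
#include <stdio.h>

#define MB ((uint64_t) 1024 * 1024)

int
main(void)
{
	uint64_t	myFlushLsn = 200 * MB;	/* WAL flushed by compute (stub) */
	uint64_t	writePtr = 100 * MB;	/* WAL received by pageserver (stub) */
	int			num_shards = 4;			/* stub for Max(1, get_num_shards()) */
	int			max_replication_write_lag = 15;	/* GUC value, in MB */

	/* Shard-level enforcement: scale the configured limit by shard count. */
	uint64_t	limit = (uint64_t) num_shards * max_replication_write_lag * MB;

	if (writePtr != 0 && limit > 0 && myFlushLsn > writePtr + limit)
		printf("throttle, lag over limit by %llu bytes\n",
			   (unsigned long long) (myFlushLsn - writePtr - limit));
	else
		printf("no throttling (limit %d x %d MB)\n",
			   num_shards, max_replication_write_lag);
	return 0;
}
```

With these stubs the flush LSN is 40MB past the scaled 60MB limit, so the
check would report backpressure lag.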
Co-authored-by: Chen Luo
---
 pgxn/neon/walproposer_pg.c           | 44 ++++++++++++++++++++++------
 test_runner/regress/test_sharding.py |  5 ++--
 2 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 874a1590ac..b0f5828d39 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -507,19 +507,45 @@ backpressure_lag_impl(void)
 			 LSN_FORMAT_ARGS(flushPtr),
 			 LSN_FORMAT_ARGS(applyPtr));
 
-		if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
+		if (lakebase_mode)
 		{
-			return (myFlushLsn - writePtr - max_replication_write_lag * MB);
-		}
+			// in case PG does not have shard map initialized, we assume PG always has 1 shard at minimum.
+			shardno_t	num_shards = Max(1, get_num_shards());
+			int			tenant_max_replication_apply_lag = num_shards * max_replication_apply_lag;
+			int			tenant_max_replication_flush_lag = num_shards * max_replication_flush_lag;
+			int			tenant_max_replication_write_lag = num_shards * max_replication_write_lag;
 
-		if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
-		{
-			return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
-		}
+			if ((writePtr != InvalidXLogRecPtr && tenant_max_replication_write_lag > 0 && myFlushLsn > writePtr + tenant_max_replication_write_lag * MB))
+			{
+				return (myFlushLsn - writePtr - tenant_max_replication_write_lag * MB);
+			}
 
-		if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
+			if ((flushPtr != InvalidXLogRecPtr && tenant_max_replication_flush_lag > 0 && myFlushLsn > flushPtr + tenant_max_replication_flush_lag * MB))
+			{
+				return (myFlushLsn - flushPtr - tenant_max_replication_flush_lag * MB);
+			}
+
+			if ((applyPtr != InvalidXLogRecPtr && tenant_max_replication_apply_lag > 0 && myFlushLsn > applyPtr + tenant_max_replication_apply_lag * MB))
+			{
+				return (myFlushLsn - applyPtr - tenant_max_replication_apply_lag * MB);
+			}
+		}
+		else
 		{
-			return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
+			if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
+			{
+				return (myFlushLsn - writePtr - max_replication_write_lag * MB);
+			}
+
+			if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
+			{
+				return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
+			}
+
+			if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
+			{
+				return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
+			}
 		}
 	}
 	return 0;
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index c2907d8a4f..4e46b67988 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -1751,14 +1751,15 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
             "max_replication_apply_lag = 0",
             "max_replication_flush_lag = 15MB",
             "neon.max_cluster_size = 10GB",
+            "neon.lakebase_mode = true",
         ],
     )
     endpoint.respec(skip_pg_catalog_updates=False)
     endpoint.start()
 
-    # generate 10MB of data
+    # generate 20MB of data
     endpoint.safe_psql(
-        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
+        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
     )
     res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0]
     assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"
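A hedged aside on the test change above, not part of the patch: the shard
counts below are illustrative (the test's real shard count is configured
elsewhere in test_sharding.py), but the arithmetic suggests why the generated
data grew from 10MB to 20MB. 20MB of lag exceeds the unscaled 15MB flush-lag
limit, yet stays under the shard-scaled limit for any tenant with two or more
shards, so asserting zero throttling time exercises the per-shard path.

```c
/*
 * Illustrative arithmetic only -- not part of the patch. Compares the
 * effective flush-lag limit for a few hypothetical shard counts against
 * the test's max_replication_flush_lag = 15MB and 20MB of generated data.
 */
#include <stdio.h>

int
main(void)
{
	const int	max_replication_flush_lag = 15;	/* MB, from the test config */
	const int	data_mb = 20;					/* MB generated by the test */

	for (int shards = 1; shards <= 4; shards++)
	{
		int			limit = shards * max_replication_flush_lag;

		printf("shards=%d: effective limit %d MB -> %d MB of lag %s\n",
			   shards, limit, data_mb,
			   data_mb > limit ? "could throttle" : "fits");
	}
	return 0;
}
```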