diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 874a1590ac..b0f5828d39 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -507,19 +507,45 @@ backpressure_lag_impl(void) LSN_FORMAT_ARGS(flushPtr), LSN_FORMAT_ARGS(applyPtr)); - if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB)) + if (lakebase_mode) { - return (myFlushLsn - writePtr - max_replication_write_lag * MB); - } + // in case PG does not have shard map initialized, we assume PG always has 1 shard at minimum. + shardno_t num_shards = Max(1, get_num_shards()); + int tenant_max_replication_apply_lag = num_shards * max_replication_apply_lag; + int tenant_max_replication_flush_lag = num_shards * max_replication_flush_lag; + int tenant_max_replication_write_lag = num_shards * max_replication_write_lag; - if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB)) - { - return (myFlushLsn - flushPtr - max_replication_flush_lag * MB); - } + if ((writePtr != InvalidXLogRecPtr && tenant_max_replication_write_lag > 0 && myFlushLsn > writePtr + tenant_max_replication_write_lag * MB)) + { + return (myFlushLsn - writePtr - tenant_max_replication_write_lag * MB); + } - if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB)) + if ((flushPtr != InvalidXLogRecPtr && tenant_max_replication_flush_lag > 0 && myFlushLsn > flushPtr + tenant_max_replication_flush_lag * MB)) + { + return (myFlushLsn - flushPtr - tenant_max_replication_flush_lag * MB); + } + + if ((applyPtr != InvalidXLogRecPtr && tenant_max_replication_apply_lag > 0 && myFlushLsn > applyPtr + tenant_max_replication_apply_lag * MB)) + { + return (myFlushLsn - applyPtr - tenant_max_replication_apply_lag * MB); + } + } + else { - return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); + if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB)) + { + return (myFlushLsn - writePtr - max_replication_write_lag * MB); + } + + if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB)) + { + return (myFlushLsn - flushPtr - max_replication_flush_lag * MB); + } + + if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB)) + { + return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); + } } } return 0; diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index c2907d8a4f..4e46b67988 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1751,14 +1751,15 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder): "max_replication_apply_lag = 0", "max_replication_flush_lag = 15MB", "neon.max_cluster_size = 10GB", + "neon.lakebase_mode = true", ], ) endpoint.respec(skip_pg_catalog_updates=False) endpoint.start() - # generate 10MB of data + # generate 20MB of data endpoint.safe_psql( - "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;" + "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;" ) res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0] assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"