[BRC-2951] Enforce PG backpressure parameters at the shard level (#12694)

## Problem
PG backpressure parameters are currently enforced globally. With tenant splitting, this makes it hard to balance small and large tenants: a large tenant with many shards needs larger lag limits because each shard receives only total/shard_count of the data, but raising the global limits to compensate would be suboptimal for small tenants with fewer shards. (Illustrative numbers: a tenant split into 4 shards sends each shard only about a quarter of its data, so it can tolerate roughly 4x the configured lag, whereas an unsharded tenant cannot.)

## Summary of changes
This PR enforces these parameters at the shard level, i.e., PG computes the effective lag limit by multiplying the configured per-shard limit by the shard count.
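
As a rough illustration of the scaling, here is a minimal standalone sketch (not the extension code itself); the stubbed shard count, helper name, and LSN values are invented for the example:

```c
#include <stdint.h>
#include <stdio.h>

#define MB ((uint64_t) 1024 * 1024)

/* Stand-in for the extension's get_num_shards(); the real code falls back
 * to 1 shard when the shard map is not initialized yet. */
static int
get_num_shards_stub(void)
{
	return 4;					/* pretend the tenant has 4 shards */
}

/* How far (in bytes) the compute's flush LSN is past the allowed write lag,
 * or 0 if it is within the limit. */
static uint64_t
write_lag_excess(uint64_t my_flush_lsn, uint64_t write_ptr,
				 uint64_t max_replication_write_lag_mb)
{
	int			num_shards = get_num_shards_stub();

	if (num_shards < 1)
		num_shards = 1;

	/* Shard-level enforcement: the configured per-shard limit is multiplied
	 * by the shard count to obtain the tenant-wide limit. */
	uint64_t	limit = num_shards * max_replication_write_lag_mb * MB;

	if (write_ptr != 0 && limit > 0 && my_flush_lsn > write_ptr + limit)
		return my_flush_lsn - write_ptr - limit;
	return 0;
}

int
main(void)
{
	/* With max_replication_write_lag = 15MB and 4 shards, backpressure only
	 * kicks in once the compute is more than 60MB ahead of the pageserver. */
	printf("excess = %llu bytes\n",
		   (unsigned long long) write_lag_excess(100 * MB, 30 * MB, 15));
	return 0;
}
```

In the committed change the same scaling is applied to all three limits (write, flush, and apply lag), and only when `neon.lakebase_mode` is enabled; otherwise the previous global behavior is unchanged.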

## How is this tested?
Added a regression test.

Co-authored-by: Chen Luo <chen.luo@databricks.com>
Tristan Partin
2025-07-24 13:41:29 -05:00
committed by GitHub
parent 89554af1bd
commit 11527b9df7
2 changed files with 38 additions and 11 deletions


@@ -507,19 +507,45 @@ backpressure_lag_impl(void)
 			 LSN_FORMAT_ARGS(flushPtr),
 			 LSN_FORMAT_ARGS(applyPtr));
-		if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
-		{
-			return (myFlushLsn - writePtr - max_replication_write_lag * MB);
-		}
-		if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
-		{
-			return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
-		}
-		if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
-		{
-			return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
-		}
+		if (lakebase_mode)
+		{
+			// in case PG does not have shard map initialized, we assume PG always has 1 shard at minimum.
+			shardno_t num_shards = Max(1, get_num_shards());
+			int tenant_max_replication_apply_lag = num_shards * max_replication_apply_lag;
+			int tenant_max_replication_flush_lag = num_shards * max_replication_flush_lag;
+			int tenant_max_replication_write_lag = num_shards * max_replication_write_lag;
+			if ((writePtr != InvalidXLogRecPtr && tenant_max_replication_write_lag > 0 && myFlushLsn > writePtr + tenant_max_replication_write_lag * MB))
+			{
+				return (myFlushLsn - writePtr - tenant_max_replication_write_lag * MB);
+			}
+			if ((flushPtr != InvalidXLogRecPtr && tenant_max_replication_flush_lag > 0 && myFlushLsn > flushPtr + tenant_max_replication_flush_lag * MB))
+			{
+				return (myFlushLsn - flushPtr - tenant_max_replication_flush_lag * MB);
+			}
+			if ((applyPtr != InvalidXLogRecPtr && tenant_max_replication_apply_lag > 0 && myFlushLsn > applyPtr + tenant_max_replication_apply_lag * MB))
+			{
+				return (myFlushLsn - applyPtr - tenant_max_replication_apply_lag * MB);
+			}
+		}
+		else
+		{
+			if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
+			{
+				return (myFlushLsn - writePtr - max_replication_write_lag * MB);
+			}
+			if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
+			{
+				return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
+			}
+			if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
+			{
+				return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
+			}
+		}
 	}
 	return 0;


@@ -1751,14 +1751,15 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
             "max_replication_apply_lag = 0",
             "max_replication_flush_lag = 15MB",
             "neon.max_cluster_size = 10GB",
+            "neon.lakebase_mode = true",
         ],
     )
     endpoint.respec(skip_pg_catalog_updates=False)
     endpoint.start()
-    # generate 10MB of data
+    # generate 20MB of data
     endpoint.safe_psql(
-        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
+        "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
     )
     res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0]
     assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"