mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
PG: smooth max wal rate (#12514)
## Problem We were only resetting the limit in the wal proposer. If backends are back pressured, it might take a while for the wal proposer to receive a new WAL to reset the limit. ## Summary of changes Backend also checks the time and resets the limit. ## How is this tested? pgbench has more smooth tps Signed-off-by: Tristan Partin <tristan.partin@databricks.com> Co-authored-by: Haoyu Huang <haoyu.huang@databricks.com>
This commit is contained in:
@@ -431,7 +431,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
|
||||
should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
|
||||
sent_bytes: 0,
|
||||
last_recorded_time_us: 0,
|
||||
last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||
};
|
||||
|
||||
crate::bindings::WalproposerShmemState {
|
||||
|
||||
@@ -377,6 +377,16 @@ typedef struct PageserverFeedback
|
||||
} PageserverFeedback;
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
/**
|
||||
* WAL proposer is the only backend that will update `sent_bytes` and `last_recorded_time_us`.
|
||||
* Once the `sent_bytes` reaches the limit, it puts backpressure on PG backends.
|
||||
*
|
||||
* A PG backend checks `should_limit` to see if it should hit backpressure.
|
||||
* - If yes, it also checks the `last_recorded_time_us` to see
|
||||
* if it's time to push more WALs. This is because the WAL proposer
|
||||
* only resets `should_limit` to 0 after it is notified about new WALs
|
||||
* which might take a while.
|
||||
*/
|
||||
typedef struct WalRateLimiter
|
||||
{
|
||||
/* If the value is 1, PG backends will hit backpressure. */
|
||||
@@ -384,7 +394,7 @@ typedef struct WalRateLimiter
|
||||
/* The number of bytes sent in the current second. */
|
||||
uint64 sent_bytes;
|
||||
/* The last recorded time in microsecond. */
|
||||
TimestampTz last_recorded_time_us;
|
||||
pg_atomic_uint64 last_recorded_time_us;
|
||||
} WalRateLimiter;
|
||||
/* END_HADRON */
|
||||
|
||||
|
||||
@@ -449,8 +449,20 @@ backpressure_lag_impl(void)
|
||||
}
|
||||
|
||||
state = GetWalpropShmemState();
|
||||
if (state != NULL && pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == 1)
|
||||
if (state != NULL && !!pg_atomic_read_u32(&state->wal_rate_limiter.should_limit))
|
||||
{
|
||||
TimestampTz now = GetCurrentTimestamp();
|
||||
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
|
||||
uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
|
||||
if (now - last_recorded_time > USECS_PER_SEC)
|
||||
{
|
||||
/*
|
||||
* The backend has past 1 second since the last recorded time and it's time to push more WALs.
|
||||
* If the backends are pushing WALs too fast, the wal proposer will rate limit them again.
|
||||
*/
|
||||
uint32 expected = true;
|
||||
pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
/* END_HADRON */
|
||||
@@ -502,6 +514,7 @@ WalproposerShmemInit(void)
|
||||
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
|
||||
/* BEGIN_HADRON */
|
||||
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
|
||||
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
|
||||
/* END_HADRON */
|
||||
}
|
||||
LWLockRelease(AddinShmemInitLock);
|
||||
@@ -520,6 +533,7 @@ WalproposerShmemInit_SyncSafekeeper(void)
|
||||
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
|
||||
/* BEGIN_HADRON */
|
||||
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
|
||||
pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0);
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
@@ -1551,18 +1565,18 @@ XLogBroadcastWalProposer(WalProposer *wp)
|
||||
{
|
||||
uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024;
|
||||
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
|
||||
|
||||
if (now - limiter->last_recorded_time_us > USECS_PER_SEC)
|
||||
uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us);
|
||||
if (now - last_recorded_time > USECS_PER_SEC)
|
||||
{
|
||||
/* Reset the rate limiter */
|
||||
limiter->last_recorded_time_us = now;
|
||||
limiter->sent_bytes = 0;
|
||||
pg_atomic_exchange_u32(&limiter->should_limit, 0);
|
||||
pg_atomic_write_u64(&limiter->last_recorded_time_us, now);
|
||||
pg_atomic_write_u32(&limiter->should_limit, false);
|
||||
}
|
||||
limiter->sent_bytes += (endptr - startptr);
|
||||
if (limiter->sent_bytes > max_wal_bytes)
|
||||
{
|
||||
pg_atomic_exchange_u32(&limiter->should_limit, 1);
|
||||
pg_atomic_write_u32(&limiter->should_limit, true);
|
||||
}
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
Reference in New Issue
Block a user