diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index 5f856a44d4..825a137d0f 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -431,7 +431,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState { let empty_wal_rate_limiter = crate::bindings::WalRateLimiter { should_limit: crate::bindings::pg_atomic_uint32 { value: 0 }, sent_bytes: 0, - last_recorded_time_us: 0, + last_recorded_time_us: crate::bindings::pg_atomic_uint64 { value: 0 }, }; crate::bindings::WalproposerShmemState { diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index e3a4022664..19d23925a5 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -377,6 +377,16 @@ typedef struct PageserverFeedback } PageserverFeedback; /* BEGIN_HADRON */ +/** + * WAL proposer is the only backend that will update `sent_bytes` and `last_recorded_time_us`. + * Once the `sent_bytes` reaches the limit, it puts backpressure on PG backends. + * + * A PG backend checks `should_limit` to see if it should hit backpressure. + * - If yes, it also checks the `last_recorded_time_us` to see + * if it's time to push more WALs. This is because the WAL proposer + * only resets `should_limit` to 0 after it is notified about new WALs + * which might take a while. + */ typedef struct WalRateLimiter { /* If the value is 1, PG backends will hit backpressure. */ @@ -384,7 +394,7 @@ typedef struct WalRateLimiter /* The number of bytes sent in the current second. */ uint64 sent_bytes; /* The last recorded time in microsecond. 
*/ - TimestampTz last_recorded_time_us; + pg_atomic_uint64 last_recorded_time_us; } WalRateLimiter; /* END_HADRON */ diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index aaf8f43eeb..18655d4c6c 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -449,8 +449,20 @@ backpressure_lag_impl(void) } state = GetWalpropShmemState(); - if (state != NULL && pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == 1) + if (state != NULL && !!pg_atomic_read_u32(&state->wal_rate_limiter.should_limit)) { + TimestampTz now = GetCurrentTimestamp(); + struct WalRateLimiter *limiter = &state->wal_rate_limiter; + uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us); + if (now - last_recorded_time > USECS_PER_SEC) + { + /* + * More than 1 second has passed since the last recorded time, so it's time to push more WALs. + * If the backends are pushing WALs too fast, the wal proposer will rate limit them again. + */ + uint32 expected = true; + pg_atomic_compare_exchange_u32(&state->wal_rate_limiter.should_limit, &expected, false); + } return 1; } /* END_HADRON */ @@ -502,6 +514,7 @@ WalproposerShmemInit(void) pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0); /* BEGIN_HADRON */ pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); + pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0); /* END_HADRON */ } LWLockRelease(AddinShmemInitLock); @@ -520,6 +533,7 @@ WalproposerShmemInit_SyncSafekeeper(void) pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); /* BEGIN_HADRON */ pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0); + pg_atomic_init_u64(&walprop_shared->wal_rate_limiter.last_recorded_time_us, 0); /* END_HADRON */ } @@ -1551,18 +1565,18 @@ XLogBroadcastWalProposer(WalProposer *wp) { uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024; struct WalRateLimiter *limiter = &state->wal_rate_limiter; - - if 
(now - limiter->last_recorded_time_us > USECS_PER_SEC) + uint64 last_recorded_time = pg_atomic_read_u64(&limiter->last_recorded_time_us); + if (now - last_recorded_time > USECS_PER_SEC) { /* Reset the rate limiter */ - limiter->last_recorded_time_us = now; limiter->sent_bytes = 0; - pg_atomic_exchange_u32(&limiter->should_limit, 0); + pg_atomic_write_u64(&limiter->last_recorded_time_us, now); + pg_atomic_write_u32(&limiter->should_limit, false); } limiter->sent_bytes += (endptr - startptr); if (limiter->sent_bytes > max_wal_bytes) { - pg_atomic_exchange_u32(&limiter->should_limit, 1); + pg_atomic_write_u32(&limiter->should_limit, true); } } /* END_HADRON */