added lakebase_mode wrappers

This commit is contained in:
Suhas Thalanki
2025-07-30 11:12:56 -04:00
parent 591fc820c9
commit c8b02ed3f1

View File

@@ -1623,37 +1623,39 @@ XLogBroadcastWalProposer(WalProposer *wp)
return;
/* BEGIN_HADRON */
state = GetWalpropShmemState();
effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
if (effective_max_wal_bytes_per_second != -1 && state != NULL)
{
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
if ( now >= batch_end_time )
if (lakebase_mode) {
state = GetWalpropShmemState();
effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
if (effective_max_wal_bytes_per_second != -1 && state != NULL)
{
// Reset the rate limiter to start a new batch
limiter->sent_bytes = 0;
pg_atomic_write_u32(&limiter->should_limit, false);
pg_atomic_write_u64(&limiter->batch_start_time_us, now);
/* tentatively assign the batch end time as 1s from now. This could result in one of the following cases:
1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s,
then we will reset the current batch and clear sent_bytes. No throttling happens.
2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written,
and throttle PG until the batch end time. */
pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC);
}
limiter->sent_bytes += (endptr - startptr);
if (limiter->sent_bytes > effective_max_wal_bytes_per_second)
{
uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us);
uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1);
if (throttle_usecs > kRateLimitMaxBatchUSecs){
elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs);
throttle_usecs = kRateLimitMaxBatchUSecs;
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
if ( now >= batch_end_time )
{
// Reset the rate limiter to start a new batch
limiter->sent_bytes = 0;
pg_atomic_write_u32(&limiter->should_limit, false);
pg_atomic_write_u64(&limiter->batch_start_time_us, now);
/* tentatively assign the batch end time as 1s from now. This could result in one of the following cases:
1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s,
then we will reset the current batch and clear sent_bytes. No throttling happens.
2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written,
and throttle PG until the batch end time. */
pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC);
}
limiter->sent_bytes += (endptr - startptr);
if (limiter->sent_bytes > effective_max_wal_bytes_per_second)
{
uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us);
uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1);
if (throttle_usecs > kRateLimitMaxBatchUSecs){
elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs);
throttle_usecs = kRateLimitMaxBatchUSecs;
}
pg_atomic_write_u32(&limiter->should_limit, true);
pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs);
pg_atomic_write_u32(&limiter->should_limit, true);
pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs);
}
}
}
/* END_HADRON */
@@ -2133,13 +2135,15 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
return;
/* BEGIN_HADRON */
// Record safekeeper commit LSN in shared memory for lag monitoring
{
WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
Assert(sk->index < MAX_SAFEKEEPERS);
SpinLockAcquire(&shmem->mutex);
shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
SpinLockRelease(&shmem->mutex);
if (lakebase_mode) {
// Record safekeeper commit LSN in shared memory for lag monitoring
{
WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
Assert(sk->index < MAX_SAFEKEEPERS);
SpinLockAcquire(&shmem->mutex);
shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
SpinLockRelease(&shmem->mutex);
}
}
/* END_HADRON */