diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index 8ffac13daf..29d7252120 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -1623,37 +1623,39 @@ XLogBroadcastWalProposer(WalProposer *wp)
         return;
 
     /* BEGIN_HADRON */
-    state = GetWalpropShmemState();
-    effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
-    if (effective_max_wal_bytes_per_second != -1 && state != NULL)
-    {
-        struct WalRateLimiter *limiter = &state->wal_rate_limiter;
-        uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
-        if ( now >= batch_end_time )
+    if (lakebase_mode) {
+        state = GetWalpropShmemState();
+        effective_max_wal_bytes_per_second = pg_atomic_read_u32(&state->wal_rate_limiter.effective_max_wal_bytes_per_second);
+        if (effective_max_wal_bytes_per_second != -1 && state != NULL)
         {
-            // Reset the rate limiter to start a new batch
-            limiter->sent_bytes = 0;
-            pg_atomic_write_u32(&limiter->should_limit, false);
-            pg_atomic_write_u64(&limiter->batch_start_time_us, now);
-            /* tentatively assign the batch end time as 1s from now. This could result in one of the following cases:
-               1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s,
-               then we will reset the current batch and clear sent_bytes. No throttling happens.
-               2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written,
-               and throttle PG until the batch end time. */
-            pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC);
-        }
-        limiter->sent_bytes += (endptr - startptr);
-        if (limiter->sent_bytes > effective_max_wal_bytes_per_second)
-        {
-            uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us);
-            uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1);
-            if (throttle_usecs > kRateLimitMaxBatchUSecs){
-                elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs);
-                throttle_usecs = kRateLimitMaxBatchUSecs;
+            struct WalRateLimiter *limiter = &state->wal_rate_limiter;
+            uint64 batch_end_time = pg_atomic_read_u64(&limiter->batch_end_time_us);
+            if ( now >= batch_end_time )
+            {
+                // Reset the rate limiter to start a new batch
+                limiter->sent_bytes = 0;
+                pg_atomic_write_u32(&limiter->should_limit, false);
+                pg_atomic_write_u64(&limiter->batch_start_time_us, now);
+                /* tentatively assign the batch end time as 1s from now. This could result in one of the following cases:
+                   1. If sent_bytes does not reach effective_max_wal_bytes_per_second in 1s,
+                   then we will reset the current batch and clear sent_bytes. No throttling happens.
+                   2. Otherwise, we will recompute the end time (below) based on how many bytes are actually written,
+                   and throttle PG until the batch end time. */
+                pg_atomic_write_u64(&limiter->batch_end_time_us, now + USECS_PER_SEC);
             }
+            limiter->sent_bytes += (endptr - startptr);
+            if (limiter->sent_bytes > effective_max_wal_bytes_per_second)
+            {
+                uint64_t batch_start_time = pg_atomic_read_u64(&limiter->batch_start_time_us);
+                uint64 throttle_usecs = USECS_PER_SEC * limiter->sent_bytes / Max(effective_max_wal_bytes_per_second, 1);
+                if (throttle_usecs > kRateLimitMaxBatchUSecs){
+                    elog(LOG, "throttle_usecs %lu is too large, limiting to %lu", throttle_usecs, kRateLimitMaxBatchUSecs);
+                    throttle_usecs = kRateLimitMaxBatchUSecs;
+                }
-            pg_atomic_write_u32(&limiter->should_limit, true);
-            pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs);
+                pg_atomic_write_u32(&limiter->should_limit, true);
+                pg_atomic_write_u64(&limiter->batch_end_time_us, batch_start_time + throttle_usecs);
+            }
         }
     }
     /* END_HADRON */
 
@@ -2133,13 +2135,15 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
         return;
 
     /* BEGIN_HADRON */
-    // Record safekeeper commit LSN in shared memory for lag monitoring
-    {
-        WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
-        Assert(sk->index < MAX_SAFEKEEPERS);
-        SpinLockAcquire(&shmem->mutex);
-        shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
-        SpinLockRelease(&shmem->mutex);
+    if (lakebase_mode) {
+        // Record safekeeper commit LSN in shared memory for lag monitoring
+        {
+            WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
+            Assert(sk->index < MAX_SAFEKEEPERS);
+            SpinLockAcquire(&shmem->mutex);
+            shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
+            SpinLockRelease(&shmem->mutex);
+        }
     }
     /* END_HADRON */
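
For readers skimming the first hunk: the limiter batches WAL writes into windows of at least one second and, when a window overshoots the byte cap, stretches the window end so the average rate over the whole window falls back to the cap. Below is a minimal single-threaded sketch of that batching logic. It is not the implementation: a plain struct and int stand in for the shared-memory state and pg_atomic_* accessors, the kRateLimitMaxBatchUSecs value and the main() driver are assumptions for illustration, and the consumer of should_limit (which pauses WAL broadcast) is outside this hunk.

/* Sketch of the batch-based WAL rate limiter; names mirror the diff,
 * values and driver are illustrative assumptions. */
#include <stdint.h>
#include <stdio.h>

#define USECS_PER_SEC 1000000ULL
/* Assumed cap on how far a batch may be stretched; the real constant
 * lives in the walproposer sources. */
static const uint64_t kRateLimitMaxBatchUSecs = 5 * USECS_PER_SEC;

typedef struct WalRateLimiter
{
    uint64_t sent_bytes;           /* bytes sent in the current batch */
    int      should_limit;         /* read elsewhere to pause WAL sends */
    uint64_t batch_start_time_us;
    uint64_t batch_end_time_us;
} WalRateLimiter;

/* Account for wal_bytes sent at time `now` (microseconds). */
static void
rate_limit_account(WalRateLimiter *limiter, uint64_t now,
                   uint64_t wal_bytes, uint64_t max_bytes_per_sec)
{
    if (now >= limiter->batch_end_time_us)
    {
        /* Start a new batch and tentatively end it 1s from now. */
        limiter->sent_bytes = 0;
        limiter->should_limit = 0;
        limiter->batch_start_time_us = now;
        limiter->batch_end_time_us = now + USECS_PER_SEC;
    }

    limiter->sent_bytes += wal_bytes;
    if (limiter->sent_bytes > max_bytes_per_sec)
    {
        /* Overshoot: stretch the batch so the average rate over its
         * duration drops back to max_bytes_per_sec, bounded above. */
        uint64_t throttle_usecs = USECS_PER_SEC * limiter->sent_bytes /
            (max_bytes_per_sec > 0 ? max_bytes_per_sec : 1);

        if (throttle_usecs > kRateLimitMaxBatchUSecs)
            throttle_usecs = kRateLimitMaxBatchUSecs;

        limiter->should_limit = 1;
        limiter->batch_end_time_us =
            limiter->batch_start_time_us + throttle_usecs;
    }
}

int
main(void)
{
    WalRateLimiter lim = {0};

    /* 3 MB sent instantly against a 1 MB/s cap: the batch is stretched
     * to ~3s, throttling the sender until then. */
    rate_limit_account(&lim, 0, 3 * 1024 * 1024, 1024 * 1024);
    printf("should_limit=%d, batch ends at %llu us\n",
           lim.should_limit, (unsigned long long) lim.batch_end_time_us);
    return 0;
}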
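The second hunk shows only the writer side of the lag-monitoring state: each safekeeper's acknowledged commit LSN is stored in a shared array under a spinlock. A hypothetical reader-side helper might scan that array for the slowest safekeeper, as sketched below; MAX_SAFEKEEPERS' value, the XLogRecPtr stand-in, and the min-scan are illustrative assumptions (the real reader would take the same shmem->mutex, omitted in this single-threaded sketch).

/* Hypothetical consumer of safekeeper_commit_lsn[]; the diff shows only
 * the writer side, so everything here is an illustrative assumption. */
#include <stdint.h>
#include <stdio.h>

#define MAX_SAFEKEEPERS 32     /* assumed; matches the Assert bound's intent */
typedef uint64_t XLogRecPtr;   /* stand-in for the PostgreSQL type */

typedef struct WalproposerShmemState
{
    /* spinlock omitted in this single-threaded sketch */
    XLogRecPtr safekeeper_commit_lsn[MAX_SAFEKEEPERS];
} WalproposerShmemState;

/* Lag monitoring: the commit LSN of the laggiest safekeeper. */
static XLogRecPtr
min_safekeeper_commit_lsn(const WalproposerShmemState *shmem, int n_safekeepers)
{
    XLogRecPtr min_lsn = shmem->safekeeper_commit_lsn[0];
    for (int i = 1; i < n_safekeepers; i++)
        if (shmem->safekeeper_commit_lsn[i] < min_lsn)
            min_lsn = shmem->safekeeper_commit_lsn[i];
    return min_lsn;
}

int
main(void)
{
    WalproposerShmemState shmem = {{0x30000, 0x28000, 0x2F000}};
    printf("slowest safekeeper commit LSN: %llX\n",
           (unsigned long long) min_safekeeper_commit_lsn(&shmem, 3));
    return 0;
}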