diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index d9999ef2b1..bb938b739e 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -163,6 +163,7 @@ static void nwp_register_gucs(void); static void nwp_prepare_shmem(void); static uint64 backpressure_lag_impl(void); static bool backpressure_throttling_impl(void); +static uint64 measure_replication_lag(void); static process_interrupts_callback_t PrevProcessInterruptsCallback; static shmem_startup_hook_type prev_shmem_startup_hook_type; @@ -171,6 +172,8 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL; static void walproposer_shmem_request(void); #endif +static bool check_replication_lag; + void pg_init_walproposer(void) { @@ -2492,37 +2495,45 @@ backpressure_lag_impl(void) { if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) { - XLogRecPtr writePtr; - XLogRecPtr flushPtr; - XLogRecPtr applyPtr; + check_replication_lag = true; + return measure_replication_lag(); + } + return 0; +} + +static uint64 +measure_replication_lag(void) +{ + XLogRecPtr writePtr; + XLogRecPtr flushPtr; + XLogRecPtr applyPtr; #if PG_VERSION_NUM >= 150000 - XLogRecPtr myFlushLsn = GetFlushRecPtr(NULL); + XLogRecPtr myFlushLsn = GetFlushRecPtr(NULL); #else - XLogRecPtr myFlushLsn = GetFlushRecPtr(); + XLogRecPtr myFlushLsn = GetFlushRecPtr(); #endif - replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); + replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); #define MB ((XLogRecPtr)1024 * 1024) - elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X", - LSN_FORMAT_ARGS(myFlushLsn), - LSN_FORMAT_ARGS(writePtr), - LSN_FORMAT_ARGS(flushPtr), - LSN_FORMAT_ARGS(applyPtr)); + elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X", + LSN_FORMAT_ARGS(myFlushLsn), + LSN_FORMAT_ARGS(writePtr), + LSN_FORMAT_ARGS(flushPtr), + LSN_FORMAT_ARGS(applyPtr)); - if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB)) - { - return (myFlushLsn - writePtr - max_replication_write_lag * MB); - } + if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB)) + { + return (myFlushLsn - writePtr - max_replication_write_lag * MB); + } - if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB)) - { - return (myFlushLsn - flushPtr - max_replication_flush_lag * MB); - } + if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB)) + { + return (myFlushLsn - flushPtr - max_replication_flush_lag * MB); + } - if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB)) - { - return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); - } + if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB)) + { + return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); } return 0; } @@ -2539,14 +2550,18 @@ backpressure_throttling_impl(void) ? PrevProcessInterruptsCallback() : false; - /* Don't throttle read only transactions and wal sender. */ - if (am_walsender || !TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + /* Throttle onlhy backends writing WAL. */ + if (!check_replication_lag) return retry; - /* Calculate replicas lag */ - lag = backpressure_lag_impl(); + /* Calculate replication lag */ + lag = measure_replication_lag(); if (lag == 0) + { + /* Do not measure replication lag before we writting something to the WAL */ + check_replication_lag = false; return retry; + } /* Suspend writers until replicas catch up */ set_ps_display("backpressure throttling");