Perform check for replicaiton LAG only for backends writting WAL

This commit is contained in:
Konstantin Knizhnik
2023-08-21 10:16:38 +03:00
parent 130ccb4b67
commit 3a11ef7720

View File

@@ -163,6 +163,7 @@ static void nwp_register_gucs(void);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static bool backpressure_throttling_impl(void);
static uint64 measure_replication_lag(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
@@ -171,6 +172,8 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static bool check_replication_lag;
void
pg_init_walproposer(void)
{
@@ -2492,37 +2495,45 @@ backpressure_lag_impl(void)
{
if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0)
{
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
check_replication_lag = true;
return measure_replication_lag();
}
return 0;
}
static uint64
measure_replication_lag(void)
{
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
#if PG_VERSION_NUM >= 150000
XLogRecPtr myFlushLsn = GetFlushRecPtr(NULL);
XLogRecPtr myFlushLsn = GetFlushRecPtr(NULL);
#else
XLogRecPtr myFlushLsn = GetFlushRecPtr();
XLogRecPtr myFlushLsn = GetFlushRecPtr();
#endif
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
#define MB ((XLogRecPtr)1024 * 1024)
elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X",
LSN_FORMAT_ARGS(myFlushLsn),
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr));
elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X",
LSN_FORMAT_ARGS(myFlushLsn),
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr));
if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
{
return (myFlushLsn - writePtr - max_replication_write_lag * MB);
}
if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
{
return (myFlushLsn - writePtr - max_replication_write_lag * MB);
}
if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
{
return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
}
if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
{
return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
}
if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
{
return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
}
if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
{
return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
}
return 0;
}
@@ -2539,14 +2550,18 @@ backpressure_throttling_impl(void)
? PrevProcessInterruptsCallback()
: false;
/* Don't throttle read only transactions and wal sender. */
if (am_walsender || !TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
/* Throttle onlhy backends writing WAL. */
if (!check_replication_lag)
return retry;
/* Calculate replicas lag */
lag = backpressure_lag_impl();
/* Calculate replication lag */
lag = measure_replication_lag();
if (lag == 0)
{
/* Do not measure replication lag before we writting something to the WAL */
check_replication_lag = false;
return retry;
}
/* Suspend writers until replicas catch up */
set_ps_display("backpressure throttling");