Compare commits

...

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
3a11ef7720 Perform check for replicaiton LAG only for backends writting WAL 2023-08-21 10:16:38 +03:00

View File

@@ -163,6 +163,7 @@ static void nwp_register_gucs(void);
static void nwp_prepare_shmem(void); static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void); static uint64 backpressure_lag_impl(void);
static bool backpressure_throttling_impl(void); static bool backpressure_throttling_impl(void);
static uint64 measure_replication_lag(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback; static process_interrupts_callback_t PrevProcessInterruptsCallback;
static shmem_startup_hook_type prev_shmem_startup_hook_type; static shmem_startup_hook_type prev_shmem_startup_hook_type;
@@ -171,6 +172,8 @@ static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void); static void walproposer_shmem_request(void);
#endif #endif
static bool check_replication_lag;
void void
pg_init_walproposer(void) pg_init_walproposer(void)
{ {
@@ -2492,37 +2495,45 @@ backpressure_lag_impl(void)
{ {
if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0)
{ {
XLogRecPtr writePtr; check_replication_lag = true;
XLogRecPtr flushPtr; return measure_replication_lag();
XLogRecPtr applyPtr; }
return 0;
}
static uint64
measure_replication_lag(void)
{
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr applyPtr;
#if PG_VERSION_NUM >= 150000 #if PG_VERSION_NUM >= 150000
XLogRecPtr myFlushLsn = GetFlushRecPtr(NULL); XLogRecPtr myFlushLsn = GetFlushRecPtr(NULL);
#else #else
XLogRecPtr myFlushLsn = GetFlushRecPtr(); XLogRecPtr myFlushLsn = GetFlushRecPtr();
#endif #endif
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
#define MB ((XLogRecPtr)1024 * 1024) #define MB ((XLogRecPtr)1024 * 1024)
elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X", elog(DEBUG2, "current flushLsn %X/%X PageserverFeedback: write %X/%X flush %X/%X apply %X/%X",
LSN_FORMAT_ARGS(myFlushLsn), LSN_FORMAT_ARGS(myFlushLsn),
LSN_FORMAT_ARGS(writePtr), LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr), LSN_FORMAT_ARGS(flushPtr),
LSN_FORMAT_ARGS(applyPtr)); LSN_FORMAT_ARGS(applyPtr));
if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB)) if ((writePtr != InvalidXLogRecPtr && max_replication_write_lag > 0 && myFlushLsn > writePtr + max_replication_write_lag * MB))
{ {
return (myFlushLsn - writePtr - max_replication_write_lag * MB); return (myFlushLsn - writePtr - max_replication_write_lag * MB);
} }
if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB)) if ((flushPtr != InvalidXLogRecPtr && max_replication_flush_lag > 0 && myFlushLsn > flushPtr + max_replication_flush_lag * MB))
{ {
return (myFlushLsn - flushPtr - max_replication_flush_lag * MB); return (myFlushLsn - flushPtr - max_replication_flush_lag * MB);
} }
if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB)) if ((applyPtr != InvalidXLogRecPtr && max_replication_apply_lag > 0 && myFlushLsn > applyPtr + max_replication_apply_lag * MB))
{ {
return (myFlushLsn - applyPtr - max_replication_apply_lag * MB); return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
}
} }
return 0; return 0;
} }
@@ -2539,14 +2550,18 @@ backpressure_throttling_impl(void)
? PrevProcessInterruptsCallback() ? PrevProcessInterruptsCallback()
: false; : false;
/* Don't throttle read only transactions and wal sender. */ /* Throttle onlhy backends writing WAL. */
if (am_walsender || !TransactionIdIsValid(GetCurrentTransactionIdIfAny())) if (!check_replication_lag)
return retry; return retry;
/* Calculate replicas lag */ /* Calculate replication lag */
lag = backpressure_lag_impl(); lag = measure_replication_lag();
if (lag == 0) if (lag == 0)
{
/* Do not measure replication lag before we writting something to the WAL */
check_replication_lag = false;
return retry; return retry;
}
/* Suspend writers until replicas catch up */ /* Suspend writers until replicas catch up */
set_ps_display("backpressure throttling"); set_ps_display("backpressure throttling");