From 71c965b0e162a5f1431b417b794e64fb5a39832f Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 3 Sep 2022 08:48:28 +0300 Subject: [PATCH] Move backpressure throttling implementation to neon extension and function for monitoring throttling time (#2380) * Move backpressure throttling implementation to neon extension and function for monitoring throttling time * Add missing includes * Bump postgres version --- pgxn/neon/neon--1.0.sql | 7 +++++++ pgxn/neon/neon.c | 7 +++++++ pgxn/neon/walproposer.c | 46 +++++++++++++++++++++++++++++++++++++++-- pgxn/neon/walproposer.h | 3 +++ vendor/postgres | 2 +- 5 files changed, 62 insertions(+), 3 deletions(-) diff --git a/pgxn/neon/neon--1.0.sql b/pgxn/neon/neon--1.0.sql index 34f1ba78d4..58b98a5923 100644 --- a/pgxn/neon/neon--1.0.sql +++ b/pgxn/neon/neon--1.0.sql @@ -15,3 +15,10 @@ RETURNS record AS 'MODULE_PATHNAME', 'backpressure_lsns' LANGUAGE C STRICT PARALLEL UNSAFE; + +CREATE FUNCTION backpressure_throttling_time() +RETURNS bigint +AS 'MODULE_PATHNAME', 'backpressure_throttling_time' +LANGUAGE C STRICT +PARALLEL UNSAFE; + diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 595a126f04..62d2624e56 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -40,6 +40,7 @@ void _PG_init(void) PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); +PG_FUNCTION_INFO_V1(backpressure_throttling_time); Datum pg_cluster_size(PG_FUNCTION_ARGS) @@ -80,3 +81,9 @@ backpressure_lsns(PG_FUNCTION_ARGS) PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +Datum +backpressure_throttling_time(PG_FUNCTION_ARGS) +{ + PG_RETURN_UINT64(BackpressureThrottlingTime()); +} diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 9625325c0a..3baa4802b0 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -36,6 +36,7 @@ #include #include #include +#include "access/xact.h" #include "access/xlogdefs.h" #include "access/xlogutils.h" #include "storage/latch.h" @@ -58,6 +59,7 @@ #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" +#include "utils/ps_status.h" #include "utils/timestamp.h" #include "neon.h" @@ -159,8 +161,9 @@ static void nwp_shmem_startup_hook(void); static void nwp_register_gucs(void); static void nwp_prepare_shmem(void); static uint64 backpressure_lag_impl(void); +static bool backpressure_throttling_impl(void); - +static process_interrupts_callback_t PrevProcessInterruptsCallback; static shmem_startup_hook_type prev_shmem_startup_hook_type; @@ -175,9 +178,11 @@ void pg_init_walproposer(void) nwp_prepare_shmem(); delay_backend_us = &backpressure_lag_impl; + PrevProcessInterruptsCallback = ProcessInterruptsCallback; + ProcessInterruptsCallback = backpressure_throttling_impl; WalProposerRegister(); - + WalProposerInit = &WalProposerInitImpl; WalProposerStart = &WalProposerStartImpl; } @@ -1963,6 +1968,7 @@ WalproposerShmemInit(void) { memset(walprop_shared, 0, WalproposerShmemSize()); SpinLockInit(&walprop_shared->mutex); + pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0); } LWLockRelease(AddinShmemInitLock); @@ -2401,3 +2407,39 @@ backpressure_lag_impl(void) } return 0; } + +#define BACK_PRESSURE_DELAY 10000L // 0.01 sec + +static bool backpressure_throttling_impl(void) +{ + int64 lag; + TimestampTz start, stop; + bool retry = PrevProcessInterruptsCallback + ? PrevProcessInterruptsCallback() + : false; + + // Don't throttle read only transactions and wal sender. + if (am_walsender || !TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return retry; + + // Calculate replicas lag + lag = backpressure_lag_impl(); + if (lag == 0) + return retry; + + // Suspend writers until replicas catch up + set_ps_display("backpressure throttling"); + + elog(DEBUG2, "backpressure throttling: lag %lu", lag); + start = GetCurrentTimestamp(); + pg_usleep(BACK_PRESSURE_DELAY); + stop = GetCurrentTimestamp(); + pg_atomic_add_fetch_u64(&walprop_shared->backpressureThrottlingTime, stop - start); + return true; +} + +uint64 +BackpressureThrottlingTime(void) +{ + return pg_atomic_read_u64(&walprop_shared->backpressureThrottlingTime); +} diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index b684d5264f..75167163f3 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -287,6 +287,7 @@ typedef struct WalproposerShmemState slock_t mutex; ReplicationFeedback feedback; term_t mineLastElectedTerm; + pg_atomic_uint64 backpressureThrottlingTime; } WalproposerShmemState; /* @@ -537,4 +538,6 @@ typedef struct WalProposerFunctionsType */ extern PGDLLIMPORT WalProposerFunctionsType *WalProposerFunctions; +extern uint64 BackpressureThrottlingTime(void); + #endif /* __NEON_WALPROPOSER_H__ */ diff --git a/vendor/postgres b/vendor/postgres index 22d9ead36b..bbd2ab1544 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 22d9ead36beeab6b6a99c64f9b0b1576927ad91b +Subproject commit bbd2ab15443935a6871b39f90ed669160d9987ad