Move backpressure throttling implementation to neon extension and function for monitoring throttling time (#2380)

* Move backpressure throttling implementation to neon extension and function for monitoring throttling time

* Add missing includes

* Bump postgres version
This commit is contained in:
Konstantin Knizhnik
2022-09-03 08:48:28 +03:00
committed by GitHub
parent a4e79db348
commit 71c965b0e1
5 changed files with 62 additions and 3 deletions

View File

@@ -15,3 +15,10 @@ RETURNS record
AS 'MODULE_PATHNAME', 'backpressure_lsns'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION backpressure_throttling_time()
RETURNS bigint
AS 'MODULE_PATHNAME', 'backpressure_throttling_time'
LANGUAGE C STRICT
PARALLEL UNSAFE;

View File

@@ -40,6 +40,7 @@ void _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -80,3 +81,9 @@ backpressure_lsns(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
Datum
backpressure_throttling_time(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT64(BackpressureThrottlingTime());
}

View File

@@ -36,6 +36,7 @@
#include <signal.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/xact.h"
#include "access/xlogdefs.h"
#include "access/xlogutils.h"
#include "storage/latch.h"
@@ -58,6 +59,7 @@
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
#include "neon.h"
@@ -159,8 +161,9 @@ static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static bool backpressure_throttling_impl(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback;
static shmem_startup_hook_type prev_shmem_startup_hook_type;
@@ -175,9 +178,11 @@ void pg_init_walproposer(void)
nwp_prepare_shmem();
delay_backend_us = &backpressure_lag_impl;
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
WalProposerRegister();
WalProposerInit = &WalProposerInitImpl;
WalProposerStart = &WalProposerStartImpl;
}
@@ -1963,6 +1968,7 @@ WalproposerShmemInit(void)
{
memset(walprop_shared, 0, WalproposerShmemSize());
SpinLockInit(&walprop_shared->mutex);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
}
LWLockRelease(AddinShmemInitLock);
@@ -2401,3 +2407,39 @@ backpressure_lag_impl(void)
}
return 0;
}
#define BACK_PRESSURE_DELAY 10000L // 0.01 sec
static bool backpressure_throttling_impl(void)
{
int64 lag;
TimestampTz start, stop;
bool retry = PrevProcessInterruptsCallback
? PrevProcessInterruptsCallback()
: false;
// Don't throttle read only transactions and wal sender.
if (am_walsender || !TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
return retry;
// Calculate replicas lag
lag = backpressure_lag_impl();
if (lag == 0)
return retry;
// Suspend writers until replicas catch up
set_ps_display("backpressure throttling");
elog(DEBUG2, "backpressure throttling: lag %lu", lag);
start = GetCurrentTimestamp();
pg_usleep(BACK_PRESSURE_DELAY);
stop = GetCurrentTimestamp();
pg_atomic_add_fetch_u64(&walprop_shared->backpressureThrottlingTime, stop - start);
return true;
}
uint64
BackpressureThrottlingTime(void)
{
return pg_atomic_read_u64(&walprop_shared->backpressureThrottlingTime);
}

View File

@@ -287,6 +287,7 @@ typedef struct WalproposerShmemState
slock_t mutex;
ReplicationFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
/*
@@ -537,4 +538,6 @@ typedef struct WalProposerFunctionsType
*/
extern PGDLLIMPORT WalProposerFunctionsType *WalProposerFunctions;
extern uint64 BackpressureThrottlingTime(void);
#endif /* __NEON_WALPROPOSER_H__ */