Start walproposer on replica promotion

This commit is contained in:
Konstantin Knizhnik
2025-05-13 08:27:56 +03:00
parent 45649ccd62
commit 41adde29d7
3 changed files with 51 additions and 13 deletions

View File

@@ -384,6 +384,7 @@ typedef struct WalproposerShmemState
XLogRecPtr donor_lsn;
slock_t mutex;
bool bgw_started;
pg_atomic_uint64 mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
pg_atomic_uint64 currentClusterSize;

View File

@@ -85,7 +85,7 @@ static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
static void walprop_register_bgworker(void);
static void walprop_register_bgworker(bool dynamic);
static void walprop_pg_init_standalone_sync_safekeepers(void);
static void walprop_pg_init_walsender(void);
@@ -149,6 +149,8 @@ WalProposerSync(int argc, char *argv[])
WalProposerStart(wp);
}
/* Sleep interval between checks of the safekeepers GUC: 100 ms, expressed in microseconds. */
#define GUC_POLL_DELAY (100 * 1000L)
/*
* WAL proposer bgworker entry point.
*/
@@ -157,6 +159,13 @@ WalProposerMain(Datum main_arg)
{
WalProposer *wp;
while (*wal_acceptors_list == '\0')
{
/* Wait until wal acceptors list GUC changes are propagated */
pg_usleep(GUC_POLL_DELAY);
CHECK_FOR_INTERRUPTS();
}
init_walprop_config(false);
walprop_pg_init_bgworker();
am_walproposer = true;
@@ -186,7 +195,9 @@ pg_init_walproposer(void)
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
walprop_register_bgworker();
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list != '\0')
walprop_register_bgworker(false);
}
static void
@@ -304,6 +315,19 @@ assign_neon_safekeepers(const char *newval, void *extra)
char *newval_copy;
char *oldval;
/*
 * Promotion of replica: the safekeepers list changes from empty to
 * non-empty, so a walproposer has to be started dynamically.
 *
 * bgw_started is tested and set under the spinlock so that only one
 * process performs the registration.  The registration itself is done
 * after the spinlock is released: walprop_register_bgworker(true) calls
 * RegisterDynamicBackgroundWorker() and may raise FATAL, and a spinlock
 * must never be held across code that can take other locks or throw an
 * error.
 */
if (*wal_acceptors_list == '\0' && *newval != '\0' && walprop_shared && IsUnderPostmaster)
{
	bool		start_bgw;

	SpinLockAcquire(&walprop_shared->mutex);
	start_bgw = !walprop_shared->bgw_started;
	walprop_shared->bgw_started = true;
	SpinLockRelease(&walprop_shared->mutex);

	if (start_bgw)
		walprop_register_bgworker(true);
	return;
}
if (!am_walproposer)
return;
@@ -496,17 +520,13 @@ BackpressureThrottlingTime(void)
* Register a background worker proposing WAL to wal acceptors.
*/
static void
walprop_register_bgworker(void)
walprop_register_bgworker(bool dynamic)
{
BackgroundWorker bgw;
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list == '\0')
return;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_start_time = dynamic ? BgWorkerStart_ConsistentState : BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
@@ -515,7 +535,18 @@ walprop_register_bgworker(void)
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
if (dynamic)
{
BackgroundWorkerHandle* handle;
if (!RegisterDynamicBackgroundWorker(&bgw, &handle))
{
elog(FATAL, "Failed to start walproposer");
}
}
else
{
RegisterBackgroundWorker(&bgw);
}
}
/* shmem handling */

View File

@@ -30,6 +30,8 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
)
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
primary_cur.execute("select pg_switch_wal()")
primary_cur.execute("show neon.safekeepers")
safekeepers = primary_cur.fetchall()[0][0]
wait_replica_caughtup(primary, secondary)
@@ -73,11 +75,15 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
secondary_cur.fetchall()
secondary_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
secondary_cur.execute("select pg_reload_conf()");
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (200,)
new_primary_conn = secondary.connect()
new_primary_cur = new_primary_conn.cursor()
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
new_primary_cur.execute("select count(*) from t")
assert new_primary_cur.fetchone() == (200,)
secondary.stop(mode="immediate")