diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 83ef72d3d7..547c1b82ab 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -384,6 +384,7 @@ typedef struct WalproposerShmemState XLogRecPtr donor_lsn; slock_t mutex; + bool bgw_started; pg_atomic_uint64 mineLastElectedTerm; pg_atomic_uint64 backpressureThrottlingTime; pg_atomic_uint64 currentClusterSize; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 17582405db..4da4d295dc 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -85,7 +85,7 @@ static void nwp_prepare_shmem(void); static uint64 backpressure_lag_impl(void); static uint64 startup_backpressure_wrap(void); static bool backpressure_throttling_impl(void); -static void walprop_register_bgworker(void); +static void walprop_register_bgworker(bool dynamic); static void walprop_pg_init_standalone_sync_safekeepers(void); static void walprop_pg_init_walsender(void); @@ -149,6 +149,8 @@ WalProposerSync(int argc, char *argv[]) WalProposerStart(wp); } +#define GUC_POLL_DELAY 100000L // 0.1 sec + /* * WAL proposer bgworker entry point. */ @@ -157,6 +159,13 @@ WalProposerMain(Datum main_arg) { WalProposer *wp; + while (*wal_acceptors_list == '\0') + { + /* Wait until wal acceptors list GUC changes are propagated */ + pg_usleep(GUC_POLL_DELAY); + CHECK_FOR_INTERRUPTS(); + } + init_walprop_config(false); walprop_pg_init_bgworker(); am_walproposer = true; @@ -186,7 +195,9 @@ pg_init_walproposer(void) PrevProcessInterruptsCallback = ProcessInterruptsCallback; ProcessInterruptsCallback = backpressure_throttling_impl; - walprop_register_bgworker(); + /* If no wal acceptors are specified, don't start the background worker. */ + if (*wal_acceptors_list != '\0') + walprop_register_bgworker(false); } static void @@ -304,6 +315,19 @@ assign_neon_safekeepers(const char *newval, void *extra) char *newval_copy; char *oldval; + /* Promotion of replica */ + if (*wal_acceptors_list == '\0' && *newval != '\0' && walprop_shared && IsUnderPostmaster) + { + SpinLockAcquire(&walprop_shared->mutex); + if (!walprop_shared->bgw_started) + { + walprop_shared->bgw_started = true; + walprop_register_bgworker(true); + } + SpinLockRelease(&walprop_shared->mutex); + return; + } + if (!am_walproposer) return; @@ -496,17 +520,13 @@ BackpressureThrottlingTime(void) * Register a background worker proposing WAL to wal acceptors. */ static void -walprop_register_bgworker(void) +walprop_register_bgworker(bool dynamic) { BackgroundWorker bgw; - /* If no wal acceptors are specified, don't start the background worker. */ - if (*wal_acceptors_list == '\0') - return; - memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + bgw.bgw_start_time = dynamic ? BgWorkerStart_ConsistentState : BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer"); @@ -515,7 +535,18 @@ walprop_register_bgworker(void) bgw.bgw_notify_pid = 0; bgw.bgw_main_arg = (Datum) 0; - RegisterBackgroundWorker(&bgw); + if (dynamic) + { + BackgroundWorkerHandle* handle; + if (!RegisterDynamicBackgroundWorker(&bgw, &handle)) + { + elog(FATAL, "Failed to start walproposer"); + } + } + else + { + RegisterBackgroundWorker(&bgw); + } } /* shmem handling */ diff --git a/test_runner/regress/test_replica_promote.py b/test_runner/regress/test_replica_promote.py index 926512bb7b..b1763f8e10 100644 --- a/test_runner/regress/test_replica_promote.py +++ b/test_runner/regress/test_replica_promote.py @@ -30,6 +30,8 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): ) primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)") primary_cur.execute("select pg_switch_wal()") + primary_cur.execute("show neon.safekeepers") + safekeepers = primary_cur.fetchall()[0][0] wait_replica_caughtup(primary, secondary) @@ -73,11 +75,15 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): secondary_conn = secondary.connect() secondary_cur = secondary_conn.cursor() - secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)") - secondary_cur.fetchall() + secondary_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'") + secondary_cur.execute("select pg_reload_conf()"); - secondary_cur.execute("select count(*) from t") - assert secondary_cur.fetchone() == (200,) + new_primary_conn = secondary.connect() + new_primary_cur = new_primary_conn.cursor() + new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)") + + new_primary_cur.execute("select count(*) from t") + assert new_primary_cur.fetchone() == (200,) secondary.stop(mode="immediate")