From 32e3b7dfd22741e5a4986ff48f2effceaa631437 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 27 May 2025 17:39:39 +0300 Subject: [PATCH] Do not explicitly launch wal_proposer: rely on BgWorkerStart_RecoveryFinished --- pgxn/neon/walproposer.c | 6 +-- pgxn/neon/walproposer.h | 1 - pgxn/neon/walproposer_pg.c | 49 +++------------------ test_runner/regress/test_replica_promote.py | 8 ++-- 4 files changed, 13 insertions(+), 51 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 4cb9aec258..4bcbe9078a 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1380,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp) * we must bail out, as clog and other non rel data is inconsistent. */ walprop_shared = wp->api.get_shmem_state(wp); - if (!wp->config->syncSafekeepers && !wp->api.get_shmem_state(wp)->bgw_started) + if (!wp->config->syncSafekeepers) { /* * Basebackup LSN always points to the beginning of the record (not @@ -1407,8 +1407,8 @@ ProcessPropStartPos(WalProposer *wp) * However, don't dump core as this is kinda expected * scenario. */ - disable_core_dump(); - wp_log(PANIC, + //disable_core_dump(); + wp_log(LOG, "collected propTermStartLsn %X/%X, but basebackup LSN %X/%X", LSN_FORMAT_ARGS(wp->propTermStartLsn), LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp))); diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 57cfabeb23..cca20e746b 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -384,7 +384,6 @@ typedef struct WalproposerShmemState XLogRecPtr donor_lsn; slock_t mutex; - bool bgw_started; pg_atomic_uint64 mineLastElectedTerm; pg_atomic_uint64 backpressureThrottlingTime; pg_atomic_uint64 currentClusterSize; diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 3fdab359df..daacf9f035 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -86,7 +86,7 @@ static void nwp_prepare_shmem(void); static uint64 backpressure_lag_impl(void); static uint64 startup_backpressure_wrap(void); static bool backpressure_throttling_impl(void); -static void walprop_register_bgworker(bool dynamic); +static void walprop_register_bgworker(void); static void walprop_pg_init_standalone_sync_safekeepers(void); static void walprop_pg_init_walsender(void); @@ -161,13 +161,9 @@ WalProposerMain(Datum main_arg) { WalProposer *wp; - walprop_shared->bgw_started = true; - - while (*wal_acceptors_list == '\0') + if (*wal_acceptors_list == '\0') { - /* Wait until wal acceptors list GUC changes are propagated */ - pg_usleep(GUC_POLL_DELAY); - CHECK_FOR_INTERRUPTS(); + elog(PANIC, "Safekeepers list is empty"); } init_walprop_config(false); @@ -199,9 +195,7 @@ pg_init_walproposer(void) PrevProcessInterruptsCallback = ProcessInterruptsCallback; ProcessInterruptsCallback = backpressure_throttling_impl; - /* If no wal acceptors are specified, don't start the background worker. */ - if (*wal_acceptors_list != '\0') - walprop_register_bgworker(false); + walprop_register_bgworker(); } static void @@ -319,29 +313,12 @@ safekeepers_cmp(char *old, char *new) return true; } -/* - * GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if - * the list changed. - */ static void assign_neon_safekeepers(const char *newval, void *extra) { char *newval_copy; char *oldval; - /* Promotion of replica */ - if (*wal_acceptors_list == '\0' && *newval != '\0' && walprop_shared && IsUnderPostmaster) - { - SpinLockAcquire(&walprop_shared->mutex); - if (!walprop_shared->bgw_started) - { - walprop_shared->bgw_started = true; - walprop_register_bgworker(true); - } - SpinLockRelease(&walprop_shared->mutex); - return; - } - if (!am_walproposer) return; @@ -534,13 +511,13 @@ BackpressureThrottlingTime(void) * Register a background worker proposing WAL to wal acceptors. */ static void -walprop_register_bgworker(bool dynamic) +walprop_register_bgworker(void) { BackgroundWorker bgw; memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; - bgw.bgw_start_time = dynamic ? BgWorkerStart_ConsistentState : BgWorkerStart_RecoveryFinished; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer"); @@ -548,19 +525,7 @@ walprop_register_bgworker(bool dynamic) bgw.bgw_restart_time = 1; bgw.bgw_notify_pid = 0; bgw.bgw_main_arg = (Datum) 0; - - if (dynamic) - { - BackgroundWorkerHandle* handle; - if (!RegisterDynamicBackgroundWorker(&bgw, &handle)) - { - elog(FATAL, "Failed to start walproposer"); - } - } - else - { - RegisterBackgroundWorker(&bgw); - } + RegisterBackgroundWorker(&bgw); } /* shmem handling */ diff --git a/test_runner/regress/test_replica_promote.py b/test_runner/regress/test_replica_promote.py index 67a39159ef..1afa89af2d 100644 --- a/test_runner/regress/test_replica_promote.py +++ b/test_runner/regress/test_replica_promote.py @@ -52,17 +52,15 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion): wait_replica_caughtup(primary, secondary) primary.stop() - secondary_conn = secondary.connect() - secondary_cur = secondary_conn.cursor() - secondary_cur.execute("SELECT * FROM pg_promote()") - assert secondary_cur.fetchone() == (True,) - secondary_conn = secondary.connect() secondary_cur = secondary_conn.cursor() secondary_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'") secondary_cur.execute("select pg_reload_conf()") + secondary_cur.execute("SELECT * FROM pg_promote()") + assert secondary_cur.fetchone() == (True,) + new_primary = secondary old_primary = primary