Do not explicitly launch wal_proposer: rely on BgWorkerStart_RecoveryFinished

This commit is contained in:
Konstantin Knizhnik
2025-05-27 17:39:39 +03:00
committed by Mikhail Kot
parent 2e35475c9a
commit 32e3b7dfd2
4 changed files with 13 additions and 51 deletions

View File

@@ -1380,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp)
* we must bail out, as clog and other non rel data is inconsistent.
*/
walprop_shared = wp->api.get_shmem_state(wp);
if (!wp->config->syncSafekeepers && !wp->api.get_shmem_state(wp)->bgw_started)
if (!wp->config->syncSafekeepers)
{
/*
* Basebackup LSN always points to the beginning of the record (not
@@ -1407,8 +1407,8 @@ ProcessPropStartPos(WalProposer *wp)
* However, don't dump core as this is kinda expected
* scenario.
*/
disable_core_dump();
wp_log(PANIC,
//disable_core_dump();
wp_log(LOG,
"collected propTermStartLsn %X/%X, but basebackup LSN %X/%X",
LSN_FORMAT_ARGS(wp->propTermStartLsn),
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));

View File

@@ -384,7 +384,6 @@ typedef struct WalproposerShmemState
XLogRecPtr donor_lsn;
slock_t mutex;
bool bgw_started;
pg_atomic_uint64 mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
pg_atomic_uint64 currentClusterSize;

View File

@@ -86,7 +86,7 @@ static void nwp_prepare_shmem(void);
static uint64 backpressure_lag_impl(void);
static uint64 startup_backpressure_wrap(void);
static bool backpressure_throttling_impl(void);
static void walprop_register_bgworker(bool dynamic);
static void walprop_register_bgworker(void);
static void walprop_pg_init_standalone_sync_safekeepers(void);
static void walprop_pg_init_walsender(void);
@@ -161,13 +161,9 @@ WalProposerMain(Datum main_arg)
{
WalProposer *wp;
walprop_shared->bgw_started = true;
while (*wal_acceptors_list == '\0')
if (*wal_acceptors_list == '\0')
{
/* Wait until wal acceptors list GUC changes are propagated */
pg_usleep(GUC_POLL_DELAY);
CHECK_FOR_INTERRUPTS();
elog(PANIC, "Safekeepers list is empty");
}
init_walprop_config(false);
@@ -199,9 +195,7 @@ pg_init_walproposer(void)
PrevProcessInterruptsCallback = ProcessInterruptsCallback;
ProcessInterruptsCallback = backpressure_throttling_impl;
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list != '\0')
walprop_register_bgworker(false);
walprop_register_bgworker();
}
static void
@@ -319,29 +313,12 @@ safekeepers_cmp(char *old, char *new)
return true;
}
/*
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
* the list changed.
*/
static void
assign_neon_safekeepers(const char *newval, void *extra)
{
char *newval_copy;
char *oldval;
/* Promotion of replica */
if (*wal_acceptors_list == '\0' && *newval != '\0' && walprop_shared && IsUnderPostmaster)
{
SpinLockAcquire(&walprop_shared->mutex);
if (!walprop_shared->bgw_started)
{
walprop_shared->bgw_started = true;
walprop_register_bgworker(true);
}
SpinLockRelease(&walprop_shared->mutex);
return;
}
if (!am_walproposer)
return;
@@ -534,13 +511,13 @@ BackpressureThrottlingTime(void)
* Register a background worker proposing WAL to wal acceptors.
*/
static void
walprop_register_bgworker(bool dynamic)
walprop_register_bgworker(void)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = dynamic ? BgWorkerStart_ConsistentState : BgWorkerStart_RecoveryFinished;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer");
@@ -548,19 +525,7 @@ walprop_register_bgworker(bool dynamic)
bgw.bgw_restart_time = 1;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
if (dynamic)
{
BackgroundWorkerHandle* handle;
if (!RegisterDynamicBackgroundWorker(&bgw, &handle))
{
elog(FATAL, "Failed to start walproposer");
}
}
else
{
RegisterBackgroundWorker(&bgw);
}
RegisterBackgroundWorker(&bgw);
}
/* shmem handling */

View File

@@ -52,17 +52,15 @@ def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
wait_replica_caughtup(primary, secondary)
primary.stop()
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute("SELECT * FROM pg_promote()")
assert secondary_cur.fetchone() == (True,)
secondary_conn = secondary.connect()
secondary_cur = secondary_conn.cursor()
secondary_cur.execute(f"alter system set neon.safekeepers='{safekeepers}'")
secondary_cur.execute("select pg_reload_conf()")
secondary_cur.execute("SELECT * FROM pg_promote()")
assert secondary_cur.fetchone() == (True,)
new_primary = secondary
old_primary = primary