diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 69c79c1adf..97767a09ed 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -43,7 +43,6 @@ /* Prototypes for private functions */ static void WalProposerLoop(WalProposer *wp); -static void HackyRemoveWalProposerEvent(Safekeeper *to_remove); static void ShutdownConnection(Safekeeper *sk); static void ResetConnection(Safekeeper *sk); static long TimeToReconnect(WalProposer *wp, TimestampTz now); @@ -78,7 +77,6 @@ static bool AsyncFlush(Safekeeper *sk); static int CompareLsn(const void *a, const void *b); static char *FormatSafekeeperState(SafekeeperState state); static void AssertEventsOkForState(uint32 events, Safekeeper *sk); -static uint32 SafekeeperStateDesiredEvents(SafekeeperState state); static char *FormatEvents(WalProposer *wp, uint32 events); WalProposer * @@ -303,43 +301,6 @@ WalProposerLoop(WalProposer *wp) WalProposerPoll(wp); } -/* - * Hack: provides a way to remove the event corresponding to an individual walproposer from the set. - * - * Note: Internally, this completely reconstructs the event set. It should be avoided if possible. - */ -static void -HackyRemoveWalProposerEvent(Safekeeper *to_remove) -{ - WalProposer *wp = to_remove->wp; - - /* Remove the existing event set, assign sk->eventPos = -1 */ - wp->api.free_event_set(wp); - /* Re-initialize it without adding any safekeeper events */ - wp->api.init_event_set(wp); - - /* - * loop through the existing safekeepers. If they aren't the one we're - * removing, and if they have a socket we can use, re-add the applicable - * events. - */ - for (int i = 0; i < wp->n_safekeepers; i++) - { - uint32 desired_events = WL_NO_EVENTS; - Safekeeper *sk = &wp->safekeeper[i]; - - if (sk == to_remove) - continue; - - /* If this safekeeper isn't offline, add an event for it! */ - if (sk->state != SS_OFFLINE) - { - desired_events = SafekeeperStateDesiredEvents(sk->state); - /* will set sk->eventPos */ - wp->api.add_safekeeper_event_set(sk, desired_events); - } - } -} /* Shuts down and cleans up the connection for a safekeeper. Sets its state to SS_OFFLINE */ static void @@ -354,7 +315,7 @@ ShutdownConnection(Safekeeper *sk) pfree(sk->voteResponse.termHistory.entries); sk->voteResponse.termHistory.entries = NULL; - HackyRemoveWalProposerEvent(sk); + sk->wp->api.rm_safekeeper_event_set(sk); if (sk->xlogreader) { @@ -626,7 +587,7 @@ HandleConnectionEvent(Safekeeper *sk) * Because PQconnectPoll can change the socket, we have to un-register the * old event and re-register an event on the new socket. */ - HackyRemoveWalProposerEvent(sk); + wp->api.rm_safekeeper_event_set(sk); wp->api.add_safekeeper_event_set(sk, new_events); /* If we successfully connected, send START_WAL_PUSH query */ @@ -2033,7 +1994,7 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk) /* Returns the set of events a safekeeper in this state should be waiting on * * This will return WL_NO_EVENTS (= 0) for some events. */ -static uint32 +uint32 SafekeeperStateDesiredEvents(SafekeeperState state) { uint32 result = WL_NO_EVENTS; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 6fdbabb905..3ad4c2faef 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -516,9 +516,6 @@ typedef struct walproposer_api /* Allocate WAL reader. */ void (*wal_reader_allocate) (Safekeeper *sk); - /* Deallocate event set. */ - void (*free_event_set) (WalProposer *wp); - /* Initialize event set. */ void (*init_event_set) (WalProposer *wp); @@ -528,6 +525,9 @@ typedef struct walproposer_api /* Add a new safekeeper connection to the event set. */ void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events); + /* Remove safekeeper connection from event set */ + void (*rm_safekeeper_event_set) (Safekeeper *sk); + /* * Wait until some event happens: - timeout is reached - socket event for * safekeeper connection - new WAL is available @@ -712,6 +712,11 @@ extern void WalProposerStart(WalProposer *wp); extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos); extern void WalProposerPoll(WalProposer *wp); extern void WalProposerFree(WalProposer *wp); +/* + * WaitEventSet API doesn't allow to remove socket, so walproposer_pg uses it to + * recreate set from scratch, hence the export. + */ +extern uint32 SafekeeperStateDesiredEvents(SafekeeperState state); #define WPEVENT 1337 /* special log level for walproposer internal diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 3bdc171690..c562fa2cfa 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -1450,6 +1450,45 @@ walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events) sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk); } +/* + * Hack: provides a way to remove the event corresponding to an individual walproposer from the set. + * + * Note: Internally, this completely reconstructs the event set. It should be avoided if possible. + */ +static void +walprop_pg_rm_safekeeper_event_set(Safekeeper *to_remove) +{ + WalProposer *wp = to_remove->wp; + + /* Remove the existing event set, assign sk->eventPos = -1 */ + walprop_pg_free_event_set(wp); + + /* Re-initialize it without adding any safekeeper events */ + wp->api.init_event_set(wp); + + /* + * loop through the existing safekeepers. If they aren't the one we're + * removing, and if they have a socket we can use, re-add the applicable + * events. + */ + for (int i = 0; i < wp->n_safekeepers; i++) + { + uint32 desired_events = WL_NO_EVENTS; + Safekeeper *sk = &wp->safekeeper[i]; + + if (sk == to_remove) + continue; + + /* If this safekeeper isn't offline, add an event for it! */ + if (sk->state != SS_OFFLINE) + { + desired_events = SafekeeperStateDesiredEvents(sk->state); + /* will set sk->eventPos */ + wp->api.add_safekeeper_event_set(sk, desired_events); + } + } +} + static int walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events) { @@ -1713,10 +1752,10 @@ static const walproposer_api walprop_pg = { .recovery_download = WalProposerRecovery, .wal_read = walprop_pg_wal_read, .wal_reader_allocate = walprop_pg_wal_reader_allocate, - .free_event_set = walprop_pg_free_event_set, .init_event_set = walprop_pg_init_event_set, .update_event_set = walprop_pg_update_event_set, .add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set, + .rm_safekeeper_event_set = walprop_pg_rm_safekeeper_event_set, .wait_event_set = walprop_pg_wait_event_set, .strong_random = walprop_pg_strong_random, .get_redo_start_lsn = walprop_pg_get_redo_start_lsn,