diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index 7ec4ec99fc..0336d63e8d 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -57,10 +57,11 @@ static void SendProposerGreeting(Safekeeper *sk);
 static void RecvAcceptorGreeting(Safekeeper *sk);
 static void SendVoteRequest(Safekeeper *sk);
 static void RecvVoteResponse(Safekeeper *sk);
+static bool VotesCollected(WalProposer *wp);
 static void HandleElectedProposer(WalProposer *wp);
 static term_t GetHighestTerm(TermHistory *th);
-static term_t GetEpoch(Safekeeper *sk);
-static void DetermineEpochStartLsn(WalProposer *wp);
+static term_t GetLastLogTerm(Safekeeper *sk);
+static void ProcessPropStartPos(WalProposer *wp);
 static void SendProposerElected(Safekeeper *sk);
 static void StartStreaming(Safekeeper *sk);
 static void SendMessageToNode(Safekeeper *sk);
@@ -97,6 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
     wp = palloc0(sizeof(WalProposer));
     wp->config = config;
     wp->api = api;
+    wp->state = WPS_COLLECTING_TERMS;
 
     wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
 
@@ -518,7 +520,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
             * nodes are transferred from SS_VOTING to sending actual vote
             * requests.
             */
-        case SS_VOTING:
+        case SS_WAIT_VOTING:
            wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port,
                   FormatSafekeeperState(sk));
            ResetConnection(sk);
@@ -547,7 +549,7 @@ AdvancePollState(Safekeeper *sk, uint32 events)
            /*
             * Idle state for waiting votes from quorum.
             */
-        case SS_IDLE:
+        case SS_WAIT_ELECTED:
            wp_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port,
                   FormatSafekeeperState(sk));
            ResetConnection(sk);
@@ -721,6 +723,15 @@ SendProposerGreeting(Safekeeper *sk)
     BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
 }
 
+/*
+ * Have we received greetings from enough (quorum) safekeepers to start voting?
+ */
+static bool
+TermsCollected(WalProposer *wp)
+{
+    return wp->n_connected >= wp->quorum;
+}
+
 static void
 RecvAcceptorGreeting(Safekeeper *sk)
 {
@@ -754,7 +765,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
     }
 
     /* Protocol is all good, move to voting. */
-    sk->state = SS_VOTING;
+    sk->state = SS_WAIT_VOTING;
 
     /*
     * Note: it would be better to track the counter on per safekeeper basis,
@@ -762,17 +773,18 @@ RecvAcceptorGreeting(Safekeeper *sk)
     * as is for now.
     */
     ++wp->n_connected;
-    if (wp->n_connected <= wp->quorum)
+    if (wp->state == WPS_COLLECTING_TERMS)
     {
        /* We're still collecting terms from the majority. */
        wp->propTerm = Max(sk->greetResponse.term, wp->propTerm);
 
        /* Quorum is acquried, prepare the vote request. */
-       if (wp->n_connected == wp->quorum)
+       if (TermsCollected(wp))
        {
            wp->propTerm++;
            wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
 
+           wp->state = WPS_CAMPAIGN;
            wp->voteRequest.pam.tag = 'v';
            wp->voteRequest.generation = wp->mconf.generation;
            wp->voteRequest.term = wp->propTerm;
@@ -787,12 +799,10 @@ RecvAcceptorGreeting(Safekeeper *sk)
     }
 
     /*
-    * Check if we have quorum. If there aren't enough safekeepers, wait and
-    * do nothing. We'll eventually get a task when the election starts.
-    *
-    * If we do have quorum, we can start an election.
+    * If we have quorum, start the election (or just send a vote request to
+    * the newly connected node); otherwise wait until we have more greetings.
     */
-    if (wp->n_connected < wp->quorum)
+    if (wp->state == WPS_COLLECTING_TERMS)
     {
        /*
        * SS_VOTING is an idle state; read-ready indicates the connection
@@ -807,11 +817,7 @@ RecvAcceptorGreeting(Safekeeper *sk)
        */
        for (int j = 0; j < wp->n_safekeepers; j++)
        {
-           /*
-            * Remember: SS_VOTING indicates that the safekeeper is
-            * participating in voting, but hasn't sent anything yet.
-            */
-           if (wp->safekeeper[j].state == SS_VOTING)
+           if (wp->safekeeper[j].state == SS_WAIT_VOTING)
                SendVoteRequest(&wp->safekeeper[j]);
        }
     }
@@ -838,6 +844,8 @@ RecvVoteResponse(Safekeeper *sk)
 {
     WalProposer *wp = sk->wp;
 
+    Assert(wp->state >= WPS_CAMPAIGN);
+
     sk->voteResponse.apm.tag = 'v';
     if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse))
        return;
@@ -856,7 +864,7 @@ RecvVoteResponse(Safekeeper *sk)
     * we are not elected yet and thus need the vote.
     */
     if ((!sk->voteResponse.voteGiven) &&
-       (sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum))
+       (sk->voteResponse.term > wp->propTerm || wp->state == WPS_CAMPAIGN))
     {
        wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "",
               sk->host, sk->port,
@@ -864,38 +872,83 @@ RecvVoteResponse(Safekeeper *sk)
     }
 
     Assert(sk->voteResponse.term == wp->propTerm);
-    /* Handshake completed, do we have quorum? */
+    /* ready for elected message */
+    sk->state = SS_WAIT_ELECTED;
+
     wp->n_votes++;
-    if (wp->n_votes < wp->quorum)
+    /* Are we already elected? */
+    if (wp->state == WPS_CAMPAIGN)
     {
-       sk->state = SS_IDLE;    /* can't do much yet, no quorum */
-    }
-    else if (wp->n_votes > wp->quorum)
-    {
-       /* already elected, start streaming */
-       SendProposerElected(sk);
+       /* no; check if this vote makes us elected */
+       if (VotesCollected(wp))
+       {
+           wp->state = WPS_ELECTED;
+           HandleElectedProposer(wp);
+       }
+       else
+       {
+           /* can't do much yet, no quorum */
+           return;
+       }
     }
     else
     {
-       sk->state = SS_IDLE;
-       /* Idle state waits for read-ready events */
-       wp->api.update_event_set(sk, WL_SOCKET_READABLE);
-
-       HandleElectedProposer(sk->wp);
+       Assert(wp->state == WPS_ELECTED);
+       /* send elected only to this sk */
+       SendProposerElected(sk);
     }
 }
 
+/*
+ * Checks if enough votes have been collected to get elected; if so, finds the
+ * highest vote, setting the donor, donorLastLogTerm and propTermStartLsn
+ * fields. Also sets truncateLsn.
+ */
+static bool
+VotesCollected(WalProposer *wp)
+{
+    int         n_ready = 0;
+
+    /* assumed to be called only when not elected yet */
+    Assert(wp->state == WPS_CAMPAIGN);
+
+    wp->propTermStartLsn = InvalidXLogRecPtr;
+    wp->donorLastLogTerm = 0;
+    wp->truncateLsn = InvalidXLogRecPtr;
+
+    for (int i = 0; i < wp->n_safekeepers; i++)
+    {
+        if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
+        {
+            n_ready++;
+
+            if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
+                (GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
+                 wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
+            {
+                wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
+                wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
+                wp->donor = i;
+            }
+            wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
+        }
+    }
+
+    return n_ready >= wp->quorum;
+}
+
 /*
  * Called once a majority of acceptors have voted for us and current proposer
  * has been elected.
  *
- * Sends ProposerElected message to all acceptors in SS_IDLE state and starts
+ * Sends ProposerElected message to all acceptors in SS_WAIT_ELECTED state and starts
  * replication from walsender.
  */
 static void
 HandleElectedProposer(WalProposer *wp)
 {
-    DetermineEpochStartLsn(wp);
+    ProcessPropStartPos(wp);
+    Assert(wp->propTermStartLsn != InvalidXLogRecPtr);
 
     /*
     * Synchronously download WAL from the most advanced safekeeper. We do
@@ -907,40 +960,24 @@ HandleElectedProposer(WalProposer *wp)
        wp_log(FATAL, "failed to download WAL for logical replicaiton");
     }
 
-    /*
-    * Zero propEpochStartLsn means majority of safekeepers doesn't have any
-    * WAL, timeline was just created. Compute bumps it to basebackup LSN,
-    * otherwise we must be sync-safekeepers and we have nothing to do then.
-    *
-    * Proceeding is not only pointless but harmful, because we'd give
-    * safekeepers term history starting with 0/0. These hacks will go away
-    * once we disable implicit timeline creation on safekeepers and create it
-    * with non zero LSN from the start.
-    */
-    if (wp->propEpochStartLsn == InvalidXLogRecPtr)
-    {
-       Assert(wp->config->syncSafekeepers);
-       wp_log(LOG, "elected with zero propEpochStartLsn in sync-safekeepers, exiting");
-       wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn);
-    }
-
-    if (wp->truncateLsn == wp->propEpochStartLsn && wp->config->syncSafekeepers)
+    if (wp->truncateLsn == wp->propTermStartLsn && wp->config->syncSafekeepers)
     {
        /* Sync is not needed: just exit */
-       wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn);
+       wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn);
        /* unreachable */
     }
 
     for (int i = 0; i < wp->n_safekeepers; i++)
     {
-       if (wp->safekeeper[i].state == SS_IDLE)
+       if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
            SendProposerElected(&wp->safekeeper[i]);
     }
 
     /*
     * The proposer has been elected, and there will be no quorum waiting
-    * after this point. There will be no safekeeper with state SS_IDLE also,
-    * because that state is used only for quorum waiting.
+    * after this point. There will also be no safekeeper in state
+    * SS_WAIT_ELECTED, because that state is used only for quorum
+    * waiting.
     */
 
     if (wp->config->syncSafekeepers)
@@ -957,7 +994,7 @@ HandleElectedProposer(WalProposer *wp)
        return;
     }
 
-    wp->api.start_streaming(wp, wp->propEpochStartLsn);
+    wp->api.start_streaming(wp, wp->propTermStartLsn);
 
     /* Should not return here */
 }
@@ -970,7 +1007,7 @@ GetHighestTerm(TermHistory *th)
 
 /* safekeeper's epoch is the term of the highest entry in the log */
 static term_t
-GetEpoch(Safekeeper *sk)
+GetLastLogTerm(Safekeeper *sk)
 {
     return GetHighestTerm(&sk->voteResponse.termHistory);
 }
@@ -991,72 +1028,52 @@ SkipXLogPageHeader(WalProposer *wp, XLogRecPtr lsn)
 }
 
 /*
- * Called after majority of acceptors gave votes, it calculates the most
- * advanced safekeeper (who will be the donor) and epochStartLsn -- LSN since
- * which we'll write WAL in our term.
- *
- * Sets truncateLsn along the way (though it is not of much use at this point --
- * only for skipping recovery).
+ * Called after the quorum has voted and the proposer start position (highest
+ * vote term + flush LSN) has been determined (VotesCollected returned true).
+ * This function adopts it: pushes the LSN to shmem, sets the wp term history
+ * and verifies that the basebackup matches.
  */
 static void
-DetermineEpochStartLsn(WalProposer *wp)
+ProcessPropStartPos(WalProposer *wp)
 {
     TermHistory *dth;
-    int         n_ready = 0;
     WalproposerShmemState *walprop_shared;
 
-    wp->propEpochStartLsn = InvalidXLogRecPtr;
-    wp->donorEpoch = 0;
-    wp->truncateLsn = InvalidXLogRecPtr;
-
-    for (int i = 0; i < wp->n_safekeepers; i++)
-    {
-       if (wp->safekeeper[i].state == SS_IDLE)
-       {
-           n_ready++;
-
-           if (GetEpoch(&wp->safekeeper[i]) > wp->donorEpoch ||
-               (GetEpoch(&wp->safekeeper[i]) == wp->donorEpoch &&
-                wp->safekeeper[i].voteResponse.flushLsn > wp->propEpochStartLsn))
-           {
-               wp->donorEpoch = GetEpoch(&wp->safekeeper[i]);
-               wp->propEpochStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
-               wp->donor = i;
-           }
-           wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
-       }
-    }
-
-    if (n_ready < wp->quorum)
-    {
-       /*
-        * This is a rare case that can be triggered if safekeeper has voted
-        * and disconnected. In this case, its state will not be SS_IDLE and
-        * its vote cannot be used, because we clean up `voteResponse` in
-        * `ShutdownConnection`.
-        */
-       wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready);
-    }
+    /* must have collected votes */
+    Assert(wp->state == WPS_ELECTED);
 
     /*
-    * If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are
-    * bootstrapping and nothing was committed yet. Start streaming then from
-    * the basebackup LSN.
+    * If propTermStartLsn is 0, it means flushLsn is 0 everywhere, we are
+    * bootstrapping and nothing was committed yet. Start streaming from the
+    * basebackup LSN then.
+    *
+    * In case of sync-safekeepers just exit: proceeding is not only pointless
+    * but harmful, because we'd give safekeepers term history starting with
+    * 0/0. These hacks will go away once we disable implicit timeline
+    * creation on safekeepers and create it with non zero LSN from the start.
     */
-    if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
+    if (wp->propTermStartLsn == InvalidXLogRecPtr)
     {
-       wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
-       wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
+       if (!wp->config->syncSafekeepers)
+       {
+           wp->propTermStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
+           wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propTermStartLsn));
+       }
+       else
+       {
+           wp_log(LOG, "elected with zero propTermStartLsn in sync-safekeepers, exiting");
+           wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn);
+       }
     }
 
-    pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
+    pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propTermStartLsn);
 
     Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
 
     /*
-    * We will be generating WAL since propEpochStartLsn, so we should set
+    * We will be generating WAL since propTermStartLsn, so we should set
     * availableLsn to mark this LSN as the latest available position.
     */
-    wp->availableLsn = wp->propEpochStartLsn;
+    wp->availableLsn = wp->propTermStartLsn;
 
     /*
     * Proposer's term history is the donor's + its own entry.
@@ -1067,12 +1084,12 @@ DetermineEpochStartLsn(WalProposer *wp)
     if (dth->n_entries > 0)
        memcpy(wp->propTermHistory.entries, dth->entries, sizeof(TermSwitchEntry) * dth->n_entries);
     wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
-    wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn;
+    wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn;
 
     wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
           wp->quorum,
           wp->propTerm,
-          LSN_FORMAT_ARGS(wp->propEpochStartLsn),
+          LSN_FORMAT_ARGS(wp->propTermStartLsn),
           wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
           LSN_FORMAT_ARGS(wp->truncateLsn));
 
@@ -1090,7 +1107,7 @@ DetermineEpochStartLsn(WalProposer *wp)
        * Safekeepers don't skip header as they need continious stream of
        * data, so correct LSN for comparison.
        */
-       if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
+       if (SkipXLogPageHeader(wp, wp->propTermStartLsn) != wp->api.get_redo_start_lsn(wp))
        {
           /*
           * However, allow to proceed if last_log_term on the node which
@@ -1111,8 +1128,8 @@ DetermineEpochStartLsn(WalProposer *wp)
           */
           disable_core_dump();
           wp_log(PANIC,
-                "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X",
-                LSN_FORMAT_ARGS(wp->propEpochStartLsn),
+                "collected propTermStartLsn %X/%X, but basebackup LSN %X/%X",
+                LSN_FORMAT_ARGS(wp->propTermStartLsn),
                 LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
        }
     }
@@ -1623,7 +1640,7 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
        * Like in Raft, we aren't allowed to commit entries from previous
        * terms, so ignore reported LSN until it gets to epochStartLsn.
        */
-       responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propEpochStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
+       responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
     }
     qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
 
@@ -1656,10 +1673,10 @@ UpdateDonorShmem(WalProposer *wp)
     * about its position immediately after election before any feedbacks are
     * sent.
     */
-    if (wp->safekeeper[wp->donor].state >= SS_IDLE)
+    if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED)
     {
        donor = &wp->safekeeper[wp->donor];
-       donor_lsn = wp->propEpochStartLsn;
+       donor_lsn = wp->propTermStartLsn;
     }
 
     /*
@@ -1748,7 +1765,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
     for (int i = 0; i < wp->n_safekeepers; i++)
     {
        Safekeeper *sk = &wp->safekeeper[i];
-       bool        synced = sk->appendResponse.commitLsn >= wp->propEpochStartLsn;
+       bool        synced = sk->appendResponse.commitLsn >= wp->propTermStartLsn;
 
        /* alive safekeeper which is not synced yet; wait for it */
        if (sk->state != SS_OFFLINE && !synced)
@@ -1772,7 +1789,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk)
        */
        BroadcastAppendRequest(wp);
 
-       wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn);
+       wp->api.finish_sync_safekeepers(wp, wp->propTermStartLsn);
        /* unreachable */
     }
 }
@@ -2378,7 +2395,7 @@ FormatSafekeeperState(Safekeeper *sk)
        case SS_HANDSHAKE_RECV:
           return_val = "handshake (receiving)";
           break;
-       case SS_VOTING:
+       case SS_WAIT_VOTING:
           return_val = "voting";
           break;
        case SS_WAIT_VERDICT:
@@ -2387,7 +2404,7 @@ FormatSafekeeperState(Safekeeper *sk)
        case SS_SEND_ELECTED_FLUSH:
           return_val = "send-announcement-flush";
           break;
-       case SS_IDLE:
+       case SS_WAIT_ELECTED:
           return_val = "idle";
           break;
        case SS_ACTIVE:
@@ -2476,8 +2493,8 @@ SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_even
           * Idle states use read-readiness as a sign that the connection
           * has been disconnected.
           */
-       case SS_VOTING:
-       case SS_IDLE:
+       case SS_WAIT_VOTING:
+       case SS_WAIT_ELECTED:
           *sk_events = WL_SOCKET_READABLE;
           return;
 
diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h
index 8d1ae26cac..d116bce806 100644
--- a/pgxn/neon/walproposer.h
+++ b/pgxn/neon/walproposer.h
@@ -73,12 +73,12 @@ typedef enum
     * Moved externally by execution of SS_HANDSHAKE_RECV, when we received a
     * quorum of handshakes.
     */
-   SS_VOTING,
+   SS_WAIT_VOTING,
 
    /*
     * Already sent voting information, waiting to receive confirmation from
-    * the node. After receiving, moves to SS_IDLE, if the quorum isn't
-    * reached yet.
+    * the node. After receiving, moves to SS_WAIT_ELECTED if the quorum
+    * isn't reached yet.
    */
    SS_WAIT_VERDICT,
 
@@ -91,7 +91,7 @@ typedef enum
    *
    * Moves to SS_ACTIVE only by call to StartStreaming.
    */
-   SS_IDLE,
+   SS_WAIT_ELECTED,
 
    /*
    * Active phase, when we acquired quorum and have WAL to send or feedback
@@ -751,6 +751,15 @@ typedef struct WalProposerConfig
 #endif
 } WalProposerConfig;
 
+typedef enum
+{
+   /* collecting greetings to determine term to campaign for */
+   WPS_COLLECTING_TERMS,
+   /* campaign started, waiting for votes */
+   WPS_CAMPAIGN,
+   /* successfully elected */
+   WPS_ELECTED,
+} WalProposerState;
 
 /*
  * WAL proposer state.
@@ -758,6 +767,7 @@ typedef struct WalProposerConfig
 typedef struct WalProposer
 {
    WalProposerConfig *config;
+   WalProposerState state;
 
    /* Current walproposer membership configuration */
    MembershipConfiguration mconf;
@@ -813,10 +823,10 @@ typedef struct WalProposer
    TermHistory propTermHistory;
 
    /* epoch start lsn of the proposer */
-   XLogRecPtr  propEpochStartLsn;
+   XLogRecPtr  propTermStartLsn;
 
    /* Most advanced acceptor epoch */
-   term_t      donorEpoch;
+   term_t      donorLastLogTerm;
 
    /* Most advanced acceptor */
    int         donor;
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index b21184de57..9c34c90002 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -1496,7 +1496,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
 
    snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
    Assert(!sk->xlogreader);
-   sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, log_prefix);
+   sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
    if (sk->xlogreader == NULL)
        wpg_log(FATAL, "failed to allocate xlog reader");
 }
diff --git a/safekeeper/tests/walproposer_sim/walproposer_api.rs b/safekeeper/tests/walproposer_sim/walproposer_api.rs
index 6451589e80..82e7a32881 100644
--- a/safekeeper/tests/walproposer_sim/walproposer_api.rs
+++ b/safekeeper/tests/walproposer_sim/walproposer_api.rs
@@ -511,8 +511,7 @@ impl ApiImpl for SimulationApi {
             // collected quorum with lower term, then got rejected by next connected safekeeper
             executor::exit(1, msg.to_owned());
         }
-        if msg.contains("collected propEpochStartLsn") && msg.contains(", but basebackup LSN ")
-        {
+        if msg.contains("collected propTermStartLsn") && msg.contains(", but basebackup LSN ") {
             // sync-safekeepers collected wrong quorum, walproposer collected another quorum
             executor::exit(1, msg.to_owned());
         }
@@ -529,7 +528,7 @@ impl ApiImpl for SimulationApi {
     }
 
     fn after_election(&self, wp: &mut walproposer::bindings::WalProposer) {
-        let prop_lsn = wp.propEpochStartLsn;
+        let prop_lsn = wp.propTermStartLsn;
         let prop_term = wp.propTerm;
         let mut prev_lsn: u64 = 0;
 
@@ -612,7 +611,7 @@ impl ApiImpl for SimulationApi {
         sk: &mut walproposer::bindings::Safekeeper,
     ) -> bool {
         let mut startpos = wp.truncateLsn;
-        let endpos = wp.propEpochStartLsn;
+        let endpos = wp.propTermStartLsn;
 
         if startpos == endpos {
             debug!("recovery_download: nothing to download");