diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 9aac3771d5..6d25bf3ea7 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -728,14 +728,14 @@ SendProposerGreeting(Safekeeper *sk) BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV); } -/* +/* * Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in * members_safekeepers & new_members_safekeepers to sk. */ static void UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk) { - /* members_safekeepers etc are fixed size, sanity check mconf */ + /* members_safekeepers etc are fixed size, sanity check mconf size */ if (wp->mconf.members.len > MAX_SAFEKEEPERS) wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len); if (wp->mconf.new_members.len > MAX_SAFEKEEPERS) @@ -744,7 +744,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk) /* node id is not known until greeting is received */ if (sk->state < SS_WAIT_VOTING) return; - + /* 0 is assumed to be invalid node id, should never happen */ if (sk->greetResponse.nodeId == 0) { @@ -792,10 +792,10 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk) sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper); wp->new_members_safekeepers[i] = sk; } - } + } } -/* +/* * Reset wp->members_safekeepers & new_members_safekeepers and refill them. * Called after wp changes mconf. */ @@ -811,12 +811,41 @@ ResetMemberSafekeeperPtrs(WalProposer *wp) } } +/* Does n forms quorum in mset? */ +static bool MsetHasQuorum(MemberSet *mset, uint32 n) +{ + return n >= (mset->len / 2 + 1); +} + +/* TermsCollected helper for single member set. */ +static bool TermsCollectedMset(WalProposer *wp, MemberSet *mset, StringInfo s) +{ + uint32 n_greeted = 0; + + for (uint32 i = 0; i < wp->mconf.members.len; i++) + { + Safekeeper *sk = wp->members_safekeepers[i]; + + if (sk != NULL && sk->state == SS_WAIT_VOTING) + { + if (n_greeted > 0) + appendStringInfoString(s, ", "); + appendStringInfo(s, "{id = %lu, ep = %s:%s }", sk->greetResponse.nodeId, sk->host, sk->port); + n_greeted++; + } + } + return MsetHasQuorum(mset, n_greeted); +} + /* * Have we received greeting from enough (quorum) safekeepers to start voting? */ static bool TermsCollected(WalProposer *wp) { + StringInfoData s; /* str for logging */ + bool res = false; + /* legacy: generations disabled */ if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION) { @@ -835,7 +864,24 @@ TermsCollected(WalProposer *wp) */ if (wp->mconf.generation == INVALID_GENERATION) return false; - return false; + + initStringInfo(&s); + appendStringInfoString(&s, "mset greeters: "); + if (!TermsCollectedMset(wp, &wp->mconf.members, &s)) + goto res; + if (wp->mconf.new_members.len > 0) + { + appendStringInfoString(&s, ", new_mset greeters: "); + if (!TermsCollectedMset(wp, &wp->mconf.new_members, &s)) + goto res; + } + wp->propTerm++; + wp_log(LOG, "walproposer connected to quorum of safekeepers, propTerm=" INT64_FORMAT ", starting voting. %s", wp->propTerm, s.data); + res = true; + +res: + pfree(s.data); + return res; } static void