greetings

This commit is contained in:
Arseny Sher
2025-03-20 21:49:08 +01:00
parent 5426272f14
commit a2cbf64797

View File

@@ -728,14 +728,14 @@ SendProposerGreeting(Safekeeper *sk)
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
/*
/*
* Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in
* members_safekeepers & new_members_safekeepers to sk.
*/
static void
UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
/* members_safekeepers etc are fixed size, sanity check mconf */
/* members_safekeepers etc are fixed size, sanity check mconf size */
if (wp->mconf.members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len);
if (wp->mconf.new_members.len > MAX_SAFEKEEPERS)
@@ -744,7 +744,7 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
/* node id is not known until greeting is received */
if (sk->state < SS_WAIT_VOTING)
return;
/* 0 is assumed to be invalid node id, should never happen */
if (sk->greetResponse.nodeId == 0)
{
@@ -792,10 +792,10 @@ UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->new_members_safekeepers[i] = sk;
}
}
}
}
/*
/*
* Reset wp->members_safekeepers & new_members_safekeepers and refill them.
* Called after wp changes mconf.
*/
@@ -811,12 +811,41 @@ ResetMemberSafekeeperPtrs(WalProposer *wp)
}
}
/* Does n forms quorum in mset? */
static bool MsetHasQuorum(MemberSet *mset, uint32 n)
{
return n >= (mset->len / 2 + 1);
}
/* TermsCollected helper for single member set. */
static bool TermsCollectedMset(WalProposer *wp, MemberSet *mset, StringInfo s)
{
uint32 n_greeted = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
Safekeeper *sk = wp->members_safekeepers[i];
if (sk != NULL && sk->state == SS_WAIT_VOTING)
{
if (n_greeted > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s }", sk->greetResponse.nodeId, sk->host, sk->port);
n_greeted++;
}
}
return MsetHasQuorum(mset, n_greeted);
}
/*
* Have we received greeting from enough (quorum) safekeepers to start voting?
*/
static bool
TermsCollected(WalProposer *wp)
{
StringInfoData s; /* str for logging */
bool res = false;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
@@ -835,7 +864,24 @@ TermsCollected(WalProposer *wp)
*/
if (wp->mconf.generation == INVALID_GENERATION)
return false;
return false;
initStringInfo(&s);
appendStringInfoString(&s, "mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.members, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.new_members, &s))
goto res;
}
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum of safekeepers, propTerm=" INT64_FORMAT ", starting voting. %s", wp->propTerm, s.data);
res = true;
res:
pfree(s.data);
return res;
}
static void