diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 0336d63e8d..6b133e4dc4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -99,6 +99,9 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->config = config; wp->api = api; wp->state = WPS_COLLECTING_TERMS; + wp->mconf.generation = INVALID_GENERATION; + wp->mconf.members.len = 0; + wp->mconf.new_members.len = 0; wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list); @@ -170,6 +173,8 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) if (wp->config->proto_version != 2 && wp->config->proto_version != 3) wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version); + if (wp->safekeepers_generation > INVALID_GENERATION && wp->config->proto_version < 3) + wp_log(FATAL, "enabling generations requires protocol version 3"); wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version); /* Fill the greeting package */ @@ -214,7 +219,7 @@ WalProposerFree(WalProposer *wp) static bool WalProposerGenerationsEnabled(WalProposer *wp) { - return wp->safekeepers_generation != 0; + return wp->safekeepers_generation != INVALID_GENERATION; } /* @@ -723,13 +728,176 @@ SendProposerGreeting(Safekeeper *sk) BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV); } +/* + * Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in + * members_safekeepers & new_members_safekeepers to sk. + */ +static void +UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk) +{ + /* members_safekeepers etc are fixed size, sanity check mconf size */ + if (wp->mconf.members.len > MAX_SAFEKEEPERS) + wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len); + if (wp->mconf.new_members.len > MAX_SAFEKEEPERS) + wp_log(FATAL, "too many new_members %d in mconf", wp->mconf.new_members.len); + + /* node id is not known until greeting is received */ + if (sk->state < SS_WAIT_VOTING) + return; + + /* 0 is assumed to be invalid node id, should never happen */ + if (sk->greetResponse.nodeId == 0) + { + wp_log(WARNING, "safekeeper %s:%s sent zero node id", sk->host, sk->port); + return; + } + + for (uint32 i = 0; i < wp->mconf.members.len; i++) + { + SafekeeperId *sk_id = &wp->mconf.members.m[i]; + + if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId) + { + /* + * If mconf or list of safekeepers to connect to changed (the + * latter always currently goes through restart though), + * ResetMemberSafekeeperPtrs is expected to be called before + * UpdateMemberSafekeeperPtr. So, other value suggests that we are + * connected to the same sk under different host name, complain + * about that. 
+ */ + if (wp->members_safekeepers[i] != NULL && wp->members_safekeepers[i] != sk) + { + wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in members[%u] is already mapped to connection slot %lu", + sk_id->node_id, sk_id->host, sk_id->port, i, wp->members_safekeepers[i] - wp->safekeeper); + } + wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in members[%u] mapped to connection slot %lu", + sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper); + wp->members_safekeepers[i] = sk; + } + } + /* repeat for new_members */ + for (uint32 i = 0; i < wp->mconf.new_members.len; i++) + { + SafekeeperId *sk_id = &wp->mconf.new_members.m[i]; + + if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId) + { + if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk) + { + wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] is already mapped to connection slot %lu", + sk_id->node_id, sk_id->host, sk_id->port, i, wp->new_members_safekeepers[i] - wp->safekeeper); + } + wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] mapped to connection slot %lu", + sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper); + wp->new_members_safekeepers[i] = sk; + } + } +} + +/* + * Reset wp->members_safekeepers & new_members_safekeepers and refill them. + * Called after wp changes mconf. + */ +static void +ResetMemberSafekeeperPtrs(WalProposer *wp) +{ + memset(&wp->members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS); + memset(&wp->new_members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS); + for (int i = 0; i < wp->n_safekeepers; i++) + { + if (wp->safekeeper[i].state >= SS_WAIT_VOTING) + UpdateMemberSafekeeperPtr(wp, &wp->safekeeper[i]); + } +} + +static uint32 +MsetQuorum(MemberSet *mset) +{ + Assert(mset->len > 0); + return mset->len / 2 + 1; +} + +/* Does n form quorum in mset? */ +static bool +MsetHasQuorum(MemberSet *mset, uint32 n) +{ + return n >= MsetQuorum(mset); +} + +/* + * TermsCollected helper for a single member set `mset`. + * + * `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers + * or new_members_safekeepers. + */ +static bool +TermsCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s) +{ + uint32 n_greeted = 0; + + for (uint32 i = 0; i < mset->len; i++) + { + Safekeeper *sk = msk[i]; + + if (sk != NULL && sk->state == SS_WAIT_VOTING) + { + if (n_greeted > 0) + appendStringInfoString(s, ", "); + appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port); + n_greeted++; + } + } + appendStringInfo(s, ", %u/%u total", n_greeted, mset->len); + return MsetHasQuorum(mset, n_greeted); +} + /* * Have we received greeting from enough (quorum) safekeepers to start voting? */ static bool TermsCollected(WalProposer *wp) { - return wp->n_connected >= wp->quorum; + StringInfoData s; /* str for logging */ + bool collected = false; + + /* legacy: generations disabled */ + if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION) + { + collected = wp->n_connected >= wp->quorum; + if (collected) + { + wp->propTerm++; + wp_log(LOG, "walproposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT ", starting voting", wp->quorum, wp->propTerm); + } + return collected; + } + + /* + * With generations enabled, we start campaign only when 1) some mconf is + * actually received 2) we have greetings from majority of members as well + * as from majority of new_members if it exists. 
+ */ + if (wp->mconf.generation == INVALID_GENERATION) + return false; + + initStringInfo(&s); + appendStringInfoString(&s, "mset greeters: "); + if (!TermsCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s)) + goto res; + if (wp->mconf.new_members.len > 0) + { + appendStringInfoString(&s, ", new_mset greeters: "); + if (!TermsCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s)) + goto res; + } + wp->propTerm++; + wp_log(LOG, "walproposer connected to quorum of safekeepers: %s, propTerm=" INT64_FORMAT ", starting voting", s.data, wp->propTerm); + collected = true; + +res: + pfree(s.data); + return collected; } static void @@ -753,13 +921,41 @@ RecvAcceptorGreeting(Safekeeper *sk) pfree(mconf_toml); /* - * Adopt mconf of safekeepers if it is higher. TODO: mconf change should - * restart wp if it started voting. + * Adopt mconf of safekeepers if it is higher. */ if (sk->greetResponse.mconf.generation > wp->mconf.generation) { + /* sanity check before adopting, should never happen */ + if (sk->greetResponse.mconf.members.len == 0) + { + wp_log(FATAL, "mconf %u has zero members", sk->greetResponse.mconf.generation); + } + + /* + * If we at least started campaign, restart wp to get elected in the + * new mconf. Note: in principle once wp is already elected + * re-election is not required, but being conservative here is not + * bad. + * + * TODO: put mconf to shmem to immediately pick it up on start, + * otherwise if some safekeeper(s) misses latest mconf and gets + * connected the first, it may cause redundant restarts here. + * + * More generally, it would be nice to restart walproposer (wiping + * election state) without restarting the process. In particular, that + * would allow sync-safekeepers not to die here if it intersected with + * sk migration (as well as remove 1s delay). + * + * Note that assign_neon_safekeepers also currently restarts the + * process, so during normal migration walproposer may restart twice. + */ + if (wp->state >= WPS_CAMPAIGN) + { + wp_log(FATAL, "restarting to adopt mconf generation %d", sk->greetResponse.mconf.generation); + } MembershipConfigurationFree(&wp->mconf); MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf); + ResetMemberSafekeeperPtrs(wp); /* full conf was just logged above */ wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation); } @@ -767,6 +963,9 @@ RecvAcceptorGreeting(Safekeeper *sk) /* Protocol is all good, move to voting. */ sk->state = SS_WAIT_VOTING; + /* In greeting safekeeper sent its id; update mappings accordingly. */ + UpdateMemberSafekeeperPtr(wp, sk); + /* * Note: it would be better to track the counter on per safekeeper basis, * but at worst walproposer would restart with 'term rejected', so leave @@ -778,12 +977,9 @@ RecvAcceptorGreeting(Safekeeper *sk) /* We're still collecting terms from the majority. */ wp->propTerm = Max(sk->greetResponse.term, wp->propTerm); - /* Quorum is acquried, prepare the vote request. */ + /* Quorum is acquired, prepare the vote request. 
*/ if (TermsCollected(wp)) { - wp->propTerm++; - wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); - wp->state = WPS_CAMPAIGN; wp->voteRequest.pam.tag = 'v'; wp->voteRequest.generation = wp->mconf.generation; @@ -832,8 +1028,8 @@ SendVoteRequest(Safekeeper *sk) &sk->outbuf, wp->config->proto_version); /* We have quorum for voting, send our vote request */ - wp_log(LOG, "requesting vote from %s:%s for generation %u term " UINT64_FORMAT, sk->host, sk->port, - wp->voteRequest.generation, wp->voteRequest.term); + wp_log(LOG, "requesting vote from sk {id = %lu, ep = %s:%s} for generation %u term " UINT64_FORMAT, + sk->greetResponse.nodeId, sk->host, sk->port, wp->voteRequest.generation, wp->voteRequest.term); /* On failure, logging & resetting is handled */ BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_WAIT_VERDICT); /* If successful, wait for read-ready with SS_WAIT_VERDICT */ @@ -851,8 +1047,8 @@ RecvVoteResponse(Safekeeper *sk) return; wp_log(LOG, - "got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X", - sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term, + "got VoteResponse from sk {id = %lu, ep = %s:%s}, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X", + sk->greetResponse.nodeId, sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), @@ -899,6 +1095,53 @@ RecvVoteResponse(Safekeeper *sk) } } +/* + * VotesCollected helper for a single member set `mset`. + * + * `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers + * or new_members_safekeepers. + */ +static bool +VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s) +{ + uint32 n_votes = 0; + + for (uint32 i = 0; i < mset->len; i++) + { + Safekeeper *sk = msk[i]; + + if (sk != NULL && sk->state == SS_WAIT_ELECTED) + { + Assert(sk->voteResponse.voteGiven); + + /* + * Find the highest vote. NULL check is for the legacy case where + * safekeeper might not be initialized with LSN at all and return + * 0 LSN in the vote response; we still want to set donor to + * something in this case. 
+ */ + if (GetLastLogTerm(sk) > wp->donorLastLogTerm || + (GetLastLogTerm(sk) == wp->donorLastLogTerm && + sk->voteResponse.flushLsn > wp->propTermStartLsn) || + wp->donor == NULL) + { + wp->donorLastLogTerm = GetLastLogTerm(sk); + wp->propTermStartLsn = sk->voteResponse.flushLsn; + wp->donor = sk; + } + wp->truncateLsn = Max(sk->voteResponse.truncateLsn, wp->truncateLsn); + + if (n_votes > 0) + appendStringInfoString(s, ", "); + appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port); + n_votes++; + } + } + appendStringInfo(s, ", %u/%u total", n_votes, mset->len); + return MsetHasQuorum(mset, n_votes); +} + + /* * Checks if enough votes has been collected to get elected and if that's the * case finds the highest vote, setting donor, donorLastLogTerm, @@ -907,7 +1150,8 @@ RecvVoteResponse(Safekeeper *sk) static bool VotesCollected(WalProposer *wp) { - int n_ready = 0; + StringInfoData s; /* str for logging */ + bool collected = false; /* assumed to be called only when not elected yet */ Assert(wp->state == WPS_CAMPAIGN); @@ -916,25 +1160,62 @@ VotesCollected(WalProposer *wp) wp->donorLastLogTerm = 0; wp->truncateLsn = InvalidXLogRecPtr; - for (int i = 0; i < wp->n_safekeepers; i++) + /* legacy: generations disabled */ + if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION) { - if (wp->safekeeper[i].state == SS_WAIT_ELECTED) - { - n_ready++; + int n_ready = 0; - if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm || - (GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm && - wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn) + for (int i = 0; i < wp->n_safekeepers; i++) + { + if (wp->safekeeper[i].state == SS_WAIT_ELECTED) { - wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]); - wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn; - wp->donor = i; + n_ready++; + + if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm || + (GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm && + wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn) || + wp->donor == NULL) + { + wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]); + wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn; + wp->donor = &wp->safekeeper[i]; + } + wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); + } } - wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); } + collected = n_ready >= wp->quorum; + if (collected) + { + wp_log(LOG, "walproposer elected with %d/%d votes", n_ready, wp->n_safekeepers); + } + return collected; } - return n_ready >= wp->quorum; + /* + * if generations are enabled we're expected to get to voting only when + * mconf is established. + */ + Assert(wp->mconf.generation != INVALID_GENERATION); + + /* + * We must get votes from both msets if both are present. 
+ */ + initStringInfo(&s); + appendStringInfoString(&s, "mset voters: "); + if (!VotesCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s)) + goto res; + if (wp->mconf.new_members.len > 0) + { + appendStringInfoString(&s, ", new_mset voters: "); + if (!VotesCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s)) + goto res; + } + wp_log(LOG, "walproposer elected, %s", s.data); + collected = true; + +res: + pfree(s.data); + return collected; } /* @@ -955,7 +1236,7 @@ HandleElectedProposer(WalProposer *wp) * that only for logical replication (and switching logical walsenders to * neon_walreader is a todo.) */ - if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor])) + if (!wp->api.recovery_download(wp, wp->donor)) { wp_log(FATAL, "failed to download WAL for logical replicaiton"); } @@ -1078,7 +1359,7 @@ ProcessPropStartPos(WalProposer *wp) /* * Proposer's term history is the donor's + its own entry. */ - dth = &wp->safekeeper[wp->donor].voteResponse.termHistory; + dth = &wp->donor->voteResponse.termHistory; wp->propTermHistory.n_entries = dth->n_entries + 1; wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries); if (dth->n_entries > 0) @@ -1086,11 +1367,10 @@ ProcessPropStartPos(WalProposer *wp) wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm; wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn; - wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", - wp->quorum, + wp_log(LOG, "walproposer elected in term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", wp->propTerm, LSN_FORMAT_ARGS(wp->propTermStartLsn), - wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port, + wp->donor->host, wp->donor->port, LSN_FORMAT_ARGS(wp->truncateLsn)); /* @@ -1508,6 +1788,14 @@ RecvAppendResponses(Safekeeper *sk) readAnything = true; + /* should never happen: sk is expected to send ERROR instead */ + if (sk->appendResponse.generation != wp->mconf.generation) + { + wp_log(FATAL, "safekeeper {id = %lu, ep = %s:%s} sent response with generation %u, expected %u", + sk->greetResponse.nodeId, sk->host, sk->port, + sk->appendResponse.generation, wp->mconf.generation); + } + if (sk->appendResponse.term > wp->propTerm) { /* @@ -1624,30 +1912,101 @@ CalculateMinFlushLsn(WalProposer *wp) } /* - * Calculate WAL position acknowledged by quorum + * GetAcknowledgedByQuorumWALPosition for a single member set `mset`. + * + * `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers + * or new_members_safekeepers. */ static XLogRecPtr -GetAcknowledgedByQuorumWALPosition(WalProposer *wp) +GetCommittedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk) { XLogRecPtr responses[MAX_SAFEKEEPERS]; /* - * Sort acknowledged LSNs + * Ascending sort acknowledged LSNs. */ - for (int i = 0; i < wp->n_safekeepers; i++) + Assert(mset->len <= MAX_SAFEKEEPERS); + for (uint32 i = 0; i < mset->len; i++) { + Safekeeper *sk = msk[i]; + /* * Like in Raft, we aren't allowed to commit entries from previous - * terms, so ignore reported LSN until it gets to epochStartLsn. + * terms, so ignore reported LSN until it gets to propTermStartLsn. + * + * Note: we ignore sk state, which is ok: before first ack flushLsn is + * 0, and later we just preserve value across reconnections. It would + * be ok to check for SS_ACTIVE as well. 
*/ - responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0; + if (sk != NULL && sk->appendResponse.flushLsn >= wp->propTermStartLsn) + { + responses[i] = sk->appendResponse.flushLsn; + } + else + { + responses[i] = 0; + } } - qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn); + qsort(responses, mset->len, sizeof(XLogRecPtr), CompareLsn); /* - * Get the smallest LSN committed by quorum + * And get value committed by the quorum. A way to view this: to get the + * highest value committed on the quorum, in the ordered array we skip n - + * n_quorum elements to get to the first (lowest) value present on all sks + * of the highest quorum. */ - return responses[wp->n_safekeepers - wp->quorum]; + return responses[mset->len - MsetQuorum(mset)]; +} + +/* + * Calculate WAL position acknowledged by quorum, i.e. which may be regarded + * committed. + * + * Zero may be returned when there is no quorum of nodes recovered to term start + * lsn which sent feedback yet. + */ +static XLogRecPtr +GetAcknowledgedByQuorumWALPosition(WalProposer *wp) +{ + XLogRecPtr committed; + + /* legacy: generations disabled */ + if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION) + { + XLogRecPtr responses[MAX_SAFEKEEPERS]; + + /* + * Sort acknowledged LSNs + */ + for (int i = 0; i < wp->n_safekeepers; i++) + { + /* + * Like in Raft, we aren't allowed to commit entries from previous + * terms, so ignore reported LSN until it gets to + * propTermStartLsn. + * + * Note: we ignore sk state, which is ok: before first ack + * flushLsn is 0, and later we just preserve value across + * reconnections. It would be ok to check for SS_ACTIVE as well. + */ + responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0; + } + qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn); + + /* + * Get the smallest LSN committed by quorum + */ + return responses[wp->n_safekeepers - wp->quorum]; + } + + committed = GetCommittedMset(wp, &wp->mconf.members, wp->members_safekeepers); + if (wp->mconf.new_members.len > 0) + { + XLogRecPtr new_mset_committed = GetCommittedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers); + + committed = Min(committed, new_mset_committed); + } + return committed; } /* @@ -1662,7 +2021,7 @@ UpdateDonorShmem(WalProposer *wp) int i; XLogRecPtr donor_lsn = InvalidXLogRecPtr; - if (wp->n_votes < wp->quorum) + if (wp->state < WPS_ELECTED) { wp_log(WARNING, "UpdateDonorShmem called before elections are won"); return; @@ -1673,9 +2032,9 @@ UpdateDonorShmem(WalProposer *wp) * about its position immediately after election before any feedbacks are * sent. */ - if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED) + if (wp->donor->state >= SS_WAIT_ELECTED) { - donor = &wp->safekeeper[wp->donor]; + donor = wp->donor; donor_lsn = wp->propTermStartLsn; } @@ -1746,13 +2105,13 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) } /* - * Generally sync is done when majority switched the epoch so we committed - * epochStartLsn and made the majority aware of it, ensuring they are - * ready to give all WAL to pageserver. It would mean whichever majority - * is alive, there will be at least one safekeeper who is able to stream - * WAL to pageserver to make basebackup possible. 
However, since at the - * moment we don't have any good mechanism of defining the healthy and - * most advanced safekeeper who should push the wal into pageserver and + * Generally sync is done when majority reached propTermStartLsn so we + * committed it and made the majority aware of it, ensuring they are ready + * to give all WAL to pageserver. It would mean whichever majority is + * alive, there will be at least one safekeeper who is able to stream WAL + * to pageserver to make basebackup possible. However, since at the moment + * we don't have any good mechanism of defining the healthy and most + * advanced safekeeper who should push the wal into pageserver and * basically the random one gets connected, to prevent hanging basebackup * (due to pageserver connecting to not-synced-safekeeper) we currently * wait for all seemingly alive safekeepers to get synced. @@ -1774,7 +2133,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) n_synced++; } - if (n_synced >= wp->quorum) + if (newCommitLsn >= wp->propTermStartLsn) { /* A quorum of safekeepers has been synced! */ diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index d116bce806..648b0015ad 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -145,6 +145,7 @@ typedef uint64 NNodeId; * This and following structs pair ones in membership.rs. */ typedef uint32 Generation; +#define INVALID_GENERATION 0 typedef struct SafekeeperId { @@ -771,7 +772,17 @@ typedef struct WalProposer /* Current walproposer membership configuration */ MembershipConfiguration mconf; - /* (n_safekeepers / 2) + 1 */ + /* + * Parallels mconf.members with pointers to the member's slot in + * safekeepers array of connections, or NULL if such member is not + * connected. Helps to avoid looking slot per id through all + * .safekeepers[] when doing quorum checks. + */ + Safekeeper *members_safekeepers[MAX_SAFEKEEPERS]; + /* As above, but for new_members. */ + Safekeeper *new_members_safekeepers[MAX_SAFEKEEPERS]; + + /* (n_safekeepers / 2) + 1. Used for static pre-generations quorum checks. */ int quorum; /* @@ -829,7 +840,7 @@ typedef struct WalProposer term_t donorLastLogTerm; /* Most advanced acceptor */ - int donor; + Safekeeper *donor; /* timeline globally starts at this LSN */ XLogRecPtr timelineStartLsn; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d000dcb69f..ba8de1c01c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -79,7 +79,12 @@ from fixtures.remote_storage import ( default_remote_storage, remote_storage_to_toml_dict, ) -from fixtures.safekeeper.http import SafekeeperHttpClient +from fixtures.safekeeper.http import ( + MembershipConfiguration, + SafekeeperHttpClient, + SafekeeperId, + TimelineCreateRequest, +) from fixtures.safekeeper.utils import wait_walreceivers_absent from fixtures.utils import ( ATTACHMENT_NAME_REGEX, @@ -4839,6 +4844,50 @@ class Safekeeper(LogUtils): wait_until(paused) + @staticmethod + def sks_to_safekeeper_ids(sks: list[Safekeeper]) -> list[SafekeeperId]: + return [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in sks] + + @staticmethod + def mconf_sks(env: NeonEnv, mconf: MembershipConfiguration) -> list[Safekeeper]: + """ + List of Safekeepers which are members in `mconf`. 
+ """ + members_ids = [m.id for m in mconf.members] + new_members_ids = [m.id for m in mconf.new_members] if mconf.new_members is not None else [] + return [sk for sk in env.safekeepers if sk.id in members_ids or sk.id in new_members_ids] + + @staticmethod + def create_timeline( + tenant_id: TenantId, + timeline_id: TimelineId, + ps: NeonPageserver, + mconf: MembershipConfiguration, + members_sks: list[Safekeeper], + ): + """ + Manually create timeline on safekeepers with given (presumably inital) + mconf: figure out LSN from pageserver, bake request and execute it on + given safekeepers. + + Normally done by storcon, but some tests want to do it manually so far. + """ + ps_http_cli = ps.http_client() + # figure out initial LSN. + ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id) + init_lsn = ps_timeline_detail["last_record_lsn"] + log.info(f"initial LSN: {init_lsn}") + # sk timeline creation request expects minor version + pg_version = ps_timeline_detail["pg_version"] * 10000 + # create inital mconf + create_r = TimelineCreateRequest( + tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None + ) + log.info(f"sending timeline create: {create_r.to_json()}") + + for sk in members_sks: + sk.http_client().timeline_create(create_r) + class NeonBroker(LogUtils): """An object managing storage_broker instance""" diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index e409151b76..839e985419 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -25,7 +25,7 @@ class Walreceiver: @dataclass class SafekeeperTimelineStatus: - mconf: Configuration | None + mconf: MembershipConfiguration | None term: int last_log_term: int pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2 @@ -78,17 +78,17 @@ class SafekeeperId: @dataclass -class Configuration: +class MembershipConfiguration: generation: int members: list[SafekeeperId] new_members: list[SafekeeperId] | None @classmethod - def from_json(cls, d: dict[str, Any]) -> Configuration: + def from_json(cls, d: dict[str, Any]) -> MembershipConfiguration: generation = d["generation"] members = d["members"] new_members = d.get("new_members") - return Configuration(generation, members, new_members) + return MembershipConfiguration(generation, members, new_members) def to_json(self) -> str: return json.dumps(self, cls=EnhancedJSONEncoder) @@ -98,7 +98,7 @@ class Configuration: class TimelineCreateRequest: tenant_id: TenantId timeline_id: TimelineId - mconf: Configuration + mconf: MembershipConfiguration # not exactly PgVersion, for example 150002 for 15.2 pg_version: int start_lsn: Lsn @@ -110,13 +110,13 @@ class TimelineCreateRequest: @dataclass class TimelineMembershipSwitchResponse: - previous_conf: Configuration - current_conf: Configuration + previous_conf: MembershipConfiguration + current_conf: MembershipConfiguration @classmethod def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse: - previous_conf = Configuration.from_json(d["previous_conf"]) - current_conf = Configuration.from_json(d["current_conf"]) + previous_conf = MembershipConfiguration.from_json(d["previous_conf"]) + current_conf = MembershipConfiguration.from_json(d["current_conf"]) return TimelineMembershipSwitchResponse(previous_conf, current_conf) @@ -194,7 +194,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): resj = res.json() walreceivers = [Walreceiver(wr["conn_id"], 
wr["status"]) for wr in resj["walreceivers"]] # It is always normally not None, it is allowed only to make forward compat tests happy. - mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None + mconf = MembershipConfiguration.from_json(resj["mconf"]) if "mconf" in resj else None return SafekeeperTimelineStatus( mconf=mconf, term=resj["acceptor_state"]["term"], @@ -223,7 +223,9 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): return self.timeline_status(tenant_id, timeline_id).commit_lsn # Get timeline membership configuration. - def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration: + def get_membership( + self, tenant_id: TenantId, timeline_id: TimelineId + ) -> MembershipConfiguration: # make mypy happy return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore @@ -275,7 +277,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): return res_json def timeline_exclude( - self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration + self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration ) -> dict[str, Any]: res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/exclude", @@ -287,7 +289,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): return res_json def membership_switch( - self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration + self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration ) -> TimelineMembershipSwitchResponse: res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership", diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index e3d39f9315..a9a6699e5c 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -45,7 +45,7 @@ from fixtures.remote_storage import ( s3_storage, ) from fixtures.safekeeper.http import ( - Configuration, + MembershipConfiguration, SafekeeperHttpClient, SafekeeperId, TimelineCreateRequest, @@ -589,7 +589,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re for sk in env.safekeepers: sk.start() cli = sk.http_client() - mconf = Configuration(generation=0, members=[], new_members=None) + mconf = MembershipConfiguration(generation=0, members=[], new_members=None) # set start_lsn to the beginning of the first segment to allow reading # WAL from there (could you intidb LSN as well). r = TimelineCreateRequest( @@ -1948,7 +1948,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder): sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock # Request to switch before timeline creation should fail. - init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None) + init_conf = MembershipConfiguration(generation=1, members=[sk_id_1], new_members=None) with pytest.raises(requests.exceptions.HTTPError): http_cli.membership_switch(tenant_id, timeline_id, init_conf) @@ -1960,7 +1960,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder): http_cli.timeline_create(create_r) # Switch into some conf. 
- joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2]) + joint_conf = MembershipConfiguration(generation=4, members=[sk_id_1], new_members=[sk_id_2]) resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf) log.info(f"joint switch resp: {resp}") assert resp.previous_conf.generation == 1 @@ -1973,24 +1973,26 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder): assert after_restart.generation == 4 # Switch into non joint conf of which sk is not a member, must fail. - non_joint_not_member = Configuration(generation=5, members=[sk_id_2], new_members=None) + non_joint_not_member = MembershipConfiguration( + generation=5, members=[sk_id_2], new_members=None + ) with pytest.raises(requests.exceptions.HTTPError): resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member) # Switch into good non joint conf. - non_joint = Configuration(generation=6, members=[sk_id_1], new_members=None) + non_joint = MembershipConfiguration(generation=6, members=[sk_id_1], new_members=None) resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint) log.info(f"non joint switch resp: {resp}") assert resp.previous_conf.generation == 4 assert resp.current_conf.generation == 6 # Switch request to lower conf should be rejected. - lower_conf = Configuration(generation=3, members=[sk_id_1], new_members=None) + lower_conf = MembershipConfiguration(generation=3, members=[sk_id_1], new_members=None) with pytest.raises(requests.exceptions.HTTPError): http_cli.membership_switch(tenant_id, timeline_id, lower_conf) # Now, exclude sk from the membership, timeline should be deleted. - excluded_conf = Configuration(generation=7, members=[sk_id_2], new_members=None) + excluded_conf = MembershipConfiguration(generation=7, members=[sk_id_2], new_members=None) http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf) with pytest.raises(requests.exceptions.HTTPError): http_cli.timeline_status(tenant_id, timeline_id) @@ -2010,11 +2012,6 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): tenant_id = env.initial_tenant timeline_id = env.initial_timeline - ps = env.pageservers[0] - ps_http_cli = ps.http_client() - - http_clis = [sk.http_client() for sk in env.safekeepers] - config_lines = [ "neon.safekeeper_proto_version = 3", ] @@ -2023,22 +2020,11 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder): # expected to fail because timeline is not created on safekeepers with pytest.raises(Exception, match=r".*timed out.*"): ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s") - # figure out initial LSN. 
- ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id) - init_lsn = ps_timeline_detail["last_record_lsn"] - log.info(f"initial LSN: {init_lsn}") - # sk timeline creation request expects minor version - pg_version = ps_timeline_detail["pg_version"] * 10000 # create inital mconf - sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers] - mconf = Configuration(generation=1, members=sk_ids, new_members=None) - create_r = TimelineCreateRequest( - tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None + mconf = MembershipConfiguration( + generation=1, members=Safekeeper.sks_to_safekeeper_ids(env.safekeepers), new_members=None ) - log.info(f"sending timeline create: {create_r.to_json()}") - - for sk_http_cli in http_clis: - sk_http_cli.timeline_create(create_r) + Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, env.safekeepers) # Once timeline created endpoint should start. ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index b7c7478e78..c5dd34f64f 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -18,6 +18,7 @@ from fixtures.neon_fixtures import ( Safekeeper, ) from fixtures.remote_storage import RemoteStorageKind +from fixtures.safekeeper.http import MembershipConfiguration from fixtures.utils import skip_in_debug_build if TYPE_CHECKING: @@ -452,20 +453,24 @@ def test_concurrent_computes(neon_env_builder: NeonEnvBuilder): asyncio.run(run_concurrent_computes(env)) +async def assert_query_hangs(endpoint: Endpoint, query: str): + """ + Start on endpoint query which is expected to hang and check that it does. + """ + conn = await endpoint.connect_async() + bg_query = asyncio.create_task(conn.execute(query)) + await asyncio.sleep(2) + assert not bg_query.done() + return bg_query + + # Stop safekeeper and check that query cannot be executed while safekeeper is down. # Query will insert a single row into a table. 
-async def check_unavailability( - sk: Safekeeper, conn: asyncpg.Connection, key: int, start_delay_sec: int = 2 -): +async def check_unavailability(sk: Safekeeper, ep: Endpoint, key: int, start_delay_sec: int = 2): # shutdown one of two acceptors, that is, majority sk.stop() - bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')")) - - await asyncio.sleep(start_delay_sec) - # ensure that the query has not been executed yet - assert not bg_query.done() - + bg_query = await assert_query_hangs(ep, f"INSERT INTO t values ({key}, 'payload')") # start safekeeper and await the query sk.start() await bg_query @@ -480,10 +485,10 @@ async def run_unavailability(env: NeonEnv, endpoint: Endpoint): await conn.execute("INSERT INTO t values (1, 'payload')") # stop safekeeper and check that query cannot be executed while safekeeper is down - await check_unavailability(env.safekeepers[0], conn, 2) + await check_unavailability(env.safekeepers[0], endpoint, 2) # for the world's balance, do the same with second safekeeper - await check_unavailability(env.safekeepers[1], conn, 3) + await check_unavailability(env.safekeepers[1], endpoint, 3) # check that we can execute queries after restart await conn.execute("INSERT INTO t values (4, 'payload')") @@ -514,15 +519,7 @@ async def run_recovery_uncommitted(env: NeonEnv): # insert with only one safekeeper up to create tail of flushed but not committed WAL sk1.stop() sk2.stop() - conn = await ep.connect_async() - # query should hang, so execute in separate task - bg_query = asyncio.create_task( - conn.execute("insert into t select generate_series(1, 2000), 'payload'") - ) - sleep_sec = 2 - await asyncio.sleep(sleep_sec) - # it must still be not finished - assert not bg_query.done() + await assert_query_hangs(ep, "insert into t select generate_series(1, 2000), 'payload'") # note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers. ep.stop_and_destroy() @@ -559,15 +556,7 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int): # insert with only one sk3 up to create tail of flushed but not committed WAL on it sk1.stop() sk2.stop() - conn = await ep.connect_async() - # query should hang, so execute in separate task - bg_query = asyncio.create_task( - conn.execute("insert into t select generate_series(1, 180000), 'Papaya'") - ) - sleep_sec = 2 - await asyncio.sleep(sleep_sec) - # it must still be not finished - assert not bg_query.done() + await assert_query_hangs(ep, "insert into t select generate_series(1, 180000), 'Papaya'") # note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers. ep.stop_and_destroy() @@ -607,6 +596,132 @@ def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_versi asyncio.run(run_wal_truncation(env, safekeeper_proto_version)) +async def quorum_sanity_single( + env: NeonEnv, + compute_sks_ids: list[int], + members_sks_ids: list[int], + new_members_sks_ids: list[int] | None, + sks_to_stop_ids: list[int], + should_work_when_stopped: bool, +): + """ + *_ids params contain safekeeper node ids; it is assumed they are issued + from 1 and sequentially assigned to env.safekeepers. 
+ """ + members_sks = [env.safekeepers[i - 1] for i in members_sks_ids] + new_members_sks = ( + [env.safekeepers[i - 1] for i in new_members_sks_ids] if new_members_sks_ids else None + ) + sks_to_stop = [env.safekeepers[i - 1] for i in sks_to_stop_ids] + + mconf = MembershipConfiguration( + generation=1, + members=Safekeeper.sks_to_safekeeper_ids(members_sks), + new_members=Safekeeper.sks_to_safekeeper_ids(new_members_sks) if new_members_sks else None, + ) + members_sks = Safekeeper.mconf_sks(env, mconf) + + tenant_id = env.initial_tenant + compute_sks_ids_str = "-".join([str(sk_id) for sk_id in compute_sks_ids]) + members_sks_ids_str = "-".join([str(sk.id) for sk in mconf.members]) + new_members_sks_ids_str = "-".join( + [str(sk.id) for sk in mconf.new_members] if mconf.new_members is not None else [] + ) + sks_to_stop_ids_str = "-".join([str(sk.id) for sk in sks_to_stop]) + log.info( + f"running quorum_sanity_single with compute_sks={compute_sks_ids_str}, members_sks={members_sks_ids_str}, new_members_sks={new_members_sks_ids_str}, sks_to_stop={sks_to_stop_ids_str}, should_work_when_stopped={should_work_when_stopped}" + ) + branch_name = f"test_quorum_single_c{compute_sks_ids_str}_m{members_sks_ids_str}_{new_members_sks_ids_str}_s{sks_to_stop_ids_str}" + timeline_id = env.create_branch(branch_name) + + # create timeline on `members_sks` + Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks) + + config_lines = [ + "neon.safekeeper_proto_version = 3", + ] + ep = env.endpoints.create(branch_name, config_lines=config_lines) + ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids) + ep.safe_psql("create table t(key int, value text)") + + # stop specified sks and check whether writes work + for sk in sks_to_stop: + sk.stop() + if should_work_when_stopped: + log.info("checking that writes still work") + ep.safe_psql("insert into t select generate_series(1, 100), 'Papaya'") + # restarting ep should also be fine + ep.stop() + ep.start() + ep.safe_psql("insert into t select generate_series(1, 100), 'plum'") + bg_query = None + else: + log.info("checking that writes hang") + bg_query = await assert_query_hangs( + ep, "insert into t select generate_series(1, 100), 'Papaya'" + ) + # start again; now they should work + for sk in sks_to_stop: + sk.start() + if bg_query: + log.info("awaiting query") + await bg_query + + +# It's a bit tempting to iterate over all possible combinations, but let's stick +# with this for now. +async def run_quorum_sanity(env: NeonEnv): + # 3 members, all up, should work + await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [], True) + # 3 members, 2/3 up, should work + await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [3], True) + # 3 members, 1/3 up, should not work + await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [2, 3], False) + + # 3 members, all up, should work; wp redundantly talks to 4th. 
+ await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], None, [], True) + # 3 members, all up, should work with wp talking to 2 of these 3 + plus one redundant + await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [], True) + # 3 members, 2/3 up, could work but wp talks to different 3s, so it shouldn't + await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [3], False) + + # joint conf of 1-2-3 and 4, all up, should work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [], True) + # joint conf of 1-2-3 and 4, 4 down, shouldn't work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [4], False) + + # joint conf of 1-2-3 and 2-3-4, all up, should work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [], True) + # joint conf of 1-2-3 and 2-3-4, 1 and 4 down, should work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 4], True) + # joint conf of 1-2-3 and 2-3-4, 2 down, should work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2], True) + # joint conf of 1-2-3 and 2-3-4, 3 down, should work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [3], True) + # joint conf of 1-2-3 and 2-3-4, 1 and 2 down, shouldn't work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 2], False) + # joint conf of 1-2-3 and 2-3-4, 2 and 4 down, shouldn't work + await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2, 4], False) + + # joint conf of 1-2-3 and 2-3-4 with wp talking to 2-3-4 only. + await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [], True) + # with 1 down should still be ok + await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [1], True) + # but with 2 down not ok + await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False) + + +# Test various combinations of membership configurations / neon.safekeepers +# (list of safekeepers endpoint connects to) values / up & down safekeepers and +# check that endpoint can start and write data when we have quorum and can't when +# we don't. def test_quorum_sanity(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 4 env = neon_env_builder.init_start() + + asyncio.run(run_quorum_sanity(env)) + + async def run_segment_init_failure(env: NeonEnv): env.create_branch("test_segment_init_failure") ep = env.endpoints.create_start("test_segment_init_failure")
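The commit rule the new GetCommittedMset/GetAcknowledgedByQuorumWALPosition path implements can be shown in isolation: each member set contributes the highest LSN acknowledged by a majority of its members, and in a joint configuration the commit LSN is the minimum over both sets. Below is a minimal standalone sketch of that arithmetic only; it is not part of the patch, and the helper names and LSN values are made up for illustration.

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef uint64_t XLogRecPtr;

/* ascending comparator for qsort */
static int
cmp_lsn(const void *a, const void *b)
{
	XLogRecPtr la = *(const XLogRecPtr *) a;
	XLogRecPtr lb = *(const XLogRecPtr *) b;

	return (la > lb) - (la < lb);
}

/* Highest LSN acknowledged by a majority (len / 2 + 1) of len members. */
static XLogRecPtr
quorum_ack(XLogRecPtr *acks, uint32_t len)
{
	uint32_t quorum = len / 2 + 1;

	qsort(acks, len, sizeof(XLogRecPtr), cmp_lsn);
	/* skip the len - quorum smallest entries; the next one is held by >= quorum members */
	return acks[len - quorum];
}

int
main(void)
{
	/* hypothetical joint conf: three old members and three new members */
	XLogRecPtr members_acks[] = {100, 250, 300};
	XLogRecPtr new_members_acks[] = {150, 200, 400};
	XLogRecPtr old_q = quorum_ack(members_acks, 3);	/* 250 */
	XLogRecPtr new_q = quorum_ack(new_members_acks, 3);	/* 200 */
	XLogRecPtr committed = old_q < new_q ? old_q : new_q;	/* 200 */

	printf("committed = %llu\n", (unsigned long long) committed);
	return 0;
}

In this example the old set's majority has reached 250 while the new set's majority has only reached 200, so only WAL up to 200 may be reported as committed, matching the Min() taken over the two member sets in GetAcknowledgedByQuorumWALPosition.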