From 976afcee26fb3c6499e00466cd18a6f0f7150364 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 30 Jan 2025 11:10:22 +0100 Subject: [PATCH] Convert VoteResponse, remove timeline_start_lsn. --- pgxn/neon/walproposer.c | 79 ++++++++++++++++-------------------- pgxn/neon/walproposer.h | 10 ++++- safekeeper/src/safekeeper.rs | 32 +++++++++++---- 3 files changed, 66 insertions(+), 55 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 1044331be2..09a5a91a16 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -794,11 +794,12 @@ RecvVoteResponse(Safekeeper *sk) return; wp_log(LOG, - "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X", - sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), + "got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X", + sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term, + sk->voteResponse.voteGiven, + GetHighestTerm(&sk->voteResponse.termHistory), LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), - LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn), - LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn)); + LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn)); /* * In case of acceptor rejecting our vote, bail out, but only if either it @@ -958,7 +959,6 @@ DetermineEpochStartLsn(WalProposer *wp) wp->propEpochStartLsn = InvalidXLogRecPtr; wp->donorEpoch = 0; wp->truncateLsn = InvalidXLogRecPtr; - wp->timelineStartLsn = InvalidXLogRecPtr; for (int i = 0; i < wp->n_safekeepers; i++) { @@ -975,20 +975,6 @@ DetermineEpochStartLsn(WalProposer *wp) wp->donor = i; } wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn); - - if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr) - { - /* timelineStartLsn should be the same everywhere or unknown */ - if (wp->timelineStartLsn != InvalidXLogRecPtr && - wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn) - { - wp_log(WARNING, - "inconsistent timelineStartLsn: current %X/%X, received %X/%X", - LSN_FORMAT_ARGS(wp->timelineStartLsn), - LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn)); - } - wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn; - } } } @@ -1011,22 +997,11 @@ DetermineEpochStartLsn(WalProposer *wp) if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers) { wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); - if (wp->timelineStartLsn == InvalidXLogRecPtr) - { - wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp); - } wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); } pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn); - /* - * Safekeepers are setting truncateLsn after timelineStartLsn is known, so - * it should never be zero at this point, if we know timelineStartLsn. - * - * timelineStartLsn can be zero only on the first syncSafekeepers run. - */ - Assert((wp->truncateLsn != InvalidXLogRecPtr) || - (wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn)); + Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers); /* * We will be generating WAL since propEpochStartLsn, so we should set @@ -1142,14 +1117,8 @@ SendProposerElected(Safekeeper *sk) { /* safekeeper is empty or no common point, start from the beginning */ sk->startStreamingAt = wp->propTermHistory.entries[0].lsn; - wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u", - sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries); - - /* - * wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline - * is created manually (test_s3_wal_replay) - */ - Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr); + wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u", + sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries); } else { @@ -1178,12 +1147,12 @@ SendProposerElected(Safekeeper *sk) msg.term = wp->propTerm; msg.startStreamingAt = sk->startStreamingAt; msg.termHistory = &wp->propTermHistory; - msg.timelineStartLsn = wp->timelineStartLsn; + msg.timelineStartLsn = 0; lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0; wp_log(LOG, - "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", - sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); + "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s", + sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port); PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version); if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH)) @@ -1897,7 +1866,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf pq_sendint64_le(buf, m->termHistory->entries[i].term); pq_sendint64_le(buf, m->termHistory->entries[i].lsn); } - pq_sendint64_le(buf, m->timelineStartLsn); + pq_sendint64_le(buf, 0); break; } case 'a': @@ -2008,10 +1977,11 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) /* parse it */ s.data = buf; s.len = buf_size; + s.maxlen = buf_size; s.cursor = 0; /* removeme tag check after converting all msgs */ - if (wp->config->proto_version == 3 && anymsg->tag == 'g') + if (wp->config->proto_version == 3 && (anymsg->tag == 'g' || anymsg->tag == 'v')) { tag = pq_getmsgbyte(&s); if (tag != anymsg->tag) @@ -2033,6 +2003,25 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) pq_getmsgend(&s); return true; } + case 'v': + { + VoteResponse *msg = (VoteResponse *) anymsg; + + msg->generation = pq_getmsgint32(&s); + msg->term = pq_getmsgint64(&s); + msg->voteGiven = pq_getmsgbyte(&s); + msg->flushLsn = pq_getmsgint64(&s); + msg->truncateLsn = pq_getmsgint64(&s); + msg->termHistory.n_entries = pq_getmsgint32(&s); + msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries); + for (uint32 i = 0; i < msg->termHistory.n_entries; i++) + { + msg->termHistory.entries[i].term = pq_getmsgint64(&s); + msg->termHistory.entries[i].lsn = pq_getmsgint64(&s); + } + pq_getmsgend(&s); + return true; + } default: { wp_log(FATAL, "unexpected message tag %c to read", (char) tag); @@ -2078,7 +2067,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) msg->termHistory.entries[i].term = pq_getmsgint64_le(&s); msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s); } - msg->timelineStartLsn = pq_getmsgint64_le(&s); + pq_getmsgint64_le(&s); /* timelineStartLsn */ pq_getmsgend(&s); return true; } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 94897eed06..f865634f91 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -269,8 +269,15 @@ typedef struct TermHistory typedef struct VoteResponse { AcceptorProposerMessage apm; + + /* + * Membership conf generation. It's redundant because on mismatch + * safekeeper is expected to ERROR the connection, but let's sanity check + * it. + */ + Generation generation; term_t term; - uint64 voteGiven; + uint8 voteGiven; /* * Safekeeper flush_lsn (end of WAL) + history of term switches allow @@ -280,7 +287,6 @@ typedef struct VoteResponse XLogRecPtr truncateLsn; /* minimal LSN which may be needed for* * recovery of some safekeeper */ TermHistory termHistory; - XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */ } VoteResponse; /* diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 0eaade076b..bf6ec599cb 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -253,14 +253,14 @@ pub struct VoteRequestV2 { /// Vote itself, sent from safekeeper to proposer #[derive(Debug, Serialize)] pub struct VoteResponse { + generation: Generation, pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date. - vote_given: u64, // fixme u64 due to padding + vote_given: bool, // Safekeeper flush_lsn (end of WAL) + history of term switches allow // proposer to choose the most advanced one. pub flush_lsn: Lsn, truncate_lsn: Lsn, pub term_history: TermHistory, - timeline_start_lsn: Lsn, } /* @@ -674,7 +674,8 @@ impl AcceptorProposerMessage { pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> { // TODO remove after converting all msgs if proto_version == SK_PROTO_VERSION_3 - && (matches!(self, AcceptorProposerMessage::Greeting(_))) + && (matches!(self, AcceptorProposerMessage::Greeting(_)) + || matches!(self, AcceptorProposerMessage::VoteResponse(_))) { match self { AcceptorProposerMessage::Greeting(msg) => { @@ -683,6 +684,19 @@ impl AcceptorProposerMessage { Self::serialize_mconf(buf, &msg.mconf); buf.put_u64(msg.term) } + AcceptorProposerMessage::VoteResponse(msg) => { + buf.put_u8('v' as u8); + buf.put_u32(msg.generation); + buf.put_u64(msg.term); + buf.put_u8(msg.vote_given as u8); + buf.put_u64(msg.flush_lsn.into()); + buf.put_u64(msg.truncate_lsn.into()); + buf.put_u32(msg.term_history.0.len() as u32); + for e in &msg.term_history.0 { + buf.put_u64(e.term); + buf.put_u64(e.lsn.into()); + } + } _ => bail!("not impl"), } Ok(()) @@ -696,9 +710,10 @@ impl AcceptorProposerMessage { buf.put_u64_le(msg.node_id.0); } AcceptorProposerMessage::VoteResponse(msg) => { + // v2 didn't have generation, had u64 vote_given and timeline_start_lsn buf.put_u64_le('v' as u64); buf.put_u64_le(msg.term); - buf.put_u64_le(msg.vote_given); + buf.put_u64_le(msg.vote_given as u64); buf.put_u64_le(msg.flush_lsn.into()); buf.put_u64_le(msg.truncate_lsn.into()); buf.put_u32_le(msg.term_history.0.len() as u32); @@ -706,7 +721,8 @@ impl AcceptorProposerMessage { buf.put_u64_le(e.term); buf.put_u64_le(e.lsn.into()); } - buf.put_u64_le(msg.timeline_start_lsn.into()); + // removed timeline_start_lsn + buf.put_u64_le(0); } AcceptorProposerMessage::AppendResponse(msg) => { buf.put_u64_le('a' as u64); @@ -908,12 +924,12 @@ where self.wal_store.flush_wal().await?; // initialize with refusal let mut resp = VoteResponse { + generation: self.state.mconf.generation, term: self.state.acceptor_state.term, - vote_given: false as u64, + vote_given: false, flush_lsn: self.flush_lsn(), truncate_lsn: self.state.inmem.peer_horizon_lsn, term_history: self.get_term_history(), - timeline_start_lsn: self.state.timeline_start_lsn, }; if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); @@ -922,7 +938,7 @@ where self.state.finish_change(&state).await?; resp.term = self.state.acceptor_state.term; - resp.vote_given = true as u64; + resp.vote_given = true; } info!("processed {:?}: sending {:?}", msg, &resp); Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))