Convert VoteResponse, remove timeline_start_lsn.

This commit is contained in:
Arseny Sher
2025-01-30 11:10:22 +01:00
parent 20e974ecdd
commit 976afcee26
3 changed files with 66 additions and 55 deletions

View File

@@ -794,11 +794,12 @@ RecvVoteResponse(Safekeeper *sk)
return;
wp_log(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory),
"got VoteResponse from acceptor %s:%s, generation=%u, term=%lu, voteGiven=%u, last_log_term=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
sk->host, sk->port, sk->voteResponse.generation, sk->voteResponse.term,
sk->voteResponse.voteGiven,
GetHighestTerm(&sk->voteResponse.termHistory),
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn),
LSN_FORMAT_ARGS(sk->voteResponse.timelineStartLsn));
LSN_FORMAT_ARGS(sk->voteResponse.truncateLsn));
/*
* In case of acceptor rejecting our vote, bail out, but only if either it
@@ -958,7 +959,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->propEpochStartLsn = InvalidXLogRecPtr;
wp->donorEpoch = 0;
wp->truncateLsn = InvalidXLogRecPtr;
wp->timelineStartLsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
{
@@ -975,20 +975,6 @@ DetermineEpochStartLsn(WalProposer *wp)
wp->donor = i;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (wp->safekeeper[i].voteResponse.timelineStartLsn != InvalidXLogRecPtr)
{
/* timelineStartLsn should be the same everywhere or unknown */
if (wp->timelineStartLsn != InvalidXLogRecPtr &&
wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn)
{
wp_log(WARNING,
"inconsistent timelineStartLsn: current %X/%X, received %X/%X",
LSN_FORMAT_ARGS(wp->timelineStartLsn),
LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn));
}
wp->timelineStartLsn = wp->safekeeper[i].voteResponse.timelineStartLsn;
}
}
}
@@ -1011,22 +997,11 @@ DetermineEpochStartLsn(WalProposer *wp)
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
{
wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp);
if (wp->timelineStartLsn == InvalidXLogRecPtr)
{
wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp);
}
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
/*
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
* it should never be zero at this point, if we know timelineStartLsn.
*
* timelineStartLsn can be zero only on the first syncSafekeepers run.
*/
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
(wp->config->syncSafekeepers && wp->truncateLsn == wp->timelineStartLsn));
Assert(wp->truncateLsn != InvalidXLogRecPtr || wp->config->syncSafekeepers);
/*
* We will be generating WAL since propEpochStartLsn, so we should set
@@ -1142,14 +1117,8 @@ SendProposerElected(Safekeeper *sk)
{
/* safekeeper is empty or no common point, start from the beginning */
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/*
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
* is created manually (test_s3_wal_replay)
*/
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), wp->propTermHistory.n_entries);
}
else
{
@@ -1178,12 +1147,12 @@ SendProposerElected(Safekeeper *sk)
msg.term = wp->propTerm;
msg.startStreamingAt = sk->startStreamingAt;
msg.termHistory = &wp->propTermHistory;
msg.timelineStartLsn = wp->timelineStartLsn;
msg.timelineStartLsn = 0;
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn));
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
@@ -1897,7 +1866,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
pq_sendint64_le(buf, m->termHistory->entries[i].term);
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
}
pq_sendint64_le(buf, m->timelineStartLsn);
pq_sendint64_le(buf, 0);
break;
}
case 'a':
@@ -2008,10 +1977,11 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
/* parse it */
s.data = buf;
s.len = buf_size;
s.maxlen = buf_size;
s.cursor = 0;
/* removeme tag check after converting all msgs */
if (wp->config->proto_version == 3 && anymsg->tag == 'g')
if (wp->config->proto_version == 3 && (anymsg->tag == 'g' || anymsg->tag == 'v'))
{
tag = pq_getmsgbyte(&s);
if (tag != anymsg->tag)
@@ -2033,6 +2003,25 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
pq_getmsgend(&s);
return true;
}
case 'v':
{
VoteResponse *msg = (VoteResponse *) anymsg;
msg->generation = pq_getmsgint32(&s);
msg->term = pq_getmsgint64(&s);
msg->voteGiven = pq_getmsgbyte(&s);
msg->flushLsn = pq_getmsgint64(&s);
msg->truncateLsn = pq_getmsgint64(&s);
msg->termHistory.n_entries = pq_getmsgint32(&s);
msg->termHistory.entries = palloc(sizeof(TermSwitchEntry) * msg->termHistory.n_entries);
for (uint32 i = 0; i < msg->termHistory.n_entries; i++)
{
msg->termHistory.entries[i].term = pq_getmsgint64(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64(&s);
}
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
@@ -2078,7 +2067,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
msg->termHistory.entries[i].term = pq_getmsgint64_le(&s);
msg->termHistory.entries[i].lsn = pq_getmsgint64_le(&s);
}
msg->timelineStartLsn = pq_getmsgint64_le(&s);
pq_getmsgint64_le(&s); /* timelineStartLsn */
pq_getmsgend(&s);
return true;
}

View File

@@ -269,8 +269,15 @@ typedef struct TermHistory
typedef struct VoteResponse
{
AcceptorProposerMessage apm;
/*
* Membership conf generation. It's redundant because on mismatch
* safekeeper is expected to ERROR the connection, but let's sanity check
* it.
*/
Generation generation;
term_t term;
uint64 voteGiven;
uint8 voteGiven;
/*
* Safekeeper flush_lsn (end of WAL) + history of term switches allow
@@ -280,7 +287,6 @@ typedef struct VoteResponse
XLogRecPtr truncateLsn; /* minimal LSN which may be needed for*
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
/*

View File

@@ -253,14 +253,14 @@ pub struct VoteRequestV2 {
/// Vote itself, sent from safekeeper to proposer
#[derive(Debug, Serialize)]
pub struct VoteResponse {
generation: Generation,
pub term: Term, // safekeeper's current term; if it is higher than proposer's, the compute is out of date.
vote_given: u64, // fixme u64 due to padding
vote_given: bool,
// Safekeeper flush_lsn (end of WAL) + history of term switches allow
// proposer to choose the most advanced one.
pub flush_lsn: Lsn,
truncate_lsn: Lsn,
pub term_history: TermHistory,
timeline_start_lsn: Lsn,
}
/*
@@ -674,7 +674,8 @@ impl AcceptorProposerMessage {
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
// TODO remove after converting all msgs
if proto_version == SK_PROTO_VERSION_3
&& (matches!(self, AcceptorProposerMessage::Greeting(_)))
&& (matches!(self, AcceptorProposerMessage::Greeting(_))
|| matches!(self, AcceptorProposerMessage::VoteResponse(_)))
{
match self {
AcceptorProposerMessage::Greeting(msg) => {
@@ -683,6 +684,19 @@ impl AcceptorProposerMessage {
Self::serialize_mconf(buf, &msg.mconf);
buf.put_u64(msg.term)
}
AcceptorProposerMessage::VoteResponse(msg) => {
buf.put_u8('v' as u8);
buf.put_u32(msg.generation);
buf.put_u64(msg.term);
buf.put_u8(msg.vote_given as u8);
buf.put_u64(msg.flush_lsn.into());
buf.put_u64(msg.truncate_lsn.into());
buf.put_u32(msg.term_history.0.len() as u32);
for e in &msg.term_history.0 {
buf.put_u64(e.term);
buf.put_u64(e.lsn.into());
}
}
_ => bail!("not impl"),
}
Ok(())
@@ -696,9 +710,10 @@ impl AcceptorProposerMessage {
buf.put_u64_le(msg.node_id.0);
}
AcceptorProposerMessage::VoteResponse(msg) => {
// v2 didn't have generation, had u64 vote_given and timeline_start_lsn
buf.put_u64_le('v' as u64);
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.vote_given);
buf.put_u64_le(msg.vote_given as u64);
buf.put_u64_le(msg.flush_lsn.into());
buf.put_u64_le(msg.truncate_lsn.into());
buf.put_u32_le(msg.term_history.0.len() as u32);
@@ -706,7 +721,8 @@ impl AcceptorProposerMessage {
buf.put_u64_le(e.term);
buf.put_u64_le(e.lsn.into());
}
buf.put_u64_le(msg.timeline_start_lsn.into());
// removed timeline_start_lsn
buf.put_u64_le(0);
}
AcceptorProposerMessage::AppendResponse(msg) => {
buf.put_u64_le('a' as u64);
@@ -908,12 +924,12 @@ where
self.wal_store.flush_wal().await?;
// initialize with refusal
let mut resp = VoteResponse {
generation: self.state.mconf.generation,
term: self.state.acceptor_state.term,
vote_given: false as u64,
vote_given: false,
flush_lsn: self.flush_lsn(),
truncate_lsn: self.state.inmem.peer_horizon_lsn,
term_history: self.get_term_history(),
timeline_start_lsn: self.state.timeline_start_lsn,
};
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.start_change();
@@ -922,7 +938,7 @@ where
self.state.finish_change(&state).await?;
resp.term = self.state.acceptor_state.term;
resp.vote_given = true as u64;
resp.vote_given = true;
}
info!("processed {:?}: sending {:?}", msg, &resp);
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))