diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 09a5a91a16..284ebe55e2 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -171,6 +171,7 @@ WalProposerFree(WalProposer *wp) Assert(sk->outbuf.data != NULL); pfree(sk->outbuf.data); + MembershipConfigurationFree(&sk->greetResponse.mconf); if (sk->voteResponse.termHistory.entries) pfree(sk->voteResponse.termHistory.entries); sk->voteResponse.termHistory.entries = NULL; @@ -1143,16 +1144,16 @@ SendProposerElected(Safekeeper *sk) Assert(sk->startStreamingAt <= wp->availableLsn); - msg.tag = 'e'; + msg.apm.tag = 'e'; msg.term = wp->propTerm; msg.startStreamingAt = sk->startStreamingAt; msg.termHistory = &wp->propTermHistory; - msg.timelineStartLsn = 0; lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0; wp_log(LOG, - "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s", - sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port); + "sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s", + sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), + lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port); PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version); if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH)) @@ -1766,14 +1767,13 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf resetStringInfo(buf); /* removeme tag check after converting all msgs */ - if (proto_version == 3 && (msg->tag == 'g' || msg->tag == 'v')) + if (proto_version == 3 && (msg->tag == 'g' || msg->tag == 'v' || msg->tag == 'e')) { /* * v2 sends structs for some messages as is, so commonly send tag only * for v3 */ - if (proto_version == 3) - pq_sendint8(buf, msg->tag); + pq_sendint8(buf, msg->tag); switch (msg->tag) { @@ -1798,6 +1798,21 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf break; } + case 'e': + { + ProposerElected *m = (ProposerElected *) msg; + + pq_sendint32(buf, m->generation); + pq_sendint64(buf, m->term); + pq_sendint64(buf, m->startStreamingAt); + pq_sendint32(buf, m->termHistory->n_entries); + for (uint32 i = 0; i < m->termHistory->n_entries; i++) + { + pq_sendint64(buf, m->termHistory->entries[i].term); + pq_sendint64(buf, m->termHistory->entries[i].lsn); + } + break; + } default: wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); } @@ -1857,7 +1872,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf { ProposerElected *m = (ProposerElected *) msg; - pq_sendint64_le(buf, m->tag); + pq_sendint64_le(buf, m->apm.tag); pq_sendint64_le(buf, m->term); pq_sendint64_le(buf, m->startStreamingAt); pq_sendint32_le(buf, m->termHistory->n_entries); @@ -1866,7 +1881,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf pq_sendint64_le(buf, m->termHistory->entries[i].term); pq_sendint64_le(buf, m->termHistory->entries[i].lsn); } - pq_sendint64_le(buf, 0); + pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */ break; } case 'a': @@ -2510,6 +2525,8 @@ MembershipConfigurationFree(MembershipConfiguration *mconf) { if (mconf->members.m) pfree(mconf->members.m); + mconf->members.m = NULL; if (mconf->new_members.m) pfree(mconf->new_members.m); + mconf->new_members.m = NULL; } diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index f865634f91..82a11fb99d 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -295,14 +295,13 @@ typedef struct VoteResponse */ typedef struct ProposerElected { - uint64 tag; + AcceptorProposerMessage apm; + Generation generation; /* membership conf generation */ term_t term; /* proposer will send since this point */ XLogRecPtr startStreamingAt; /* history of term switches up to this proposer */ TermHistory *termHistory; - /* timeline globally starts at this LSN */ - XLogRecPtr timelineStartLsn; } ProposerElected; /* diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 5e286155d2..9c788d9684 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -8,7 +8,7 @@ use anyhow::Context; use postgres_backend::QueryError; -use safekeeper_api::membership::Configuration; +use safekeeper_api::membership::{Configuration, INVALID_GENERATION}; use safekeeper_api::{ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -133,6 +133,7 @@ async fn send_proposer_elected( let history = TermHistory(history_entries); let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { + generation: INVALID_GENERATION, term, start_streaming_at: lsn, term_history: history, diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 3e66896af1..b3a14b2dd0 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -306,6 +306,7 @@ async fn recover( // truncate WAL locally let pe = ProposerAcceptorMessage::Elected(ProposerElected { + generation: INVALID_GENERATION, term: donor.term, start_streaming_at: last_common_point.lsn, term_history: donor_th, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 19f3336042..7dc286944b 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -63,8 +63,28 @@ impl TermHistory { TermHistory(Vec::new()) } - // Parse TermHistory as n_entries followed by TermLsn pairs + // Parse TermHistory as n_entries followed by TermLsn pairs in network order. pub fn from_bytes(bytes: &mut Bytes) -> Result { + let n_entries = bytes + .get_u32_f() + .with_context(|| "TermHistory misses len")?; + let mut res = Vec::with_capacity(n_entries as usize); + for i in 0..n_entries { + let term = bytes + .get_u64_f() + .with_context(|| format!("TermHistory pos {} misses term", i))?; + let lsn = bytes + .get_u64_f() + .with_context(|| format!("TermHistory pos {} misses lsn", i))? + .into(); + res.push(TermLsn { term, lsn }) + } + Ok(TermHistory(res)) + } + + // Parse TermHistory as n_entries followed by TermLsn pairs in LE order. + // TODO remove once v2 protocol is fully dropped. + pub fn from_bytes_le(bytes: &mut Bytes) -> Result { if bytes.remaining() < 4 { bail!("TermHistory misses len"); } @@ -269,19 +289,12 @@ pub struct VoteResponse { */ #[derive(Debug)] pub struct ProposerElected { + pub generation: Generation, pub term: Term, pub start_streaming_at: Lsn, pub term_history: TermHistory, } -/// V2 of the message; exists as a struct because we (de)serialized it as is. -pub struct ProposerElectedV2 { - pub term: Term, - pub start_streaming_at: Lsn, - pub term_history: TermHistory, - pub timeline_start_lsn: Lsn, -} - /// Request with WAL message sent from proposer to safekeeper. Along the way it /// communicates commit_lsn. #[derive(Debug)] @@ -463,7 +476,7 @@ impl ProposerAcceptorMessage { pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result { // TODO remove after converting all msgs let t = msg_bytes[0] as char; - if proto_version == SK_PROTO_VERSION_3 && (t == 'g' || t == 'v') { + if proto_version == SK_PROTO_VERSION_3 && (t == 'g' || t == 'v' || t == 'e') { if msg_bytes.is_empty() { bail!("ProposerAcceptorMessage is not complete: missing tag"); } @@ -504,6 +517,24 @@ impl ProposerAcceptorMessage { let v = VoteRequest { generation, term }; Ok(ProposerAcceptorMessage::VoteRequest(v)) } + 'e' => { + let generation = msg_bytes + .get_u32_f() + .with_context(|| "reading generation")?; + let term = msg_bytes.get_u64_f().with_context(|| "reading term")?; + let start_streaming_at: Lsn = msg_bytes + .get_u64_f() + .with_context(|| "reading start_streaming_at")? + .into(); + let term_history = TermHistory::from_bytes(&mut msg_bytes)?; + let msg = ProposerElected { + generation, + term, + start_streaming_at, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) + } _ => bail!("unknown proposer-acceptor message tag: {}", tag), } // TODO remove proto_version == 3 after converting all msgs @@ -544,12 +575,13 @@ impl ProposerAcceptorMessage { } let term = msg_bytes.get_u64_le(); let start_streaming_at = msg_bytes.get_u64_le().into(); - let term_history = TermHistory::from_bytes(&mut msg_bytes)?; + let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?; if msg_bytes.remaining() < 8 { bail!("ProposerElected message is not complete"); } let _timeline_start_lsn = msg_bytes.get_u64_le(); let msg = ProposerElected { + generation: INVALID_GENERATION, term, start_streaming_at, term_history,