convert ProposerElected

This commit is contained in:
Arseny Sher
2025-01-30 12:23:27 +01:00
parent 217309c7ef
commit 0d0cd16ea2
5 changed files with 74 additions and 24 deletions

View File

@@ -171,6 +171,7 @@ WalProposerFree(WalProposer *wp)
Assert(sk->outbuf.data != NULL);
pfree(sk->outbuf.data);
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -1143,16 +1144,16 @@ SendProposerElected(Safekeeper *sk)
Assert(sk->startStreamingAt <= wp->availableLsn);
msg.tag = 'e';
msg.apm.tag = 'e';
msg.term = wp->propTerm;
msg.startStreamingAt = sk->startStreamingAt;
msg.termHistory = &wp->propTermHistory;
msg.timelineStartLsn = 0;
lastCommonTerm = idx >= 0 ? wp->propTermHistory.entries[idx].term : 0;
wp_log(LOG,
"sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
"sending elected msg to node " UINT64_FORMAT " generation=%u term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s",
sk->greetResponse.nodeId, msg.generation, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt),
lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port);
PAMessageSerialize(wp, (ProposerAcceptorMessage *) &msg, &sk->outbuf, wp->config->proto_version);
if (!AsyncWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_SEND_ELECTED_FLUSH))
@@ -1766,14 +1767,13 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
resetStringInfo(buf);
/* removeme tag check after converting all msgs */
if (proto_version == 3 && (msg->tag == 'g' || msg->tag == 'v'))
if (proto_version == 3 && (msg->tag == 'g' || msg->tag == 'v' || msg->tag == 'e'))
{
/*
* v2 sends structs for some messages as is, so commonly send tag only
* for v3
*/
if (proto_version == 3)
pq_sendint8(buf, msg->tag);
pq_sendint8(buf, msg->tag);
switch (msg->tag)
{
@@ -1798,6 +1798,21 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
break;
}
case 'e':
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint32(buf, m->generation);
pq_sendint64(buf, m->term);
pq_sendint64(buf, m->startStreamingAt);
pq_sendint32(buf, m->termHistory->n_entries);
for (uint32 i = 0; i < m->termHistory->n_entries; i++)
{
pq_sendint64(buf, m->termHistory->entries[i].term);
pq_sendint64(buf, m->termHistory->entries[i].lsn);
}
break;
}
default:
wp_log(FATAL, "unexpected message type %c to serialize", msg->tag);
}
@@ -1857,7 +1872,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
{
ProposerElected *m = (ProposerElected *) msg;
pq_sendint64_le(buf, m->tag);
pq_sendint64_le(buf, m->apm.tag);
pq_sendint64_le(buf, m->term);
pq_sendint64_le(buf, m->startStreamingAt);
pq_sendint32_le(buf, m->termHistory->n_entries);
@@ -1866,7 +1881,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
pq_sendint64_le(buf, m->termHistory->entries[i].term);
pq_sendint64_le(buf, m->termHistory->entries[i].lsn);
}
pq_sendint64_le(buf, 0);
pq_sendint64_le(buf, 0); /* removed timeline_start_lsn */
break;
}
case 'a':
@@ -2510,6 +2525,8 @@ MembershipConfigurationFree(MembershipConfiguration *mconf)
{
if (mconf->members.m)
pfree(mconf->members.m);
mconf->members.m = NULL;
if (mconf->new_members.m)
pfree(mconf->new_members.m);
mconf->new_members.m = NULL;
}

View File

@@ -295,14 +295,13 @@ typedef struct VoteResponse
*/
typedef struct ProposerElected
{
uint64 tag;
AcceptorProposerMessage apm;
Generation generation; /* membership conf generation */
term_t term;
/* proposer will send since this point */
XLogRecPtr startStreamingAt;
/* history of term switches up to this proposer */
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
/*

View File

@@ -8,7 +8,7 @@
use anyhow::Context;
use postgres_backend::QueryError;
use safekeeper_api::membership::Configuration;
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -133,6 +133,7 @@ async fn send_proposer_elected(
let history = TermHistory(history_entries);
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at: lsn,
term_history: history,

View File

@@ -306,6 +306,7 @@ async fn recover(
// truncate WAL locally
let pe = ProposerAcceptorMessage::Elected(ProposerElected {
generation: INVALID_GENERATION,
term: donor.term,
start_streaming_at: last_common_point.lsn,
term_history: donor_th,

View File

@@ -63,8 +63,28 @@ impl TermHistory {
TermHistory(Vec::new())
}
// Parse TermHistory as n_entries followed by TermLsn pairs
// Parse TermHistory as n_entries followed by TermLsn pairs in network order.
pub fn from_bytes(bytes: &mut Bytes) -> Result<TermHistory> {
let n_entries = bytes
.get_u32_f()
.with_context(|| "TermHistory misses len")?;
let mut res = Vec::with_capacity(n_entries as usize);
for i in 0..n_entries {
let term = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses term", i))?;
let lsn = bytes
.get_u64_f()
.with_context(|| format!("TermHistory pos {} misses lsn", i))?
.into();
res.push(TermLsn { term, lsn })
}
Ok(TermHistory(res))
}
// Parse TermHistory as n_entries followed by TermLsn pairs in LE order.
// TODO remove once v2 protocol is fully dropped.
pub fn from_bytes_le(bytes: &mut Bytes) -> Result<TermHistory> {
if bytes.remaining() < 4 {
bail!("TermHistory misses len");
}
@@ -269,19 +289,12 @@ pub struct VoteResponse {
*/
#[derive(Debug)]
pub struct ProposerElected {
pub generation: Generation,
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
}
/// V2 of the message; exists as a struct because we (de)serialized it as is.
pub struct ProposerElectedV2 {
pub term: Term,
pub start_streaming_at: Lsn,
pub term_history: TermHistory,
pub timeline_start_lsn: Lsn,
}
/// Request with WAL message sent from proposer to safekeeper. Along the way it
/// communicates commit_lsn.
#[derive(Debug)]
@@ -463,7 +476,7 @@ impl ProposerAcceptorMessage {
pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result<ProposerAcceptorMessage> {
// TODO remove after converting all msgs
let t = msg_bytes[0] as char;
if proto_version == SK_PROTO_VERSION_3 && (t == 'g' || t == 'v') {
if proto_version == SK_PROTO_VERSION_3 && (t == 'g' || t == 'v' || t == 'e') {
if msg_bytes.is_empty() {
bail!("ProposerAcceptorMessage is not complete: missing tag");
}
@@ -504,6 +517,24 @@ impl ProposerAcceptorMessage {
let v = VoteRequest { generation, term };
Ok(ProposerAcceptorMessage::VoteRequest(v))
}
'e' => {
let generation = msg_bytes
.get_u32_f()
.with_context(|| "reading generation")?;
let term = msg_bytes.get_u64_f().with_context(|| "reading term")?;
let start_streaming_at: Lsn = msg_bytes
.get_u64_f()
.with_context(|| "reading start_streaming_at")?
.into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
let msg = ProposerElected {
generation,
term,
start_streaming_at,
term_history,
};
Ok(ProposerAcceptorMessage::Elected(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
}
// TODO remove proto_version == 3 after converting all msgs
@@ -544,12 +575,13 @@ impl ProposerAcceptorMessage {
}
let term = msg_bytes.get_u64_le();
let start_streaming_at = msg_bytes.get_u64_le().into();
let term_history = TermHistory::from_bytes(&mut msg_bytes)?;
let term_history = TermHistory::from_bytes_le(&mut msg_bytes)?;
if msg_bytes.remaining() < 8 {
bail!("ProposerElected message is not complete");
}
let _timeline_start_lsn = msg_bytes.get_u64_le();
let msg = ProposerElected {
generation: INVALID_GENERATION,
term,
start_streaming_at,
term_history,