convert acceptorgreeting

This commit is contained in:
Arseny Sher
2025-01-22 15:23:29 +01:00
parent 8683462157
commit ec745e4d08
5 changed files with 145 additions and 22 deletions

View File

@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
return true;
}
/* --------------------------------
* pq_getmsgint16 - get a binary 2-byte int from a message buffer
* --------------------------------
*/
uint16
pq_getmsgint16(StringInfo msg)
{
return pq_getmsgint(msg, 2);
}
/* --------------------------------
* pq_getmsgint32 - get a binary 4-byte int from a message buffer
* --------------------------------
*/
uint32
pq_getmsgint32(StringInfo msg)
{
return pq_getmsgint(msg, 4);
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------

View File

@@ -8,6 +8,8 @@
#endif
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint16 pq_getmsgint16(StringInfo msg);
uint32 pq_getmsgint32(StringInfo msg);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);

View File

@@ -82,6 +82,7 @@ static char *FormatSafekeeperState(Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static void UpdateDonorShmem(WalProposer *wp);
static void MembershipConfigurationFree(MembershipConfiguration *mconf);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -328,6 +329,7 @@ ShutdownConnection(Safekeeper *sk)
sk->state = SS_OFFLINE;
sk->streamingAt = InvalidXLogRecPtr;
MembershipConfigurationFree(&sk->greetResponse.mconf);
if (sk->voteResponse.termHistory.entries)
pfree(sk->voteResponse.termHistory.entries);
sk->voteResponse.termHistory.entries = NULL;
@@ -1840,7 +1842,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
}
if (proto_version == 2 || proto_version == 3)
//TODO remove proto_version == 3 after converting all msgs
/* TODO remove proto_version == 3 after converting all msgs */
/* removeme tag check after converting all msgs */
{
switch (msg->tag)
@@ -1922,6 +1924,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
return false;
}
/* Deserialize membership configuration from buf to mconf. */
static void
MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
{
uint32 i;
mconf->generation = pq_getmsgint32(buf);
mconf->members.len = pq_getmsgint32(buf);
mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
for (i = 0; i < mconf->members.len; i++)
{
const char *buf_host;
mconf->members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
mconf->members.m[i].port = pq_getmsgint16(buf);
}
mconf->new_members.len = pq_getmsgint32(buf);
mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
for (i = 0; i < mconf->new_members.len; i++)
{
const char *buf_host;
mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
buf_host = pq_getmsgrawstring(buf);
strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
mconf->new_members.m[i].port = pq_getmsgint16(buf);
}
}
/*
* Read next message with known type into provided struct, by reading a CopyData
* block from the safekeeper's postgres connection, returning whether the read
@@ -1955,7 +1988,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
/* removeme tag check after converting all msgs */
if (wp->config->proto_version == 3 && anymsg->tag == 'g')
{
tag = pq_getmsgint(&s, 1);
tag = pq_getmsgbyte(&s);
if (tag != anymsg->tag)
{
wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
@@ -1963,6 +1996,24 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
ResetConnection(sk);
return false;
}
switch (tag)
{
case 'g':
{
AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
msg->nodeId = pq_getmsgint64(&s);
MembershipConfigurationDeserialize(&msg->mconf, &s);
msg->term = pq_getmsgint64(&s);
pq_getmsgend(&s);
return true;
}
default:
{
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
}
/* removeme tag check after converting all msgs */
else if (wp->config->proto_version == 2 || wp->config->proto_version == 3)
@@ -2027,7 +2078,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
default:
{
wp_log(FATAL, "unexpected message tag %c", (char) tag);
wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
return false;
}
}
@@ -2408,3 +2459,12 @@ FormatEvents(WalProposer *wp, uint32 events)
return (char *) &return_str;
}
static void
MembershipConfigurationFree(MembershipConfiguration *mconf)
{
if (mconf->members.m)
pfree(mconf->members.m);
if (mconf->new_members.m)
pfree(mconf->new_members.m);
}

View File

@@ -169,6 +169,9 @@ typedef struct MembershipConfiguration
MemberSet new_members;
} MembershipConfiguration;
/* Used only for debug printing */
#define MaxMembershipConfLen 8192
/*
* Proposer <-> Acceptor messaging.
*/
@@ -223,13 +226,15 @@ typedef struct AcceptorProposerMessage
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
* Acceptor -> Proposer initial response: the highest term acceptor voted for,
* its node id and configuration.
*/
typedef struct AcceptorGreeting
{
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
MembershipConfiguration mconf;
term_t term;
} AcceptorGreeting;
/*

View File

@@ -231,8 +231,9 @@ pub struct ProposerGreetingV2 {
/// (acceptor voted for).
#[derive(Debug, Serialize)]
pub struct AcceptorGreeting {
term: u64,
node_id: NodeId,
mconf: membership::Configuration,
term: u64,
}
/// Vote request sent from proposer to safekeepers
@@ -371,7 +372,7 @@ impl BytesF for Bytes {
impl ProposerAcceptorMessage {
/// Read cstring from Bytes.
fn read_cstr(buf: &mut Bytes) -> Result<String> {
fn get_cstr(buf: &mut Bytes) -> Result<String> {
let pos = buf
.iter()
.position(|x| *x == 0)
@@ -385,7 +386,7 @@ impl ProposerAcceptorMessage {
}
/// Read membership::Configuration from Bytes.
fn read_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
let generation = buf.get_u32_f().with_context(|| "reading generation")?;
let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
// Main member set must have at least someone in valid configuration.
@@ -398,8 +399,7 @@ impl ProposerAcceptorMessage {
let id = buf
.get_u64_f()
.with_context(|| format!("reading member {} node_id", i))?;
let host =
Self::read_cstr(buf).with_context(|| format!("reading member {} host", i))?;
let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
let pg_port = buf
.get_u16_f()
.with_context(|| format!("reading member {} port", i))?;
@@ -424,7 +424,7 @@ impl ProposerAcceptorMessage {
let id = buf
.get_u64_f()
.with_context(|| format!("reading new member {} node_id", i))?;
let host = Self::read_cstr(buf)
let host = Self::get_cstr(buf)
.with_context(|| format!("reading new member {} host", i))?;
let pg_port = buf
.get_u16_f()
@@ -458,12 +458,12 @@ impl ProposerAcceptorMessage {
match tag {
'g' => {
let tenant_id_str =
Self::read_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
let tenant_id = TenantId::from_str(&tenant_id_str)?;
let timeline_id_str =
Self::read_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
let timeline_id = TimelineId::from_str(&timeline_id_str)?;
let mconf = Self::read_mconf(&mut msg_bytes)?;
let mconf = Self::get_mconf(&mut msg_bytes)?;
let pg_version = msg_bytes
.get_u32_f()
.with_context(|| "reading pg_version")?;
@@ -624,6 +624,32 @@ pub enum AcceptorProposerMessage {
}
impl AcceptorProposerMessage {
fn put_cstr(buf: &mut BytesMut, s: &str) {
buf.put_slice(s.as_bytes());
buf.put_u8(0); // null terminator
}
/// Serialize membership::Configuration into buf.
fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
buf.put_u32(mconf.generation);
buf.put_u32(mconf.members.m.len() as u32);
for sk in &mconf.members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
if let Some(ref new_members) = mconf.new_members {
buf.put_u32(new_members.m.len() as u32);
for sk in &new_members.m {
buf.put_u64(sk.id.0);
Self::put_cstr(buf, &sk.host);
buf.put_u16(sk.pg_port);
}
} else {
buf.put_u32(0);
}
}
/// Serialize acceptor -> proposer message.
pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
// TODO remove after converting all msgs
@@ -631,13 +657,21 @@ impl AcceptorProposerMessage {
&& (matches!(self, AcceptorProposerMessage::Greeting(_)))
{
match self {
_ => bail!("not implemented"),
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u8('g' as u8);
buf.put_u64(msg.node_id.0);
Self::serialize_mconf(buf, &msg.mconf);
buf.put_u64(msg.term)
}
_ => bail!("not impl"),
}
Ok(())
// TODO remove 3 after converting all msgs
} else if proto_version == SK_PROTO_VERSION_2 || proto_version == SK_PROTO_VERSION_3 {
match self {
AcceptorProposerMessage::Greeting(msg) => {
buf.put_u64_le('g' as u64);
// v2 didn't have mconf and fields were reordered
buf.put_u64_le(msg.term);
buf.put_u64_le(msg.node_id.0);
}
@@ -824,14 +858,16 @@ where
self.state.finish_change(&state).await?;
}
info!(
"processed greeting {:?} from walproposer, sending term {:?}",
msg, self.state.acceptor_state.term
);
Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
term: self.state.acceptor_state.term,
let apg = AcceptorGreeting {
node_id: self.node_id,
})))
mconf: self.state.mconf.clone(),
term: self.state.acceptor_state.term,
};
info!(
"processed greeting {:?} from walproposer, sending {:?}",
msg, apg
);
Ok(Some(AcceptorProposerMessage::Greeting(apg)))
}
/// Give vote for the given term, if we haven't done that previously.