diff --git a/pgxn/neon/neon_utils.c b/pgxn/neon/neon_utils.c
index 1fb4ed9522..1fad44bd58 100644
--- a/pgxn/neon/neon_utils.c
+++ b/pgxn/neon/neon_utils.c
@@ -51,6 +51,26 @@ HexDecodeString(uint8 *result, char *input, int nbytes)
 	return true;
 }
 
+/* --------------------------------
+ *		pq_getmsgint16	- get a binary 2-byte int from a message buffer
+ * --------------------------------
+ */
+uint16
+pq_getmsgint16(StringInfo msg)
+{
+	return pq_getmsgint(msg, 2);
+}
+
+/* --------------------------------
+ *		pq_getmsgint32	- get a binary 4-byte int from a message buffer
+ * --------------------------------
+ */
+uint32
+pq_getmsgint32(StringInfo msg)
+{
+	return pq_getmsgint(msg, 4);
+}
+
 /* --------------------------------
  *		pq_getmsgint32_le	- get a binary 4-byte int from a message buffer in native (LE) order
  * --------------------------------
diff --git a/pgxn/neon/neon_utils.h b/pgxn/neon/neon_utils.h
index 89683714f1..7480ac28cc 100644
--- a/pgxn/neon/neon_utils.h
+++ b/pgxn/neon/neon_utils.h
@@ -8,6 +8,8 @@
 #endif
 
 bool		HexDecodeString(uint8 *result, char *input, int nbytes);
+uint16		pq_getmsgint16(StringInfo msg);
+uint32		pq_getmsgint32(StringInfo msg);
 uint32		pq_getmsgint32_le(StringInfo msg);
 uint64		pq_getmsgint64_le(StringInfo msg);
 void		pq_sendint32_le(StringInfo buf, uint32 i);
diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index f49fa2c3f6..67216fdd30 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -82,6 +82,7 @@ static char *FormatSafekeeperState(Safekeeper *sk);
 static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
 static char *FormatEvents(WalProposer *wp, uint32 events);
 static void UpdateDonorShmem(WalProposer *wp);
+static void MembershipConfigurationFree(MembershipConfiguration *mconf);
 
 WalProposer *
 WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -328,6 +329,7 @@ ShutdownConnection(Safekeeper *sk)
 	sk->state = SS_OFFLINE;
 	sk->streamingAt = InvalidXLogRecPtr;
 
+	MembershipConfigurationFree(&sk->greetResponse.mconf);
 	if (sk->voteResponse.termHistory.entries)
 		pfree(sk->voteResponse.termHistory.entries);
 	sk->voteResponse.termHistory.entries = NULL;
@@ -1840,7 +1842,7 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf
 	}
 
 	if (proto_version == 2 || proto_version == 3)
-		//TODO remove proto_version == 3 after converting all msgs
+		/* TODO remove proto_version == 3 after converting all msgs */
 		/* removeme tag check after converting all msgs */
 	{
 		switch (msg->tag)
@@ -1922,6 +1924,37 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size)
 	return false;
 }
 
+/* Deserialize membership configuration from buf to mconf. */
+static void
+MembershipConfigurationDeserialize(MembershipConfiguration *mconf, StringInfo buf)
+{
+	uint32		i;
+
+	mconf->generation = pq_getmsgint32(buf);
+	mconf->members.len = pq_getmsgint32(buf);
+	mconf->members.m = palloc0(sizeof(SafekeeperId) * mconf->members.len);
+	for (i = 0; i < mconf->members.len; i++)
+	{
+		const char *buf_host;
+
+		mconf->members.m[i].node_id = pq_getmsgint64(buf);
+		buf_host = pq_getmsgrawstring(buf);
+		strlcpy(mconf->members.m[i].host, buf_host, sizeof(mconf->members.m[i].host));
+		mconf->members.m[i].port = pq_getmsgint16(buf);
+	}
+	mconf->new_members.len = pq_getmsgint32(buf);
+	mconf->new_members.m = palloc0(sizeof(SafekeeperId) * mconf->new_members.len);
+	for (i = 0; i < mconf->new_members.len; i++)
+	{
+		const char *buf_host;
+
+		mconf->new_members.m[i].node_id = pq_getmsgint64(buf);
+		buf_host = pq_getmsgrawstring(buf);
+		strlcpy(mconf->new_members.m[i].host, buf_host, sizeof(mconf->new_members.m[i].host));
+		mconf->new_members.m[i].port = pq_getmsgint16(buf);
+	}
+}
+
 /*
  * Read next message with known type into provided struct, by reading a CopyData
  * block from the safekeeper's postgres connection, returning whether the read
@@ -1955,7 +1988,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
 	/* removeme tag check after converting all msgs */
 	if (wp->config->proto_version == 3 && anymsg->tag == 'g')
 	{
-		tag = pq_getmsgint(&s, 1);
+		tag = pq_getmsgbyte(&s);
 		if (tag != anymsg->tag)
 		{
 			wp_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host,
@@ -1963,6 +1996,24 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
 			ResetConnection(sk);
 			return false;
 		}
+		switch (tag)
+		{
+			case 'g':
+				{
+					AcceptorGreeting *msg = (AcceptorGreeting *) anymsg;
+
+					msg->nodeId = pq_getmsgint64(&s);
+					MembershipConfigurationDeserialize(&msg->mconf, &s);
+					msg->term = pq_getmsgint64(&s);
+					pq_getmsgend(&s);
+					return true;
+				}
+			default:
+				{
+					wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
+					return false;
+				}
+		}
 	}
 	/* removeme tag check after converting all msgs */
 	else if (wp->config->proto_version == 2 || wp->config->proto_version == 3)
@@ -2027,7 +2078,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
 
 			default:
 				{
-					wp_log(FATAL, "unexpected message tag %c", (char) tag);
+					wp_log(FATAL, "unexpected message tag %c to read", (char) tag);
 					return false;
 				}
 		}
@@ -2408,3 +2459,19 @@ FormatEvents(WalProposer *wp, uint32 events)
 
 	return (char *) &return_str;
 }
+
+/*
+ * Release the member arrays owned by mconf and reset the pointers to NULL,
+ * so that a later call on the same struct finds nothing to free instead of
+ * freeing the same arrays twice.
+ */
+static void
+MembershipConfigurationFree(MembershipConfiguration *mconf)
+{
+	if (mconf->members.m)
+		pfree(mconf->members.m);
+	mconf->members.m = NULL;
+	if (mconf->new_members.m)
+		pfree(mconf->new_members.m);
+	mconf->new_members.m = NULL;
+}
diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h
index 8a9d99b8c0..9046abc006 100644
--- a/pgxn/neon/walproposer.h
+++ b/pgxn/neon/walproposer.h
@@ -169,6 +169,9 @@ typedef struct MembershipConfiguration
 	MemberSet	new_members;
 } MembershipConfiguration;
 
+/* Used only for debug printing */
+#define MaxMembershipConfLen 8192
+
 /*
  * Proposer <-> Acceptor messaging.
  */
@@ -223,13 +226,15 @@ typedef struct AcceptorProposerMessage
 } AcceptorProposerMessage;
 
 /*
- * Acceptor -> Proposer initial response: the highest term acceptor voted for.
+ * Acceptor -> Proposer initial response: the highest term acceptor voted for,
+ * its node id and configuration.
  */
 typedef struct AcceptorGreeting
 {
 	AcceptorProposerMessage apm;
-	term_t		term;
 	NNodeId		nodeId;
+	MembershipConfiguration mconf;
+	term_t		term;
 } AcceptorGreeting;
 
 /*
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index db674d09c8..8b829a8389 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -231,8 +231,9 @@ pub struct ProposerGreetingV2 {
 /// (acceptor voted for).
 #[derive(Debug, Serialize)]
 pub struct AcceptorGreeting {
-    term: u64,
     node_id: NodeId,
+    mconf: membership::Configuration,
+    term: u64,
 }
 
 /// Vote request sent from proposer to safekeepers
@@ -371,7 +372,7 @@ impl BytesF for Bytes {
 
 impl ProposerAcceptorMessage {
     /// Read cstring from Bytes.
-    fn read_cstr(buf: &mut Bytes) -> Result<String> {
+    fn get_cstr(buf: &mut Bytes) -> Result<String> {
         let pos = buf
             .iter()
             .position(|x| *x == 0)
@@ -385,7 +386,7 @@ impl ProposerAcceptorMessage {
     }
 
     /// Read membership::Configuration from Bytes.
-    fn read_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
+    fn get_mconf(buf: &mut Bytes) -> Result<membership::Configuration> {
         let generation = buf.get_u32_f().with_context(|| "reading generation")?;
         let members_len = buf.get_u32_f().with_context(|| "reading members_len")?;
         // Main member set must have at least someone in valid configuration.
@@ -398,8 +399,7 @@ impl ProposerAcceptorMessage {
             let id = buf
                 .get_u64_f()
                 .with_context(|| format!("reading member {} node_id", i))?;
-            let host =
-                Self::read_cstr(buf).with_context(|| format!("reading member {} host", i))?;
+            let host = Self::get_cstr(buf).with_context(|| format!("reading member {} host", i))?;
             let pg_port = buf
                 .get_u16_f()
                 .with_context(|| format!("reading member {} port", i))?;
@@ -424,7 +424,7 @@ impl ProposerAcceptorMessage {
                 let id = buf
                     .get_u64_f()
                     .with_context(|| format!("reading new member {} node_id", i))?;
-                let host = Self::read_cstr(buf)
+                let host = Self::get_cstr(buf)
                     .with_context(|| format!("reading new member {} host", i))?;
                 let pg_port = buf
                     .get_u16_f()
@@ -458,12 +458,12 @@ impl ProposerAcceptorMessage {
             match tag {
                 'g' => {
                     let tenant_id_str =
-                        Self::read_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
+                        Self::get_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?;
                     let tenant_id = TenantId::from_str(&tenant_id_str)?;
                     let timeline_id_str =
-                        Self::read_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
+                        Self::get_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?;
                     let timeline_id = TimelineId::from_str(&timeline_id_str)?;
-                    let mconf = Self::read_mconf(&mut msg_bytes)?;
+                    let mconf = Self::get_mconf(&mut msg_bytes)?;
                     let pg_version = msg_bytes
                         .get_u32_f()
                         .with_context(|| "reading pg_version")?;
@@ -624,6 +624,32 @@ pub enum AcceptorProposerMessage {
 }
 
 impl AcceptorProposerMessage {
+    fn put_cstr(buf: &mut BytesMut, s: &str) {
+        buf.put_slice(s.as_bytes());
+        buf.put_u8(0); // null terminator
+    }
+
+    /// Serialize membership::Configuration into buf.
+    fn serialize_mconf(buf: &mut BytesMut, mconf: &membership::Configuration) {
+        buf.put_u32(mconf.generation);
+        buf.put_u32(mconf.members.m.len() as u32);
+        for sk in &mconf.members.m {
+            buf.put_u64(sk.id.0);
+            Self::put_cstr(buf, &sk.host);
+            buf.put_u16(sk.pg_port);
+        }
+        if let Some(ref new_members) = mconf.new_members {
+            buf.put_u32(new_members.m.len() as u32);
+            for sk in &new_members.m {
+                buf.put_u64(sk.id.0);
+                Self::put_cstr(buf, &sk.host);
+                buf.put_u16(sk.pg_port);
+            }
+        } else {
+            buf.put_u32(0);
+        }
+    }
+
     /// Serialize acceptor -> proposer message.
     pub fn serialize(&self, buf: &mut BytesMut, proto_version: u32) -> Result<()> {
         // TODO remove after converting all msgs
@@ -631,13 +657,21 @@ impl AcceptorProposerMessage {
             && (matches!(self, AcceptorProposerMessage::Greeting(_)))
         {
             match self {
-                _ => bail!("not implemented"),
+                AcceptorProposerMessage::Greeting(msg) => {
+                    buf.put_u8(b'g');
+                    buf.put_u64(msg.node_id.0);
+                    Self::serialize_mconf(buf, &msg.mconf);
+                    buf.put_u64(msg.term);
+                }
+                _ => bail!("not impl"),
             }
+            Ok(())
         // TODO remove 3 after converting all msgs
         } else if proto_version == SK_PROTO_VERSION_2 || proto_version == SK_PROTO_VERSION_3 {
             match self {
                 AcceptorProposerMessage::Greeting(msg) => {
                     buf.put_u64_le('g' as u64);
+                    // v2 didn't have mconf and fields were reordered
                     buf.put_u64_le(msg.term);
                     buf.put_u64_le(msg.node_id.0);
                 }
@@ -824,14 +858,16 @@ where
             self.state.finish_change(&state).await?;
         }
 
-        info!(
-            "processed greeting {:?} from walproposer, sending term {:?}",
-            msg, self.state.acceptor_state.term
-        );
-        Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting {
-            term: self.state.acceptor_state.term,
+        let apg = AcceptorGreeting {
             node_id: self.node_id,
-        })))
+            mconf: self.state.mconf.clone(),
+            term: self.state.acceptor_state.term,
+        };
+        info!(
+            "processed greeting {:?} from walproposer, sending {:?}",
+            msg, apg
+        );
+        Ok(Some(AcceptorProposerMessage::Greeting(apg)))
     }
 
     /// Give vote for the given term, if we haven't done that previously.