From c19a8b69f2e7f25489279d4dbed36ca780f74c50 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 22 Jan 2025 11:57:27 +0100 Subject: [PATCH] convert ProposerGreeting --- pgxn/neon/walproposer.c | 125 ++++++++++++--------- pgxn/neon/walproposer.h | 6 +- safekeeper/src/receive_wal.rs | 4 +- safekeeper/src/safekeeper.rs | 202 +++++++++++++++++++++++++++++----- 4 files changed, 251 insertions(+), 86 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 8be24c233e..9bfdfea6dc 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -144,13 +144,12 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) /* Fill the greeting package */ wp->greetRequest.pam.tag = 'g'; - wp->greetRequest.proto_version = wp->config->proto_version; if (!wp->config->neon_tenant) wp_log(FATAL, "neon.tenant_id is not provided"); wp->greetRequest.tenant_id = wp->config->neon_tenant; if (!wp->config->neon_timeline) wp_log(FATAL, "neon.timeline_id is not provided"); - wp->greetRequest.timeline_id = wp->config->neon_tenant; + wp->greetRequest.timeline_id = wp->config->neon_timeline; wp->greetRequest.pg_version = PG_VERSION_NUM; wp->greetRequest.system_id = wp->config->systemId; wp->greetRequest.wal_seg_size = wp->config->wal_segment_size; @@ -1771,6 +1770,35 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *fromsk) } } +/* Serialize MembershipConfiguration into buf. */ +static void +MembershipConfigurationSerialize(MembershipConfiguration *mconf, StringInfo buf) +{ + int i; + + pq_sendint32(buf, mconf->generation); + + pq_sendint32(buf, mconf->members.len); + for (i = 0; i < mconf->members.len; i++) + { + pq_sendint64(buf, mconf->members.m[i].node_id); + pq_send_ascii_string(buf, mconf->members.m[i].host); + pq_sendint16(buf, mconf->members.m[i].port); + } + + /* + * There is no special mark for absent new_members; zero members in + * invalid, so zero len means absent. + */ + pq_sendint32(buf, mconf->new_members.len); + for (i = 0; i < mconf->new_members.len; i++) + { + pq_sendint64(buf, mconf->new_members.m[i].node_id); + pq_send_ascii_string(buf, mconf->new_members.m[i].host); + pq_sendint16(buf, mconf->new_members.m[i].port); + } +} + /* Serialize proposer -> acceptor message into buf using specified version */ static void PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf, int proto_version) @@ -1780,58 +1808,54 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf resetStringInfo(buf); - /* - * v2 sends structs for some messages as is, so commonly send tag only for - * v3 - */ - if (proto_version == 3) - pq_sendint8(buf, msg->tag); - - switch (msg->tag) + if (proto_version == 3 && msg->tag == 'g') + //removeme tag check after converting all msgs { - case 'g': - { - if (proto_version == 3) + /* + * v2 sends structs for some messages as is, so commonly send tag only + * for v3 + */ + if (proto_version == 3) + pq_sendint8(buf, msg->tag); + + switch (msg->tag) + { + case 'g': { ProposerGreeting *m = (ProposerGreeting *) msg; + pq_send_ascii_string(buf, m->tenant_id); + pq_send_ascii_string(buf, m->timeline_id); + MembershipConfigurationSerialize(&m->mconf, buf); + pq_sendint32(buf, m->pg_version); + pq_sendint64(buf, m->system_id); + pq_sendint32(buf, m->wal_seg_size); break; } - else if (proto_version == 2) + default: + wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); + } + return; + } + + if (proto_version == 2 || proto_version == 3) // TODO remove proto_version == 3 after converting all msgs + //removeme tag check after converting all msgs + { + switch (msg->tag) + { + case 'g': { /* v2 sent struct as is */ pq_sendbytes(buf, (char *) &wp->greetRequestV2, sizeof(wp->greetRequestV2)); break; } - wp_log(FATAL, "unexpected proto_version %d", proto_version); - break; /* keep the compiler quiet */ - } - case 'v': - { - if (proto_version == 3) - { - VoteRequest *m = (VoteRequest *) msg; - - Assert(false); - break; - } - else if (proto_version == 2) + case 'v': { /* v2 sent struct as is */ pq_sendbytes(buf, (char *) &wp->voteRequest, sizeof(wp->voteRequest)); break; } - wp_log(FATAL, "unexpected proto_version %d", proto_version); - break; /* keep the compiler quiet */ - } - case 'e': - { - if (proto_version == 3) - { - Assert(false); - break; - } - else if (proto_version == 2) + case 'e': { ProposerElected *m = (ProposerElected *) msg; @@ -1847,33 +1871,24 @@ PAMessageSerialize(WalProposer *wp, ProposerAcceptorMessage *msg, StringInfo buf pq_sendint64_le(buf, m->timelineStartLsn); break; } - wp_log(FATAL, "unexpected proto_version %d", proto_version); - break; /* keep the compiler quiet */ - } - case 'a': - { + case 'a': + /* * Note: this serializes only AppendRequestHeader, caller is * expected to append WAL data later. */ - if (proto_version == 3) - { - Assert(false); - break; - } - else if (proto_version == 2) { /* v2 sent struct as is */ pq_sendbytes(buf, (char *) msg, sizeof(AppendRequestHeader)); break; } - wp_log(FATAL, "unexpected proto_version %d", proto_version); - break; /* keep the compiler quiet */ - } - default: - wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); + default: + wp_log(FATAL, "unexpected message type %c to serialize", msg->tag); + } + return; } + wp_log(FATAL, "unexpected proto_version %d", proto_version); } /* @@ -1913,6 +1928,8 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) * If the read needs more polling, we return 'false' and keep the state * unmodified, waiting until it becomes read-ready to try again. If it fully * failed, a warning is emitted and the connection is reset. + * + * Note: it pallocs if needed, i.e. for AcceptorGreeting and VoteResponse fields. */ static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index c7c9c0b28d..8a4897cbf7 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -156,8 +156,8 @@ typedef struct SafekeeperId /* Set of safekeepers. */ typedef struct MemberSet { - uint32 n_members; - SafekeeperId *members; + uint32 len; /* number of members */ + SafekeeperId *m; /* ids themselves */ } MemberSet; /* Timeline safekeeper membership configuration. */ @@ -182,7 +182,6 @@ typedef struct ProposerAcceptorMessage typedef struct ProposerGreeting { ProposerAcceptorMessage pam; /* message tag */ - uint32 proto_version; /* proposer-safekeeper protocol version */ /* * tenant/timeline ids as C strings with standard hex notation for ease of @@ -191,6 +190,7 @@ typedef struct ProposerGreeting */ char *tenant_id; char *timeline_id; + /* Full conf is carried to allow safekeeper switch */ MembershipConfiguration mconf; /* diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index cb42f6f414..c9a92e8ca0 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -342,8 +342,8 @@ impl NetworkReader<'_, IO> { let tli = match next_msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( - "start handshake with walproposer {} sysid {} timeline {}", - self.peer_addr, greeting.system_id, greeting.tli, + "start handshake with walproposer {} sysid {}", + self.peer_addr, greeting.system_id, ); let server_info = ServerInfo { pg_version: greeting.pg_version, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 63a19627df..7022aa309a 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -5,6 +5,10 @@ use byteorder::{LittleEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_ffi::{TimeLineID, MAX_SEND_SIZE}; +use safekeeper_api::membership; +use safekeeper_api::membership::MemberSet; +use safekeeper_api::membership::SafekeeperId; +use safekeeper_api::membership::INVALID_GENERATION; use safekeeper_api::models::HotStandbyFeedback; use safekeeper_api::Term; use serde::{Deserialize, Serialize}; @@ -12,6 +16,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; +use std::str::FromStr; use storage_broker::proto::SafekeeperTimelineInfo; use tracing::*; @@ -197,6 +202,18 @@ impl AcceptorState { /// Initial Proposer -> Acceptor message #[derive(Debug, Deserialize)] pub struct ProposerGreeting { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub mconf: membership::Configuration, + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + pub wal_seg_size: u32, +} + +/// V2 of the message; exists as a struct because we (de)serialized it as is. +#[derive(Debug, Deserialize)] +pub struct ProposerGreetingV2 { /// proposer-acceptor protocol version pub protocol_version: u32, /// Postgres server version @@ -315,29 +332,178 @@ pub enum ProposerAcceptorMessage { FlushWAL, } +/// Augment Bytes with fallible get_uN where N is number of bytes methods. +/// All reads are in network (big endian) order. +trait BytesF { + fn get_u8_f(&mut self) -> Result; + fn get_u16_f(&mut self) -> Result; + fn get_u32_f(&mut self) -> Result; + fn get_u64_f(&mut self) -> Result; +} + +impl BytesF for Bytes { + fn get_u8_f(&mut self) -> Result { + if self.is_empty() { + bail!("no bytes left, expected 1"); + } + Ok(self.get_u8()) + } + fn get_u16_f(&mut self) -> Result { + if self.is_empty() { + bail!("no bytes left, expected 2"); + } + Ok(self.get_u16()) + } + fn get_u32_f(&mut self) -> Result { + if self.remaining() < 4 { + bail!("only {} bytes left, expected 4", self.remaining()); + } + Ok(self.get_u32()) + } + fn get_u64_f(&mut self) -> Result { + if self.remaining() < 8 { + bail!("only {} bytes left, expected 8", self.remaining()); + } + Ok(self.get_u64()) + } +} + impl ProposerAcceptorMessage { + /// Read cstring from Bytes. + fn read_cstr(buf: &mut Bytes) -> Result { + let pos = buf + .iter() + .position(|x| *x == 0) + .ok_or_else(|| anyhow::anyhow!("missing cstring terminator"))?; + let result = buf.split_to(pos); + buf.advance(1); // drop the null terminator + match std::str::from_utf8(&result) { + Ok(s) => Ok(s.to_string()), + Err(e) => bail!("invalid utf8 in cstring: {}", e), + } + } + + /// Read membership::Configuration from Bytes. + fn read_mconf(buf: &mut Bytes) -> Result { + let generation = buf.get_u32_f().with_context(|| "reading generation")?; + let members_len = buf.get_u32_f().with_context(|| "reading members_len")?; + // Main member set must have at least someone in valid configuration. + // Empty conf is allowed until we fully migrate. + if generation != INVALID_GENERATION && members_len == 0 { + bail!("empty members_len"); + } + let mut members = MemberSet::empty(); + for i in 0..members_len { + let id = buf + .get_u64_f() + .with_context(|| format!("reading member {} node_id", i))?; + let host = + Self::read_cstr(buf).with_context(|| format!("reading member {} host", i))?; + let pg_port = buf + .get_u16_f() + .with_context(|| format!("reading member {} port", i))?; + let sk = SafekeeperId { + id: NodeId(id), + host, + pg_port, + }; + members.add(sk)?; + } + let new_members_len = buf.get_u32_f().with_context(|| "reading new_members_len")?; + // Non joint conf. + if new_members_len == 0 { + Ok(membership::Configuration { + generation, + members, + new_members: None, + }) + } else { + let mut new_members = MemberSet::empty(); + for i in 0..new_members_len { + let id = buf + .get_u64_f() + .with_context(|| format!("reading new member {} node_id", i))?; + let host = Self::read_cstr(buf) + .with_context(|| format!("reading new member {} host", i))?; + let pg_port = buf + .get_u16_f() + .with_context(|| format!("reading new member {} port", i))?; + let sk = SafekeeperId { + id: NodeId(id), + host, + pg_port, + }; + new_members.add(sk)?; + } + Ok(membership::Configuration { + generation, + members, + new_members: Some(new_members), + }) + } + } + /// Parse proposer message. pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result { - if proto_version == 3 { + // TODO remove after converting all msgs + let t = msg_bytes[0] as char; + if proto_version == 3 && t == 'g' { if msg_bytes.is_empty() { bail!("ProposerAcceptorMessage is not complete: missing tag"); } - let tag = msg_bytes.get_u8() as char; + let tag = msg_bytes.get_u8_f().with_context(|| { + "ProposerAcceptorMessage is not complete: missing tag".to_string() + })? as char; match tag { 'g' => { - bail!("not implemented"); + let tenant_id_str = + Self::read_cstr(&mut msg_bytes).with_context(|| "reading tenant_id")?; + let tenant_id = TenantId::from_str(&tenant_id_str)?; + let timeline_id_str = + Self::read_cstr(&mut msg_bytes).with_context(|| "reading timeline_id")?; + let timeline_id = TimelineId::from_str(&timeline_id_str)?; + let mconf = Self::read_mconf(&mut msg_bytes)?; + let pg_version = msg_bytes + .get_u32_f() + .with_context(|| "reading pg_version")?; + let system_id = msg_bytes.get_u64_f().with_context(|| "reading system_id")?; + let wal_seg_size = msg_bytes + .get_u32_f() + .with_context(|| "reading wal_seg_size")?; + let g = ProposerGreeting { + tenant_id, + timeline_id, + mconf, + pg_version, + system_id, + wal_seg_size, + }; + Ok(ProposerAcceptorMessage::Greeting(g)) } _ => bail!("unknown proposer-acceptor message tag: {}", tag), } - } else if proto_version == 2 { + // TODO remove proto_version == 3 after converting all msgs + } else if proto_version == 2 || proto_version == 3 { // xxx using Reader is inefficient but easy to work with bincode let mut stream = msg_bytes.reader(); // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is let tag = stream.read_u64::()? as u8 as char; match tag { 'g' => { - let msg = ProposerGreeting::des_from(&mut stream)?; - Ok(ProposerAcceptorMessage::Greeting(msg)) + let msgv2 = ProposerGreetingV2::des_from(&mut stream)?; + let g = ProposerGreeting { + tenant_id: msgv2.tenant_id, + timeline_id: msgv2.timeline_id, + mconf: membership::Configuration { + generation: INVALID_GENERATION, + members: MemberSet::empty(), + new_members: None, + }, + pg_version: msgv2.pg_version, + system_id: msgv2.system_id, + wal_seg_size: msgv2.wal_seg_size, + }; + Ok(ProposerAcceptorMessage::Greeting(g)) } 'v' => { let msg = VoteRequest::des_from(&mut stream)?; @@ -402,16 +568,7 @@ impl ProposerAcceptorMessage { // We explicitly list all fields, to draw attention here when new fields are added. let mut size = BASE_SIZE; size += match self { - Self::Greeting(ProposerGreeting { - protocol_version: _, - pg_version: _, - proposer_id: _, - system_id: _, - timeline_id: _, - tenant_id: _, - tli: _, - wal_seg_size: _, - }) => 0, + Self::Greeting(_) => 0, Self::VoteRequest(VoteRequest { term: _ }) => 0, @@ -601,14 +758,6 @@ where &mut self, msg: &ProposerGreeting, ) -> Result> { - // Check protocol compatibility - if msg.protocol_version != SK_PROTOCOL_VERSION { - bail!( - "incompatible protocol version {}, expected {}", - msg.protocol_version, - SK_PROTOCOL_VERSION - ); - } /* Postgres major version mismatch is treated as fatal error * because safekeepers parse WAL headers and the format * may change between versions. @@ -664,9 +813,8 @@ where } info!( - "processed greeting from walproposer {}, sending term {:?}", - msg.proposer_id.map(|b| format!("{:X}", b)).join(""), - self.state.acceptor_state.term + "processed greeting {:?} from walproposer, sending term {:?}", + msg, self.state.acceptor_state.term ); Ok(Some(AcceptorProposerMessage::Greeting(AcceptorGreeting { term: self.state.acceptor_state.term,