From 234c3a29df62ddbcb2aa4bdb7dda2b6dc819f5d1 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 20 Jan 2025 11:18:26 +0100 Subject: [PATCH] Pass proto selection to safekeeper, prepare parsing v3. --- pgxn/neon/walproposer.c | 9 ++- pgxn/neon/walproposer_pg.c | 8 +-- safekeeper/src/safekeeper.rs | 120 +++++++++++++++++++---------------- 3 files changed, 74 insertions(+), 63 deletions(-) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 7f41f5b8a0..8be24c233e 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -619,11 +619,14 @@ static void SendStartWALPush(Safekeeper *sk) { WalProposer *wp = sk->wp; +#define CMD_LEN 512 + char cmd[CMD_LEN]; - if (!wp->api.conn_send_query(sk, "START_WAL_PUSH")) + snprintf(cmd, CMD_LEN, "START_WAL_PUSH (proto_version '%d')", wp->config->proto_version); + if (!wp->api.conn_send_query(sk, cmd)) { - wp_log(WARNING, "failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", - sk->host, sk->port, wp->api.conn_error_message(sk)); + wp_log(WARNING, "failed to send %s query to safekeeper %s:%s: %s", + cmd, sk->host, sk->port, wp->api.conn_error_message(sk)); ShutdownConnection(sk); return; } diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index b566358360..b21184de57 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -63,7 +63,7 @@ char *wal_acceptors_list = ""; int wal_acceptor_reconnect_timeout = 1000; int wal_acceptor_connection_timeout = 10000; -int safekeeper_protocol_version = 2; +int safekeeper_proto_version = 2; /* Set to true in the walproposer bgw. */ static bool am_walproposer; @@ -128,7 +128,7 @@ init_walprop_config(bool syncSafekeepers) else walprop_config.systemId = 0; walprop_config.pgTimeline = walprop_pg_get_timeline_id(); - walprop_config.proto_version = safekeeper_protocol_version; + walprop_config.proto_version = safekeeper_proto_version; } /* @@ -224,10 +224,10 @@ nwp_register_gucs(void) NULL, NULL, NULL); DefineCustomIntVariable( - "neon.safekeeper_protocol_version", + "neon.safekeeper_proto_version", "Version of compute <-> safekeeper protocol.", "Used while migrating from 2 to 3.", - &safekeeper_protocol_version, + &safekeeper_proto_version, 2, 0, INT_MAX, PGC_POSTMASTER, 0, diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 45e19c31b6..63a19627df 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -317,70 +317,78 @@ pub enum ProposerAcceptorMessage { impl ProposerAcceptorMessage { /// Parse proposer message. - pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result { - if proto_version != SK_PROTOCOL_VERSION { - bail!( - "incompatible protocol version {}, expected {}", - proto_version, - SK_PROTOCOL_VERSION - ); - } - // xxx using Reader is inefficient but easy to work with bincode - let mut stream = msg_bytes.reader(); - // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is - let tag = stream.read_u64::()? as u8 as char; - match tag { - 'g' => { - let msg = ProposerGreeting::des_from(&mut stream)?; - Ok(ProposerAcceptorMessage::Greeting(msg)) + pub fn parse(mut msg_bytes: Bytes, proto_version: u32) -> Result { + if proto_version == 3 { + if msg_bytes.is_empty() { + bail!("ProposerAcceptorMessage is not complete: missing tag"); } - 'v' => { - let msg = VoteRequest::des_from(&mut stream)?; - Ok(ProposerAcceptorMessage::VoteRequest(msg)) + let tag = msg_bytes.get_u8() as char; + match tag { + 'g' => { + bail!("not implemented"); + } + _ => bail!("unknown proposer-acceptor message tag: {}", tag), } - 'e' => { - let mut msg_bytes = stream.into_inner(); - if msg_bytes.remaining() < 16 { - bail!("ProposerElected message is not complete"); + } else if proto_version == 2 { + // xxx using Reader is inefficient but easy to work with bincode + let mut stream = msg_bytes.reader(); + // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is + let tag = stream.read_u64::()? as u8 as char; + match tag { + 'g' => { + let msg = ProposerGreeting::des_from(&mut stream)?; + Ok(ProposerAcceptorMessage::Greeting(msg)) } - let term = msg_bytes.get_u64_le(); - let start_streaming_at = msg_bytes.get_u64_le().into(); - let term_history = TermHistory::from_bytes(&mut msg_bytes)?; - if msg_bytes.remaining() < 8 { - bail!("ProposerElected message is not complete"); + 'v' => { + let msg = VoteRequest::des_from(&mut stream)?; + Ok(ProposerAcceptorMessage::VoteRequest(msg)) } - let timeline_start_lsn = msg_bytes.get_u64_le().into(); - let msg = ProposerElected { - term, - start_streaming_at, - timeline_start_lsn, - term_history, - }; - Ok(ProposerAcceptorMessage::Elected(msg)) - } - 'a' => { - // read header followed by wal data - let hdr = AppendRequestHeader::des_from(&mut stream)?; - let rec_size = hdr - .end_lsn - .checked_sub(hdr.begin_lsn) - .context("begin_lsn > end_lsn in AppendRequest")? - .0 as usize; - if rec_size > MAX_SEND_SIZE { - bail!( - "AppendRequest is longer than MAX_SEND_SIZE ({})", - MAX_SEND_SIZE - ); + 'e' => { + let mut msg_bytes = stream.into_inner(); + if msg_bytes.remaining() < 16 { + bail!("ProposerElected message is not complete"); + } + let term = msg_bytes.get_u64_le(); + let start_streaming_at = msg_bytes.get_u64_le().into(); + let term_history = TermHistory::from_bytes(&mut msg_bytes)?; + if msg_bytes.remaining() < 8 { + bail!("ProposerElected message is not complete"); + } + let timeline_start_lsn = msg_bytes.get_u64_le().into(); + let msg = ProposerElected { + term, + start_streaming_at, + timeline_start_lsn, + term_history, + }; + Ok(ProposerAcceptorMessage::Elected(msg)) } + 'a' => { + // read header followed by wal data + let hdr = AppendRequestHeader::des_from(&mut stream)?; + let rec_size = hdr + .end_lsn + .checked_sub(hdr.begin_lsn) + .context("begin_lsn > end_lsn in AppendRequest")? + .0 as usize; + if rec_size > MAX_SEND_SIZE { + bail!( + "AppendRequest is longer than MAX_SEND_SIZE ({})", + MAX_SEND_SIZE + ); + } - let mut wal_data_vec: Vec = vec![0; rec_size]; - stream.read_exact(&mut wal_data_vec)?; - let wal_data = Bytes::from(wal_data_vec); - let msg = AppendRequest { h: hdr, wal_data }; + let mut wal_data_vec: Vec = vec![0; rec_size]; + stream.read_exact(&mut wal_data_vec)?; + let wal_data = Bytes::from(wal_data_vec); + let msg = AppendRequest { h: hdr, wal_data }; - Ok(ProposerAcceptorMessage::AppendRequest(msg)) + Ok(ProposerAcceptorMessage::AppendRequest(msg)) + } + _ => bail!("unknown proposer-acceptor message tag: {}", tag), } - _ => bail!("unknown proposer-acceptor message tag: {}", tag), + } else { + bail!("unsupported protocol version {}", proto_version); } }