From 6fe4c6798f76aaf103f1f44166970f4677ff6c9d Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 16 Jan 2025 11:01:19 +0300 Subject: [PATCH] Add START_WAL_PUSH proto_version and allow_timeline_creation options. (#10406) ## Problem As part of https://github.com/neondatabase/neon/issues/8614 we need to pass options to START_WAL_PUSH. ## Summary of changes Add two options. `allow_timeline_creation`, default true, disables implicit timeline creation in the connection from compute. Eventually such creation will be forbidden completely, but as we migrate to configurations we need to support both: current mode and configurations enabled where creation by compute is disabled. `proto_version` specifies compute <-> sk protocol version. We have it currently in the first greeting package also, but I plan to change tag size from u64 to u8, which would make it hard to use. Command is more appropriate place for it anyway. --- safekeeper/src/handler.rs | 107 ++++++++++++++++-- safekeeper/src/receive_wal.rs | 57 +++++++--- safekeeper/src/safekeeper.rs | 11 +- .../tests/walproposer_sim/safekeeper.rs | 6 +- 4 files changed, 152 insertions(+), 29 deletions(-) diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index bb639bfb32..e77eeb4130 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -52,16 +52,70 @@ pub struct SafekeeperPostgresHandler { /// Parsed Postgres command. enum SafekeeperPostgresCommand { - StartWalPush, - StartReplication { start_lsn: Lsn, term: Option }, + StartWalPush { + proto_version: u32, + // Eventually timelines will be always created explicitly by storcon. + // This option allows legacy behaviour for compute to do that until we + // fully migrate. + allow_timeline_creation: bool, + }, + StartReplication { + start_lsn: Lsn, + term: Option, + }, IdentifySystem, TimelineStatus, - JSONCtrl { cmd: AppendLogicalMessage }, + JSONCtrl { + cmd: AppendLogicalMessage, + }, } fn parse_cmd(cmd: &str) -> anyhow::Result { if cmd.starts_with("START_WAL_PUSH") { - Ok(SafekeeperPostgresCommand::StartWalPush) + // Allow additional options in postgres START_REPLICATION style like + // START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false'). + // Parsing here is very naive and breaks in case of commas or + // whitespaces in values, but enough for our purposes. + let re = Regex::new(r"START_WAL_PUSH(\s+?\((.*)\))?").unwrap(); + let caps = re + .captures(cmd) + .context(format!("failed to parse START_WAL_PUSH command {}", cmd))?; + // capture () content + let options = caps.get(2).map(|m| m.as_str()).unwrap_or(""); + // default values + let mut proto_version = 2; + let mut allow_timeline_creation = true; + for kvstr in options.split(",") { + if kvstr.is_empty() { + continue; + } + let mut kvit = kvstr.split_whitespace(); + let key = kvit.next().context(format!( + "failed to parse key in kv {} in command {}", + kvstr, cmd + ))?; + let value = kvit.next().context(format!( + "failed to parse value in kv {} in command {}", + kvstr, cmd + ))?; + let value_trimmed = value.trim_matches('\''); + if key == "proto_version" { + proto_version = value_trimmed.parse::().context(format!( + "failed to parse proto_version value {} in command {}", + value, cmd + ))?; + } + if key == "allow_timeline_creation" { + allow_timeline_creation = value_trimmed.parse::().context(format!( + "failed to parse allow_timeline_creation value {} in command {}", + value, cmd + ))?; + } + } + Ok(SafekeeperPostgresCommand::StartWalPush { + proto_version, + allow_timeline_creation, + }) } else if cmd.starts_with("START_REPLICATION") { let re = Regex::new( // We follow postgres START_REPLICATION LOGICAL options to pass term. @@ -95,7 +149,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str { match cmd { - SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH", + SafekeeperPostgresCommand::StartWalPush { .. } => "START_WAL_PUSH", SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION", SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS", SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM", @@ -293,8 +347,11 @@ impl postgres_backend::Handler self.ttid = TenantTimelineId::new(tenant_id, timeline_id); match cmd { - SafekeeperPostgresCommand::StartWalPush => { - self.handle_start_wal_push(pgb) + SafekeeperPostgresCommand::StartWalPush { + proto_version, + allow_timeline_creation, + } => { + self.handle_start_wal_push(pgb, proto_version, allow_timeline_creation) .instrument(info_span!("WAL receiver")) .await } @@ -467,3 +524,39 @@ impl SafekeeperPostgresHandler { } } } + +#[cfg(test)] +mod tests { + use super::SafekeeperPostgresCommand; + + /// Test parsing of START_WAL_PUSH command + #[test] + fn test_start_wal_push_parse() { + let cmd = "START_WAL_PUSH"; + let parsed = super::parse_cmd(cmd).expect("failed to parse"); + match parsed { + SafekeeperPostgresCommand::StartWalPush { + proto_version, + allow_timeline_creation, + } => { + assert_eq!(proto_version, 2); + assert!(allow_timeline_creation); + } + _ => panic!("unexpected command"), + } + + let cmd = + "START_WAL_PUSH (proto_version '3', allow_timeline_creation 'false', unknown 'hoho')"; + let parsed = super::parse_cmd(cmd).expect("failed to parse"); + match parsed { + SafekeeperPostgresCommand::StartWalPush { + proto_version, + allow_timeline_creation, + } => { + assert_eq!(proto_version, 3); + assert!(!allow_timeline_creation); + } + _ => panic!("unexpected command"), + } + } +} diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index daaa8a253d..cb42f6f414 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -200,9 +200,14 @@ impl SafekeeperPostgresHandler { pub async fn handle_start_wal_push( &mut self, pgb: &mut PostgresBackend, + proto_version: u32, + allow_timeline_creation: bool, ) -> Result<(), QueryError> { let mut tli: Option = None; - if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await { + if let Err(end) = self + .handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation) + .await + { // Log the result and probably send it to the client, closing the stream. let handle_end_fut = pgb.handle_copy_stream_end(end); // If we managed to create the timeline, augment logging with current LSNs etc. @@ -222,6 +227,8 @@ impl SafekeeperPostgresHandler { &mut self, pgb: &mut PostgresBackend, tli: &mut Option, + proto_version: u32, + allow_timeline_creation: bool, ) -> Result<(), CopyStreamHandlerEnd> { // The `tli` parameter is only used for passing _out_ a timeline, one should // not have been passed in. @@ -250,12 +257,17 @@ impl SafekeeperPostgresHandler { conn_id: self.conn_id, pgb_reader: &mut pgb_reader, peer_addr, + proto_version, acceptor_handle: &mut acceptor_handle, global_timelines: self.global_timelines.clone(), }; - // Read first message and create timeline if needed. - let res = network_reader.read_first_message().await; + // Read first message and create timeline if needed and allowed. This + // won't be when timelines will be always created by storcon and + // allow_timeline_creation becomes false. + let res = network_reader + .read_first_message(allow_timeline_creation) + .await; let network_res = if let Ok((timeline, next_msg)) = res { let pageserver_feedback_rx: tokio::sync::broadcast::Receiver = @@ -313,6 +325,7 @@ struct NetworkReader<'a, IO> { conn_id: ConnectionId, pgb_reader: &'a mut PostgresBackendReader, peer_addr: SocketAddr, + proto_version: u32, // WalAcceptor is spawned when we learn server info from walproposer and // create timeline; handle is put here. acceptor_handle: &'a mut Option>>, @@ -322,9 +335,10 @@ struct NetworkReader<'a, IO> { impl NetworkReader<'_, IO> { async fn read_first_message( &mut self, + allow_timeline_creation: bool, ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> { // Receive information about server to create timeline, if not yet. - let next_msg = read_message(self.pgb_reader).await?; + let next_msg = read_message(self.pgb_reader, self.proto_version).await?; let tli = match next_msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( @@ -336,17 +350,22 @@ impl NetworkReader<'_, IO> { system_id: greeting.system_id, wal_seg_size: greeting.wal_seg_size, }; - let tli = self - .global_timelines - .create( - self.ttid, - Configuration::empty(), - server_info, - Lsn::INVALID, - Lsn::INVALID, - ) - .await - .context("create timeline")?; + let tli = if allow_timeline_creation { + self.global_timelines + .create( + self.ttid, + Configuration::empty(), + server_info, + Lsn::INVALID, + Lsn::INVALID, + ) + .await + .context("create timeline")? + } else { + self.global_timelines + .get(self.ttid) + .context("get timeline")? + }; tli.wal_residence_guard().await? } _ => { @@ -375,7 +394,7 @@ impl NetworkReader<'_, IO> { )); // Forward all messages to WalAcceptor - read_network_loop(self.pgb_reader, msg_tx, next_msg).await + read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await } } @@ -383,9 +402,10 @@ impl NetworkReader<'_, IO> { /// TODO: Return Ok(None) on graceful termination. async fn read_message( pgb_reader: &mut PostgresBackendReader, + proto_version: u32, ) -> Result { let copy_data = pgb_reader.read_copy_message().await?; - let msg = ProposerAcceptorMessage::parse(copy_data)?; + let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?; Ok(msg) } @@ -393,6 +413,7 @@ async fn read_network_loop( pgb_reader: &mut PostgresBackendReader, msg_tx: Sender, mut next_msg: ProposerAcceptorMessage, + proto_version: u32, ) -> Result<(), CopyStreamHandlerEnd> { /// Threshold for logging slow WalAcceptor sends. const SLOW_THRESHOLD: Duration = Duration::from_secs(5); @@ -425,7 +446,7 @@ async fn read_network_loop( WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc(); WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64); - next_msg = read_message(pgb_reader).await?; + next_msg = read_message(pgb_reader, proto_version).await?; } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 06403228e9..45e19c31b6 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -29,7 +29,7 @@ use utils::{ lsn::Lsn, }; -const SK_PROTOCOL_VERSION: u32 = 2; +pub const SK_PROTOCOL_VERSION: u32 = 2; pub const UNKNOWN_SERVER_VERSION: u32 = 0; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] @@ -317,7 +317,14 @@ pub enum ProposerAcceptorMessage { impl ProposerAcceptorMessage { /// Parse proposer message. - pub fn parse(msg_bytes: Bytes) -> Result { + pub fn parse(msg_bytes: Bytes, proto_version: u32) -> Result { + if proto_version != SK_PROTOCOL_VERSION { + bail!( + "incompatible protocol version {}, expected {}", + proto_version, + SK_PROTOCOL_VERSION + ); + } // xxx using Reader is inefficient but easy to work with bincode let mut stream = msg_bytes.reader(); // u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is diff --git a/safekeeper/tests/walproposer_sim/safekeeper.rs b/safekeeper/tests/walproposer_sim/safekeeper.rs index e0d593851e..0023a4d22a 100644 --- a/safekeeper/tests/walproposer_sim/safekeeper.rs +++ b/safekeeper/tests/walproposer_sim/safekeeper.rs @@ -15,7 +15,9 @@ use desim::{ }; use http::Uri; use safekeeper::{ - safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION}, + safekeeper::{ + ProposerAcceptorMessage, SafeKeeper, SK_PROTOCOL_VERSION, UNKNOWN_SERVER_VERSION, + }, state::{TimelinePersistentState, TimelineState}, timeline::TimelineError, wal_storage::Storage, @@ -285,7 +287,7 @@ impl ConnState { bail!("finished processing START_REPLICATION") } - let msg = ProposerAcceptorMessage::parse(copy_data)?; + let msg = ProposerAcceptorMessage::parse(copy_data, SK_PROTOCOL_VERSION)?; debug!("got msg: {:?}", msg); self.process(msg, global) } else {