From a163650a99179e9beb3658b0e51ba6ec7ad377e1 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 23 Dec 2021 13:03:16 +0300 Subject: [PATCH] Refactor Postgres command parsing in safekeeper. Do it separately with SafekeeperPostgresCommand enum as a result. Since query is always C string, switch postgres_backend process_query argument from Bytes to &str. Make passing ztli/ztenant id in safekeeper connection string optional; this is needed for upcoming intra-safekeeper heartbeat cmd which is not bound to any timeline. --- pageserver/src/page_service.rs | 9 +- proxy/src/mgmt.rs | 12 +- walkeeper/src/handler.rs | 161 +++++++++++++++++---------- walkeeper/src/json_ctrl.rs | 25 ++--- walkeeper/src/receive_wal.rs | 2 +- walkeeper/src/send_wal.rs | 24 +--- zenith_utils/src/postgres_backend.rs | 32 ++++-- zenith_utils/tests/ssl_test.rs | 12 +- 8 files changed, 149 insertions(+), 128 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d80e23022a..46ee072732 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -532,17 +532,10 @@ impl postgres_backend::Handler for PageServerHandler { fn process_query( &mut self, pgb: &mut PostgresBackend, - query_string: Bytes, + query_string: &str, ) -> anyhow::Result<()> { debug!("process query {:?}", query_string); - // remove null terminator, if any - let mut query_string = query_string; - if query_string.last() == Some(&0) { - query_string.truncate(query_string.len() - 1); - } - let query_string = std::str::from_utf8(&query_string)?; - if query_string.starts_with("pagestream ") { let (_, params_raw) = query_string.split_at("pagestream ".len()); let params = params_raw.split(' ').collect::>(); diff --git a/proxy/src/mgmt.rs b/proxy/src/mgmt.rs index 97abdf1bf5..1b9d9502f2 100644 --- a/proxy/src/mgmt.rs +++ b/proxy/src/mgmt.rs @@ -3,10 +3,9 @@ use std::{ thread, }; -use bytes::Bytes; use serde::Deserialize; use zenith_utils::{ - postgres_backend::{self, 
query_from_cstring, AuthType, PostgresBackend}, + postgres_backend::{self, AuthType, PostgresBackend}, pq_proto::{BeMessage, SINGLE_COL_ROWDESC}, }; @@ -79,7 +78,7 @@ impl postgres_backend::Handler for MgmtHandler<'_> { fn process_query( &mut self, pgb: &mut PostgresBackend, - query_string: Bytes, + query_string: &str, ) -> anyhow::Result<()> { let res = try_process_query(self, pgb, query_string); // intercept and log error message @@ -93,12 +92,11 @@ impl postgres_backend::Handler for MgmtHandler<'_> { fn try_process_query( mgmt: &mut MgmtHandler, pgb: &mut PostgresBackend, - query_string: Bytes, + query_string: &str, ) -> anyhow::Result<()> { - let query_string = query_from_cstring(query_string); - println!("Got mgmt query: '{}'", std::str::from_utf8(&query_string)?); + println!("Got mgmt query: '{}'", query_string); - let resp: PsqlSessionResponse = serde_json::from_slice(&query_string)?; + let resp: PsqlSessionResponse = serde_json::from_str(query_string)?; use PsqlSessionResult::*; let msg = match resp.result { diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index 7aad3a62f2..5ed599ab07 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -1,16 +1,18 @@ //! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres //! protocol commands. 
-use crate::json_ctrl::handle_json_ctrl; +use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::receive_wal::ReceiveWalConn; use crate::send_wal::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; use crate::SafeKeeperConf; use anyhow::{anyhow, bail, Context, Result}; -use bytes::Bytes; + use postgres_ffi::xlog_utils::PG_TLI; +use regex::Regex; use std::str::FromStr; use std::sync::Arc; +use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID}; @@ -25,26 +27,62 @@ pub struct SafekeeperPostgresHandler { pub conf: SafeKeeperConf, /// assigned application name pub appname: Option, - pub tenantid: Option, - pub timelineid: Option, + pub ztenantid: Option, + pub ztimelineid: Option, pub timeline: Option>, //sender to communicate with callmemaybe thread pub tx: UnboundedSender, } -impl postgres_backend::Handler for SafekeeperPostgresHandler { - fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> { - let ztimelineid = sm - .params - .get("ztimelineid") - .ok_or_else(|| anyhow!("timelineid is required"))?; - self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?); +/// Parsed Postgres command. 
+enum SafekeeperPostgresCommand { + StartWalPush { pageserver_connstr: Option }, + StartReplication { start_lsn: Lsn }, + IdentifySystem, + JSONCtrl { cmd: AppendLogicalMessage }, +} - let ztenantid = sm - .params - .get("ztenantid") - .ok_or_else(|| anyhow!("tenantid is required"))?; - self.tenantid = Some(ZTenantId::from_str(ztenantid)?); +fn parse_cmd(cmd: &str) -> Result { + if cmd.starts_with("START_WAL_PUSH") { + let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap(); + + let caps = re.captures(cmd).unwrap(); + let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned()); + Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr }) + } else if cmd.starts_with("START_REPLICATION") { + let re = + Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); + let mut caps = re.captures_iter(cmd); + let start_lsn = caps + .next() + .map(|cap| cap[1].parse::()) + .ok_or_else(|| anyhow!("failed to parse start LSN from START_REPLICATION command"))??; + Ok(SafekeeperPostgresCommand::StartReplication { start_lsn }) + } else if cmd.starts_with("IDENTIFY_SYSTEM") { + Ok(SafekeeperPostgresCommand::IdentifySystem) + } else if cmd.starts_with("JSON_CTRL") { + let cmd = cmd + .strip_prefix("JSON_CTRL") + .ok_or_else(|| anyhow!("invalid prefix"))?; + let parsed_cmd: AppendLogicalMessage = serde_json::from_str(cmd)?; + Ok(SafekeeperPostgresCommand::JSONCtrl { cmd: parsed_cmd }) + } else { + bail!("unsupported command {}", cmd); + } +} + +impl postgres_backend::Handler for SafekeeperPostgresHandler { + // ztenant id and ztimeline id are passed in connection string params + fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> { + self.ztenantid = match sm.params.get("ztenantid") { + Some(z) => Some(ZTenantId::from_str(z)?), // just curious, can I do that from .map? 
+ _ => None, + }; + + self.ztimelineid = match sm.params.get("ztimelineid") { + Some(z) => Some(ZTimelineId::from_str(z)?), + _ => None, + }; if let Some(app_name) = sm.params.get("application_name") { self.appname = Some(app_name.clone()); @@ -53,51 +91,52 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { Ok(()) } - fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> { - // START_WAL_PUSH is the only command that initializes the timeline in production. - // There is also JSON_CTRL command, which should initialize the timeline for testing. - if self.timeline.is_none() { - if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL") - { - self.timeline.set( - &self.conf, - self.tenantid.unwrap(), - self.timelineid.unwrap(), - CreateControlFile::True, - )?; - } else { - self.timeline.set( - &self.conf, - self.tenantid.unwrap(), - self.timelineid.unwrap(), - CreateControlFile::False, - )?; + fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> { + let cmd = parse_cmd(query_string)?; + + // Is this command ztimeline scoped? + match cmd { + SafekeeperPostgresCommand::StartWalPush { .. } + | SafekeeperPostgresCommand::StartReplication { .. } + | SafekeeperPostgresCommand::IdentifySystem + | SafekeeperPostgresCommand::JSONCtrl { .. } => { + let tenantid = self + .ztenantid + .ok_or_else(|| anyhow!("tenantid is required"))?; + let timelineid = self + .ztimelineid + .ok_or_else(|| anyhow!("timelineid is required"))?; + if self.timeline.is_none() { + // START_WAL_PUSH is the only command that initializes the timeline in production. + // There is also JSON_CTRL command, which should initialize the timeline for testing. + let create_control_file = match cmd { + SafekeeperPostgresCommand::StartWalPush { .. } + | SafekeeperPostgresCommand::JSONCtrl { .. 
} => CreateControlFile::True, + _ => CreateControlFile::False, + }; + self.timeline + .set(&self.conf, tenantid, timelineid, create_control_file)?; + } } } - if query_string.starts_with(b"IDENTIFY_SYSTEM") { - self.handle_identify_system(pgb)?; - } else if query_string.starts_with(b"START_REPLICATION") { - ReplicationConn::new(pgb) - .run(self, pgb, &query_string) - .with_context(|| "failed to run ReplicationConn")?; - } else if query_string.starts_with(b"START_WAL_PUSH") { - // TODO: this repeats query decoding logic from page_service so it is probably - // a good idea to refactor it in pgbackend and pass string to process query instead of bytes - let decoded_query_string = match query_string.last() { - Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?, - _ => std::str::from_utf8(&query_string)?, - }; - let pageserver_connstr = decoded_query_string - .split_whitespace() - .nth(1) - .map(|s| s.to_owned()); - ReceiveWalConn::new(pgb, pageserver_connstr) - .run(self) - .with_context(|| "failed to run ReceiveWalConn")?; - } else if query_string.starts_with(b"JSON_CTRL") { - handle_json_ctrl(self, pgb, &query_string)?; - } else { - bail!("Unexpected command {:?}", query_string); + + match cmd { + SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => { + ReceiveWalConn::new(pgb, pageserver_connstr) + .run(self) + .with_context(|| "failed to run ReceiveWalConn")?; + } + SafekeeperPostgresCommand::StartReplication { start_lsn } => { + ReplicationConn::new(pgb) + .run(self, pgb, start_lsn) + .with_context(|| "failed to run ReplicationConn")?; + } + SafekeeperPostgresCommand::IdentifySystem => { + self.handle_identify_system(pgb)?; + } + SafekeeperPostgresCommand::JSONCtrl { ref cmd } => { + handle_json_ctrl(self, pgb, cmd)?; + } } Ok(()) } @@ -108,8 +147,8 @@ impl SafekeeperPostgresHandler { SafekeeperPostgresHandler { conf, appname: None, - tenantid: None, - timelineid: None, + ztenantid: None, + ztimelineid: None, timeline: None, tx, } 
diff --git a/walkeeper/src/json_ctrl.rs b/walkeeper/src/json_ctrl.rs index 82fa748c7f..715ed559a9 100644 --- a/walkeeper/src/json_ctrl.rs +++ b/walkeeper/src/json_ctrl.rs @@ -6,7 +6,7 @@ //! modifications in tests. //! -use anyhow::{anyhow, Result}; +use anyhow::Result; use bytes::{BufMut, Bytes, BytesMut}; use crc32c::crc32c_append; use serde::{Deserialize, Serialize}; @@ -27,7 +27,7 @@ use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; #[derive(Serialize, Deserialize, Debug)] -struct AppendLogicalMessage { +pub struct AppendLogicalMessage { // prefix and message to build LogicalMessage lm_prefix: String, lm_message: String, @@ -59,15 +59,8 @@ struct AppendResult { pub fn handle_json_ctrl( spg: &mut SafekeeperPostgresHandler, pgb: &mut PostgresBackend, - cmd: &Bytes, + append_request: &AppendLogicalMessage, ) -> Result<()> { - let cmd = cmd - .strip_prefix(b"JSON_CTRL") - .ok_or_else(|| anyhow!("invalid prefix"))?; - // trim zeroes in the end - let cmd = cmd.strip_suffix(&[0u8]).unwrap_or(cmd); - - let append_request: AppendLogicalMessage = serde_json::from_slice(cmd)?; info!("JSON_CTRL request: {:?}", append_request); // need to init safekeeper state before AppendRequest @@ -104,8 +97,8 @@ fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> { pg_version: 0, // unknown proposer_id: [0u8; 16], system_id: 0, - ztli: spg.timelineid.unwrap(), - tenant_id: spg.tenantid.unwrap(), + ztli: spg.ztimelineid.unwrap(), + tenant_id: spg.ztenantid.unwrap(), tli: 0, wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests }); @@ -146,9 +139,9 @@ struct InsertedWAL { /// create AppendRequest with new WAL and pass it to safekeeper. 
fn append_logical_message( spg: &mut SafekeeperPostgresHandler, - msg: AppendLogicalMessage, + msg: &AppendLogicalMessage, ) -> Result { - let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message); + let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); let sk_state = spg.timeline.get().get_info(); let begin_lsn = msg.begin_lsn; @@ -206,7 +199,7 @@ impl XlLogicalMessage { /// Create new WAL record for non-transactional logical message. /// Used for creating artificial WAL for tests, as LogicalMessage /// record is basically no-op. -fn encode_logical_message(prefix: String, message: String) -> Vec { +fn encode_logical_message(prefix: &str, message: &str) -> Vec { let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); prefix_bytes.put(prefix.as_bytes()); prefix_bytes.put_u8(0); @@ -270,6 +263,6 @@ fn test_encode_logical_message() { 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, ]; - let actual = encode_logical_message("prefix".to_string(), "message".to_string()); + let actual = encode_logical_message("prefix", "message"); assert_eq!(expected, actual[..]); } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index bd8a2ed0dd..96aa19030d 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -72,7 +72,7 @@ impl<'pg> ReceiveWalConn<'pg> { /// Receive WAL from wal_proposer pub fn run(&mut self, spg: &mut SafekeeperPostgresHandler) -> Result<()> { - let _enter = info_span!("WAL acceptor", timeline = %spg.timelineid.unwrap()).entered(); + let _enter = info_span!("WAL acceptor", timeline = %spg.ztimelineid.unwrap()).entered(); // Notify the libpq client that it's allowed to send `CopyData` messages self.pg_backend diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index fdc56f729f..2c3288e584 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -4,11 +4,11 @@ 
use crate::handler::SafekeeperPostgresHandler; use crate::timeline::{ReplicaState, Timeline, TimelineTools}; use anyhow::{anyhow, bail, Context, Result}; -use bytes::Bytes; + use postgres_ffi::xlog_utils::{ get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI, }; -use regex::Regex; + use serde::{Deserialize, Serialize}; use std::cmp::min; use std::fs::File; @@ -170,18 +170,6 @@ impl ReplicationConn { Ok(()) } - /// Helper function that parses a single LSN. - fn parse_start(cmd: &[u8]) -> Result { - let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); - let caps = re.captures_iter(str::from_utf8(cmd)?); - let mut lsns = caps.map(|cap| cap[1].parse::()); - let start_pos = lsns - .next() - .ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??; - assert!(lsns.next().is_none()); - Ok(start_pos) - } - /// Helper function for opening a wal file. fn open_wal_file(wal_file_path: &Path) -> Result { // First try to open the .partial file. @@ -207,9 +195,9 @@ impl ReplicationConn { &mut self, spg: &mut SafekeeperPostgresHandler, pgb: &mut PostgresBackend, - cmd: &Bytes, + mut start_pos: Lsn, ) -> Result<()> { - let _enter = info_span!("WAL sender", timeline = %spg.timelineid.unwrap()).entered(); + let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered(); // spawn the background thread which receives HotStandbyFeedback messages. 
let bg_timeline = Arc::clone(spg.timeline.get()); @@ -230,8 +218,6 @@ impl ReplicationConn { }) .unwrap(); - let mut start_pos = Self::parse_start(cmd)?; - let mut wal_seg_size: usize; loop { wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize; @@ -266,7 +252,7 @@ impl ReplicationConn { None } else { let timelineid = spg.timeline.get().timelineid; - let tenant_id = spg.tenantid.unwrap(); + let tenant_id = spg.ztenantid.unwrap(); let tx_clone = spg.tx.clone(); spg.tx .send(CallmeEvent::Pause(tenant_id, timelineid)) diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 37ecc74c41..8938c2803b 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -27,7 +27,7 @@ pub trait Handler { /// postgres_backend will issue ReadyForQuery after calling this (this /// might be not what we want after CopyData streaming, but currently we don't /// care). - fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()>; + fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()>; /// Called on startup packet receival, allows to process params. /// @@ -164,6 +164,17 @@ pub fn is_socket_read_timed_out(error: &anyhow::Error) -> bool { false } +// Truncate 0 from C string in Bytes and stringify it (returns slice, no allocations) +// PG protocol strings are always C strings. +fn cstr_to_str(b: &Bytes) -> Result<&str> { + let without_null = if b.last() == Some(&0) { + &b[..b.len() - 1] + } else { + &b[..] + }; + std::str::from_utf8(without_null).map_err(|e| e.into()) +} + impl PostgresBackend { pub fn new( socket: TcpStream, @@ -417,15 +428,18 @@ impl PostgresBackend { } FeMessage::Query(m) => { - trace!("got query {:?}", m.body); + // remove null terminator + let query_string = cstr_to_str(&m.body)?; + + trace!("got query {:?}", query_string); // xxx distinguish fatal and recoverable errors? 
- if let Err(e) = handler.process_query(self, m.body.clone()) { + if let Err(e) = handler.process_query(self, query_string) { let errmsg = format!("{}", e); // ":#" uses the alternate formatting style, which makes anyhow display the // full cause of the error, not just the top-level context. We don't want to // send that in the ErrorResponse though, because it's not relevant to the // compute node logs. - warn!("query handler for {:?} failed: {:#}", m.body, e); + warn!("query handler for {} failed: {:#}", query_string, e); if e.to_string().contains("failed to run") { self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; return Ok(ProcessMsgResult::Break); @@ -454,15 +468,13 @@ impl PostgresBackend { } FeMessage::Execute(_) => { - trace!("got execute {:?}", unnamed_query_string); + let query_string = cstr_to_str(unnamed_query_string)?; + trace!("got execute {:?}", query_string); // xxx distinguish fatal and recoverable errors? - if let Err(e) = handler.process_query(self, unnamed_query_string.clone()) { + if let Err(e) = handler.process_query(self, query_string) { let errmsg = format!("{}", e); - warn!( - "query handler for {:?} failed: {:#}", - unnamed_query_string, e - ); + warn!("query handler for {:?} failed: {:#}", query_string, e); self.write_message(&BeMessage::ErrorResponse(errmsg))?; } // NOTE there is no ReadyForQuery message. This handler is used diff --git a/zenith_utils/tests/ssl_test.rs b/zenith_utils/tests/ssl_test.rs index 2a597700ae..ef2bf1ed4a 100644 --- a/zenith_utils/tests/ssl_test.rs +++ b/zenith_utils/tests/ssl_test.rs @@ -35,7 +35,7 @@ lazy_static! 
{ fn ssl() { let (mut client_sock, server_sock) = make_tcp_pair(); - const QUERY: &[u8] = b"hello world"; + const QUERY: &str = "hello world"; let client_jh = std::thread::spawn(move || { // SSLRequest @@ -82,7 +82,7 @@ fn ssl() { stream .write_u32::(4u32 + QUERY.len() as u32) .unwrap(); - stream.write_all(QUERY).unwrap(); + stream.write_all(QUERY.as_ref()).unwrap(); stream.flush().unwrap(); // ReadyForQuery @@ -97,9 +97,9 @@ fn ssl() { fn process_query( &mut self, _pgb: &mut PostgresBackend, - query_string: bytes::Bytes, + query_string: &str, ) -> anyhow::Result<()> { - self.got_query = query_string.as_ref() == QUERY; + self.got_query = query_string == QUERY; Ok(()) } } @@ -142,7 +142,7 @@ fn no_ssl() { fn process_query( &mut self, _pgb: &mut PostgresBackend, - _query_string: bytes::Bytes, + _query_string: &str, ) -> anyhow::Result<()> { panic!() } @@ -202,7 +202,7 @@ fn server_forces_ssl() { fn process_query( &mut self, _pgb: &mut PostgresBackend, - _query_string: bytes::Bytes, + _query_string: &str, ) -> anyhow::Result<()> { panic!() }