From f923464b935f381ff376fe37fef33b1e115db7cb Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 16 Jun 2021 18:52:36 +0300 Subject: [PATCH] Remove pq_protocol.rs. I forgot to do that in b2f51026aa0. --- walkeeper/src/lib.rs | 1 - walkeeper/src/pq_protocol.rs | 224 ----------------------------------- walkeeper/src/receive_wal.rs | 2 +- 3 files changed, 1 insertion(+), 226 deletions(-) delete mode 100644 walkeeper/src/pq_protocol.rs diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 82f9948ad7..d269776064 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::time::Duration; -pub mod pq_protocol; pub mod receive_wal; pub mod replication; pub mod s3_offload; diff --git a/walkeeper/src/pq_protocol.rs b/walkeeper/src/pq_protocol.rs deleted file mode 100644 index bad7a7eeca..0000000000 --- a/walkeeper/src/pq_protocol.rs +++ /dev/null @@ -1,224 +0,0 @@ -use byteorder::{BigEndian, ReadBytesExt}; -use bytes::{BufMut, Bytes, BytesMut}; -use pageserver::ZTimelineId; -use std::io::{self, Read}; -use std::str; -use std::str::FromStr; - -pub type Oid = u32; -pub type SystemId = u64; - -#[derive(Debug)] -pub enum FeMessage { - Query(FeQueryMessage), - Terminate, - CopyData(FeCopyData), -} - -#[derive(Debug)] -pub struct RowDescriptor { - pub typoid: Oid, - pub typlen: i16, - pub name: &'static [u8], -} - -#[derive(Debug)] -pub enum BeMessage<'a> { - AuthenticationOk, - ReadyForQuery, - RowDescription(&'a [RowDescriptor]), - DataRow(&'a [Option<&'a [u8]>]), - CommandComplete(&'a [u8]), - Negotiate, - Copy, -} - -#[derive(Debug)] -pub struct FeStartupMessage { - pub version: u32, - pub kind: StartupRequestCode, - pub timelineid: ZTimelineId, - pub appname: Option, -} - -#[derive(Debug)] -pub enum StartupRequestCode { - Cancel, - NegotiateSsl, - NegotiateGss, - Normal, -} - -impl FeStartupMessage { - pub fn read_from(reader: &mut impl Read) -> io::Result { - const MAX_STARTUP_PACKET_LENGTH: usize = 10000; - const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678; - const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679; - const NEGOTIATE_GSS_CODE: u32 = (1234 << 16) | 5680; - - let len = reader.read_u32::()? as usize; - - if len < 4 || len > MAX_STARTUP_PACKET_LENGTH { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "FeStartupMessage: invalid message length", - )); - } - - let version = reader.read_u32::()?; - - let kind = match version { - CANCEL_REQUEST_CODE => StartupRequestCode::Cancel, - NEGOTIATE_SSL_CODE => StartupRequestCode::NegotiateSsl, - NEGOTIATE_GSS_CODE => StartupRequestCode::NegotiateGss, - _ => StartupRequestCode::Normal, - }; - - let params_len = len - 8; - let mut params_bytes = vec![0u8; params_len]; - reader.read_exact(params_bytes.as_mut())?; - - let params_str = str::from_utf8(¶ms_bytes).unwrap(); - let params = params_str.split('\0'); - let mut options = false; - let mut timelineid: Option = None; - let mut appname: Option = None; - for p in params { - if p == "options" { - options = true; - } else if options { - for opt in p.split(' ') { - if let Some(ztimelineid_str) = opt.strip_prefix("ztimelineid=") { - // FIXME: rethrow parsing error, don't unwrap - timelineid = Some(ZTimelineId::from_str(ztimelineid_str).unwrap()); - } else if let Some(val) = opt.strip_prefix("application_name=") { - appname = Some(val.to_string()); - } - } - break; - } - } - if timelineid.is_none() { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "timelineid is required", - )); - } - - Ok(FeStartupMessage { - version, - kind, - appname, - timelineid: timelineid.unwrap(), - }) - } -} - -#[derive(Debug)] -pub struct FeQueryMessage { - pub body: Bytes, -} - -#[derive(Debug)] -pub struct FeCopyData { - pub body: Bytes, -} - -impl<'a> BeMessage<'a> { - pub fn write(buf: &mut BytesMut, message: &BeMessage) { - match message { - BeMessage::AuthenticationOk => { - buf.put_u8(b'R'); - buf.put_i32(4 + 4); - buf.put_i32(0); - } - - BeMessage::ReadyForQuery => { - buf.put_u8(b'Z'); - buf.put_i32(4 + 1); - buf.put_u8(b'I'); - } - - BeMessage::Negotiate => { - buf.put_u8(b'N'); - } - - BeMessage::Copy => { - buf.put_u8(b'W'); - buf.put_i32(7); - buf.put_u8(b'\0'); - buf.put_u8(b'\0'); - buf.put_u8(b'\0'); - } - - BeMessage::RowDescription(rows) => { - buf.put_u8(b'T'); - - let mut body = BytesMut::new(); - body.put_i16(rows.len() as i16); // # of fields - for row in rows.iter() { - body.put_slice(row.name); - body.put_i32(0); /* table oid */ - body.put_i16(0); /* attnum */ - body.put_u32(row.typoid); - body.put_i16(row.typlen); - body.put_i32(-1); /* typmod */ - body.put_i16(0); /* format code */ - } - buf.put_i32((4 + body.len()) as i32); // # of bytes, including len field itself - buf.put(body); - } - - BeMessage::DataRow(vals) => { - buf.put_u8(b'D'); - let total_len: usize = vals - .iter() - .fold(0, |acc, row| acc + 4 + row.map_or(0, |s| s.len())); - buf.put_u32(4 + 2 + total_len as u32); - buf.put_u16(vals.len() as u16); - for val_opt in vals.iter() { - if let Some(val) = val_opt { - buf.put_u32(val.len() as u32); - buf.put_slice(val); - } else { - buf.put_i32(-1); - } - } - } - - BeMessage::CommandComplete(cmd) => { - buf.put_u8(b'C'); - buf.put_i32(4 + cmd.len() as i32); - buf.put_slice(cmd); - } - } - } -} - -impl FeMessage { - pub fn read_from(reader: &mut impl Read) -> io::Result { - let tag = reader.read_u8()?; - let len = reader.read_u32::()?; - - if len < 4 { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "FeMessage: invalid message length", - )); - } - - let body_len = (len - 4) as usize; - let mut body = vec![0u8; body_len]; - reader.read_exact(&mut body)?; - - match tag { - b'Q' => Ok(FeMessage::Query(FeQueryMessage { body: body.into() })), - b'd' => Ok(FeMessage::CopyData(FeCopyData { body: body.into() })), - b'X' => Ok(FeMessage::Terminate), - tag => Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("unknown message tag: {},'{:?}'", tag, body), - )), - } - } -} diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index 7dac0a4440..55aa2e6c29 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -17,12 +17,12 @@ use std::thread::sleep; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; -use crate::pq_protocol::*; use crate::replication::HotStandbyFeedback; use crate::timeline::{Timeline, TimelineTools}; use crate::WalAcceptorConf; use pageserver::ZTimelineId; use postgres_ffi::xlog_utils::{TimeLineID, XLogFileName, MAX_SEND_SIZE, XLOG_BLCKSZ}; +use zenith_utils::pq_proto::SystemId; pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_FORMAT_VERSION: u32 = 1;