diff --git a/Cargo.lock b/Cargo.lock index eb191e681c..2df732e815 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1181,6 +1181,7 @@ dependencies = [ "regex", "rocksdb", "rust-s3", + "serde", "slog", "slog-async", "slog-scope", @@ -2246,6 +2247,7 @@ dependencies = [ "postgres_ffi", "regex", "rust-s3", + "serde", "slog", "slog-async", "slog-scope", @@ -2255,6 +2257,7 @@ dependencies = [ "tokio-stream", "walkdir", "workspace_hack", + "zenith_utils", ] [[package]] diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index f832539a76..47e7eb6fcc 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -38,6 +38,8 @@ thiserror = "1.0" hex = "0.4.3" tar = "0.4.33" parse_duration = "2.1.1" +serde = { version = "1.0", features = ["derive"] } + postgres_ffi = { path = "../postgres_ffi" } zenith_utils = { path = "../zenith_utils" } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 63570e59cf..2bfac69dee 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,3 +1,4 @@ +use serde::{Deserialize, Serialize}; use std::fmt; use std::net::SocketAddr; use std::path::PathBuf; @@ -49,7 +50,7 @@ pub struct PageServerConf { /// is separate from PostgreSQL timelines, and doesn't have those /// limitations. A zenith timeline is identified by a 128-bit ID, which /// is usually printed out as a hex string. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ZTimelineId([u8; 16]); impl FromStr for ZTimelineId { diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index dc8b9c9bd3..2e58ad0dd0 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -29,8 +29,10 @@ anyhow = "1.0" crc32c = "0.6.0" parse_duration = "2.1.1" walkdir = "2" +serde = { version = "1.0", features = ["derive"] } # FIXME: 'pageserver' is needed for ZTimelineId. 
Refactor pageserver = { path = "../pageserver" } postgres_ffi = { path = "../postgres_ffi" } workspace_hack = { path = "../workspace_hack" } +zenith_utils = { path = "../zenith_utils" } diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index c270ca5efc..dbf32b3b90 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -10,6 +10,7 @@ use lazy_static::lazy_static; use log::*; use postgres::{Client, NoTls}; use regex::Regex; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; @@ -19,6 +20,7 @@ use std::net::{TcpListener, TcpStream}; use std::str; use std::sync::{Arc, Condvar, Mutex}; use std::thread; +use zenith_utils::bin_ser::LeSer; use crate::pq_protocol::*; use crate::WalAcceptorConf; @@ -72,99 +74,108 @@ fn read_into(r: &mut impl Read, buf: &mut BytesMut) -> io::Result { Ok(num_bytes) } -/* - * Unique node identifier used by Paxos - */ +/// Unique node identifier used by Paxos #[repr(C)] -#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Ord, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] struct NodeId { - term: u64, uuid: u128, + #[serde(serialize_with = "serialize_u64_flip_endian")] + #[serde(deserialize_with = "deserialize_u64_flip_endian")] + term: u64, } #[repr(C)] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] struct ServerInfo { - protocol_version: u32, /* proxy-safekeeper protocol version */ - pg_version: u32, /* Postgres server version */ + /// proxy-safekeeper protocol version + protocol_version: u32, + /// Postgres server version + pg_version: u32, node_id: NodeId, system_id: SystemId, - timeline_id: ZTimelineId, /* Zenith timelineid */ + /// Zenith timelineid + timeline_id: ZTimelineId, wal_end: XLogRecPtr, timeline: TimeLineID, wal_seg_size: u32, } -/* - * Vote request sent from proxy to safekeepers - */ 
+/// Vote request sent from proxy to safekeepers #[repr(C)] -#[derive(Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] struct RequestVote { node_id: NodeId, - vcl: XLogRecPtr, /* volume commit LSN */ - epoch: u64, /* new epoch when safekeeper reaches vcl */ + /// volume commit LSN + vcl: XLogRecPtr, + /// new epoch when safekeeper reaches vcl + epoch: u64, } -/* - * Information of about storage node - */ +/// Information about storage node #[repr(C)] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] struct SafeKeeperInfo { - magic: u32, /* magic for verifying content the control file */ - format_version: u32, /* safekeeper format version */ - epoch: u64, /* safekeeper's epoch */ - server: ServerInfo, /* information about server */ - commit_lsn: XLogRecPtr, /* part of WAL acknowledged by quorum */ - flush_lsn: XLogRecPtr, /* locally flushed part of WAL */ - restart_lsn: XLogRecPtr, /* minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers */ + /// magic for verifying the content of the control file + magic: u32, + /// safekeeper format version + format_version: u32, + /// safekeeper's epoch + epoch: u64, + /// information about server + server: ServerInfo, + /// part of WAL acknowledged by quorum + commit_lsn: XLogRecPtr, + /// locally flushed part of WAL + flush_lsn: XLogRecPtr, + /// minimal LSN which may be needed for recovery of some safekeeper: min(commit_lsn) for all safekeepers + restart_lsn: XLogRecPtr, } -/* - * Hot standby feedback received from replica - */ +/// Hot standby feedback received from replica #[repr(C)] -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] struct HotStandbyFeedback { ts: TimestampTz, xmin: FullTransactionId, catalog_xmin: FullTransactionId, } -/* - * Request with WAL message sent from proxy to safekeeper. - */ +/// Request with WAL message sent from proxy to safekeeper. 
#[repr(C)] -#[derive(Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] struct SafeKeeperRequest { - sender_id: NodeId, /* Sender's node identifier (looks like we do not need it for TCP streaming connection) */ - begin_lsn: XLogRecPtr, /* start position of message in WAL */ - end_lsn: XLogRecPtr, /* end position of message in WAL */ - restart_lsn: XLogRecPtr, /* restart LSN position (minimal LSN which may be needed by proxy to perform recovery) */ - commit_lsn: XLogRecPtr, /* LSN committed by quorum of safekeepers */ + /// Sender's node identifier (looks like we do not need it for TCP streaming connection) + sender_id: NodeId, + /// start position of message in WAL + begin_lsn: XLogRecPtr, + /// end position of message in WAL + end_lsn: XLogRecPtr, + /// restart LSN position (minimal LSN which may be needed by proxy to perform recovery) + restart_lsn: XLogRecPtr, + /// LSN committed by quorum of safekeepers + commit_lsn: XLogRecPtr, } -/* - * Report safekeeper state to proxy - */ +/// Report safekeeper state to proxy #[repr(C)] -#[derive(Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] struct SafeKeeperResponse { epoch: u64, flush_lsn: XLogRecPtr, hs_feedback: HotStandbyFeedback, } -/* - * Shared state associated with database instance (tenant) - */ +/// Shared state associated with database instance (tenant) #[derive(Debug)] struct SharedState { - commit_lsn: XLogRecPtr, /* quorum commit LSN */ - info: SafeKeeperInfo, /* information about this safekeeper */ - control_file: Option, /* opened file control file handle (needed to hold exlusive file lock */ - hs_feedback: HotStandbyFeedback, /* combined hot standby feedback from all replicas */ + /// quorum commit LSN + commit_lsn: XLogRecPtr, + /// information about this safekeeper + info: SafeKeeperInfo, + /// opened control file handle (needed to hold exclusive file lock) + control_file: Option, + /// combined hot standby feedback from all replicas + hs_feedback: HotStandbyFeedback, } /// 
Database instance (tenant) @@ -176,38 +187,68 @@ pub struct Timeline { cond: Condvar, } -/* - * Private data -*/ +/// Private data #[derive(Debug)] struct Connection { timeline: Option>, - stream: TcpStream, /* Postgres connection */ - inbuf: BytesMut, /* input buffer */ - outbuf: BytesMut, /* output buffer */ - init_done: bool, /* startup packet proceeded */ - appname: Option, /* assigned application name */ - conf: WalAcceptorConf, /* wal acceptor configuration */ + /// Postgres connection + stream: TcpStream, + /// input buffer + inbuf: BytesMut, + /// output buffer + outbuf: BytesMut, + /// startup packet processed + init_done: bool, + /// assigned application name + appname: Option, + /// wal acceptor configuration + conf: WalAcceptorConf, } -/* - * Customer serializer API (TODO: use protobuf?) - */ -trait Serializer { - fn pack(&self, buf: &mut BytesMut); - fn unpack(buf: &mut BytesMut) -> Self; +/// Custom serializer API (TODO: use protobuf?) +trait NewSerializer: + OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug +{ + fn pack(&self, buf: &mut BytesMut) { + let mut buf1 = BytesMut::with_capacity(buf.capacity()); + self.pack2(&mut buf1); + let buf2 = BytesMut::with_capacity(buf.capacity()); + let mut buf2w = buf2.writer(); + self.ser_into(&mut buf2w).unwrap(); + let buf2 = buf2w.into_inner(); + assert_eq!(buf1, buf2); + self.pack2(buf); + } + + fn unpack(buf: &mut BytesMut) -> Self { + let buf1: &[u8] = buf; + let res1 = Self::des_from(buf1).unwrap(); + let res2 = Self::unpack2(buf); + assert_eq!(res1, res2); + res2 + } +} + +impl NewSerializer for T where + T: OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug +{ +} + +trait OldSerializer { + fn pack2(&self, buf: &mut BytesMut); + fn unpack2(buf: &mut BytesMut) -> Self; } // // Implementations // -//Report and return IO error */ +/// Report and return IO error macro_rules! 
io_error { ($($arg:tt)*) => (error!($($arg)*); return Err(io::Error::new(io::ErrorKind::Other,format!($($arg)*)))) } -// Safe hex string parser returning proper result +/// Safe hex string parser returning proper result fn parse_hex_str(s: &str) -> Result { if let Ok(val) = u32::from_str_radix(s, 16) { Ok(val as u64) @@ -216,13 +257,13 @@ fn parse_hex_str(s: &str) -> Result { } } -impl Serializer for NodeId { - fn pack(&self, buf: &mut BytesMut) { +impl OldSerializer for NodeId { + fn pack2(&self, buf: &mut BytesMut) { buf.put_u128_le(self.uuid); buf.put_u64(self.term); // use big endian to provide compatibility with memcmp } - fn unpack(buf: &mut BytesMut) -> NodeId { + fn unpack2(buf: &mut BytesMut) -> NodeId { NodeId { uuid: buf.get_u128_le(), term: buf.get_u64(), // use big endian to provide compatibility with memcmp @@ -230,24 +271,71 @@ impl Serializer for NodeId { } } -impl Serializer for ServerInfo { - fn pack(&self, buf: &mut BytesMut) { +// For use with serde's `serialize_with` attribute +fn serialize_u64_flip_endian(x: &u64, s: S) -> std::result::Result +where + S: serde::Serializer, +{ + s.serialize_u64(u64::from_be(*x)) +} + +// For use with serde's `deserialize_with` attribute +fn deserialize_u64_flip_endian<'de, D>(deserializer: D) -> std::result::Result +where + D: serde::de::Deserializer<'de>, +{ + let x: u64 = serde::de::Deserialize::deserialize(deserializer)?; + Ok(u64::from_be(x)) +} +// impl Serialize for NodeId { +// fn serialize(&self, serializer: S) -> std::result::Result +// where +// S: serde::Serializer, +// { +// use serde::ser::SerializeStruct; + +// // 2 is the number of fields in the struct. +// let mut state = serializer.serialize_struct("NodeId", 2)?; +// state.serialize_field("uuid", &self.uuid)?; +// // Flip endian-ness of `term` +// state.serialize_field("term", &u64::from_be(self.term))?; +// state.end() +// } +// } + +// // FIXME: this is a bit quick and dirty; it won't work for +// // any serde data formats besides `bincode`. 
+// impl<'de> Deserialize<'de> for NodeId { +// fn deserialize(deserializer: D) -> std::result::Result +// where +// D: serde::Deserializer<'de>, +// { +// let uuid: u128 = Deserialize::deserialize(deserializer)?; +// let term: u64 = Deserialize::deserialize(deserializer)?; +// let term = u64::from_be(term); + +// Ok(NodeId { uuid, term }) +// } +// } + +impl OldSerializer for ServerInfo { + fn pack2(&self, buf: &mut BytesMut) { buf.put_u32_le(self.protocol_version); buf.put_u32_le(self.pg_version); - self.node_id.pack(buf); + self.node_id.pack2(buf); buf.put_u64_le(self.system_id); - buf.put_slice(&self.timeline_id.as_arr()); + buf.put_slice(&self.timeline_id.as_arr()); // FIXME make timeline use serde buf.put_u64_le(self.wal_end); buf.put_u32_le(self.timeline); buf.put_u32_le(self.wal_seg_size); } - fn unpack(buf: &mut BytesMut) -> ServerInfo { + fn unpack2(buf: &mut BytesMut) -> ServerInfo { ServerInfo { protocol_version: buf.get_u32_le(), pg_version: buf.get_u32_le(), node_id: NodeId::unpack(buf), system_id: buf.get_u64_le(), - timeline_id: ZTimelineId::get_from_buf(buf), + timeline_id: ZTimelineId::get_from_buf(buf), // FIXME make timeline use serde wal_end: buf.get_u64_le(), timeline: buf.get_u32_le(), wal_seg_size: buf.get_u32_le(), @@ -255,14 +343,14 @@ impl Serializer for ServerInfo { } } -impl Serializer for RequestVote { - fn pack(&self, buf: &mut BytesMut) { - self.node_id.pack(buf); +impl OldSerializer for RequestVote { + fn pack2(&self, buf: &mut BytesMut) { + self.node_id.pack2(buf); buf.put_u64_le(self.vcl); buf.put_u64_le(self.epoch); } - fn unpack(buf: &mut BytesMut) -> RequestVote { + fn unpack2(buf: &mut BytesMut) -> RequestVote { RequestVote { node_id: NodeId::unpack(buf), vcl: buf.get_u64_le(), @@ -271,17 +359,17 @@ impl Serializer for RequestVote { } } -impl Serializer for SafeKeeperInfo { - fn pack(&self, buf: &mut BytesMut) { +impl OldSerializer for SafeKeeperInfo { + fn pack2(&self, buf: &mut BytesMut) { buf.put_u32_le(self.magic); 
buf.put_u32_le(self.format_version); buf.put_u64_le(self.epoch); - self.server.pack(buf); + self.server.pack2(buf); buf.put_u64_le(self.commit_lsn); buf.put_u64_le(self.flush_lsn); buf.put_u64_le(self.restart_lsn); } - fn unpack(buf: &mut BytesMut) -> SafeKeeperInfo { + fn unpack2(buf: &mut BytesMut) -> SafeKeeperInfo { SafeKeeperInfo { magic: buf.get_u32_le(), format_version: buf.get_u32_le(), @@ -317,13 +405,13 @@ impl SafeKeeperInfo { } } -impl Serializer for HotStandbyFeedback { - fn pack(&self, buf: &mut BytesMut) { +impl OldSerializer for HotStandbyFeedback { + fn pack2(&self, buf: &mut BytesMut) { buf.put_u64_le(self.ts); buf.put_u64_le(self.xmin); buf.put_u64_le(self.catalog_xmin); } - fn unpack(buf: &mut BytesMut) -> HotStandbyFeedback { + fn unpack2(buf: &mut BytesMut) -> HotStandbyFeedback { HotStandbyFeedback { ts: buf.get_u64_le(), xmin: buf.get_u64_le(), @@ -342,15 +430,15 @@ impl HotStandbyFeedback { } } -impl Serializer for SafeKeeperRequest { - fn pack(&self, buf: &mut BytesMut) { - self.sender_id.pack(buf); +impl OldSerializer for SafeKeeperRequest { + fn pack2(&self, buf: &mut BytesMut) { + self.sender_id.pack2(buf); buf.put_u64_le(self.begin_lsn); buf.put_u64_le(self.end_lsn); buf.put_u64_le(self.restart_lsn); buf.put_u64_le(self.commit_lsn); } - fn unpack(buf: &mut BytesMut) -> SafeKeeperRequest { + fn unpack2(buf: &mut BytesMut) -> SafeKeeperRequest { SafeKeeperRequest { sender_id: NodeId::unpack(buf), begin_lsn: buf.get_u64_le(), @@ -361,13 +449,13 @@ impl Serializer for SafeKeeperRequest { } } -impl Serializer for SafeKeeperResponse { - fn pack(&self, buf: &mut BytesMut) { +impl OldSerializer for SafeKeeperResponse { + fn pack2(&self, buf: &mut BytesMut) { buf.put_u64_le(self.epoch); buf.put_u64_le(self.flush_lsn); - self.hs_feedback.pack(buf); + self.hs_feedback.pack2(buf); } - fn unpack(buf: &mut BytesMut) -> SafeKeeperResponse { + fn unpack2(buf: &mut BytesMut) -> SafeKeeperResponse { SafeKeeperResponse { epoch: buf.get_u64_le(), 
flush_lsn: buf.get_u64_le(), @@ -576,7 +664,7 @@ impl Connection { Ok(()) } - fn read_req(&mut self) -> Result { + fn read_req(&mut self) -> Result { let size = mem::size_of::(); self.inbuf.resize(size, 0u8); self.stream.read_exact(&mut self.inbuf[0..size])?; @@ -1213,3 +1301,55 @@ impl Connection { ) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_unpack() { + let mut buffer = BytesMut::new(); + for ii in 7..299 { + buffer.put_u8(ii as u8); + } + + let res = T::unpack(&mut buffer); + + //eprintln!("{:?}", res); + + let mut buffer2 = BytesMut::new(); + res.pack(&mut buffer2); + } + + #[test] + fn test_x() { + let si = ServerInfo { + protocol_version: 168364039, + pg_version: 235736075, + node_id: NodeId { + uuid: 40027986537151223483757821949182545935, + term: 2242829044932683046, + }, + system_id: 3327364263599220775, + timeline_id: ZTimelineId::from([ + 47u8, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, + ]), + wal_end: 5063528411713060927, + timeline: 1246316615, + wal_seg_size: 1313688651, + }; + + let mut buffer2 = BytesMut::new(); + si.pack(&mut buffer2); + } + + #[test] + fn test_pack_unpack() { + test_unpack::(); + test_unpack::(); + test_unpack::(); + test_unpack::(); + test_unpack::(); + test_unpack::(); + test_unpack::(); + } +}