From 4788248e11de4d7fe688e625fa5a4b8c6d7981eb Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Fri, 7 May 2021 01:51:58 -0700 Subject: [PATCH] wal_service: remove manual serialization code Commit to serde for serialization of data structures. --- walkeeper/src/wal_service.rs | 258 ++--------------------------------- 1 file changed, 14 insertions(+), 244 deletions(-) diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index dbf32b3b90..5f2fcd92be 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -1,8 +1,7 @@ -// -// WAL service listens for client connections and -// receive WAL from wal_proposer and send it to WAL receivers -// - +/// +/// WAL service listens for client connections and +/// receive WAL from wal_proposer and send it to WAL receivers +/// use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use fs2::FileExt; @@ -205,43 +204,23 @@ struct Connection { conf: WalAcceptorConf, } -/// Customer serializer API (TODO: use protobuf?) -trait NewSerializer: - OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug -{ +/// Serde adapter for BytesMut +/// +// It's not clear whether this will be needed in the long term. +// If so, it should probably move to `zenith_utils::bin_ser` +trait NewSerializer: Serialize + DeserializeOwned { fn pack(&self, buf: &mut BytesMut) { - let mut buf1 = BytesMut::with_capacity(buf.capacity()); - self.pack2(&mut buf1); - let buf2 = BytesMut::with_capacity(buf.capacity()); - let mut buf2w = buf2.writer(); - self.ser_into(&mut buf2w).unwrap(); - let buf2 = buf2w.into_inner(); - assert_eq!(buf1, buf2); - self.pack2(buf); + let mut buf_w = buf.writer(); + self.ser_into(&mut buf_w).unwrap(); } fn unpack(buf: &mut BytesMut) -> Self { - let buf1: &[u8] = buf; - let res1 = Self::des_from(buf1).unwrap(); - let res2 = Self::unpack2(buf); - assert_eq!(res1, res2); - res2 + let buf_r = buf.reader(); + Self::des_from(buf_r).unwrap() } } -impl NewSerializer for T where - T: OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug -{ -} - -trait OldSerializer { - fn pack2(&self, buf: &mut BytesMut); - fn unpack2(buf: &mut BytesMut) -> Self; -} - -// -// Implementations -// +impl NewSerializer for T where T: Serialize + DeserializeOwned {} /// Report and return IO error macro_rules! io_error { @@ -257,20 +236,6 @@ fn parse_hex_str(s: &str) -> Result { } } -impl OldSerializer for NodeId { - fn pack2(&self, buf: &mut BytesMut) { - buf.put_u128_le(self.uuid); - buf.put_u64(self.term); // use big endian to provide compatibility with memcmp - } - - fn unpack2(buf: &mut BytesMut) -> NodeId { - NodeId { - uuid: buf.get_u128_le(), - term: buf.get_u64(), // use big endian to provide compatibility with memcmp - } - } -} - // For use with serde's `serialize_with` attribute fn serialize_u64_flip_endian(x: &u64, s: S) -> std::result::Result where @@ -287,100 +252,6 @@ where let x: u64 = serde::de::Deserialize::deserialize(deserializer)?; Ok(u64::from_be(x)) } -// impl Serialize for NodeId { -// fn serialize(&self, serializer: S) -> std::result::Result -// where -// S: serde::Serializer, -// { -// use serde::ser::SerializeStruct; - -// // 2 is the number of fields in the struct. -// let mut state = serializer.serialize_struct("NodeId", 2)?; -// state.serialize_field("uuid", &self.uuid)?; -// // Flip endian-ness of `term` -// state.serialize_field("term", &u64::from_be(self.term))?; -// state.end() -// } -// } - -// // FIXME: this is a bit quick and dirty; it won't work for -// // any serde data formats besides `bincode`. -// impl<'de> Deserialize<'de> for NodeId { -// fn deserialize(deserializer: D) -> std::result::Result -// where -// D: serde::Deserializer<'de>, -// { -// let uuid: u128 = Deserialize::deserialize(deserializer)?; -// let term: u64 = Deserialize::deserialize(deserializer)?; -// let term = u64::from_be(term); - -// Ok(NodeId { uuid, term }) -// } -// } - -impl OldSerializer for ServerInfo { - fn pack2(&self, buf: &mut BytesMut) { - buf.put_u32_le(self.protocol_version); - buf.put_u32_le(self.pg_version); - self.node_id.pack2(buf); - buf.put_u64_le(self.system_id); - buf.put_slice(&self.timeline_id.as_arr()); // FIXME make timeline use serde - buf.put_u64_le(self.wal_end); - buf.put_u32_le(self.timeline); - buf.put_u32_le(self.wal_seg_size); - } - fn unpack2(buf: &mut BytesMut) -> ServerInfo { - ServerInfo { - protocol_version: buf.get_u32_le(), - pg_version: buf.get_u32_le(), - node_id: NodeId::unpack(buf), - system_id: buf.get_u64_le(), - timeline_id: ZTimelineId::get_from_buf(buf), // FIXME make timeline use serde - wal_end: buf.get_u64_le(), - timeline: buf.get_u32_le(), - wal_seg_size: buf.get_u32_le(), - } - } -} - -impl OldSerializer for RequestVote { - fn pack2(&self, buf: &mut BytesMut) { - self.node_id.pack2(buf); - buf.put_u64_le(self.vcl); - buf.put_u64_le(self.epoch); - } - - fn unpack2(buf: &mut BytesMut) -> RequestVote { - RequestVote { - node_id: NodeId::unpack(buf), - vcl: buf.get_u64_le(), - epoch: buf.get_u64_le(), - } - } -} - -impl OldSerializer for SafeKeeperInfo { - fn pack2(&self, buf: &mut BytesMut) { - buf.put_u32_le(self.magic); - buf.put_u32_le(self.format_version); - buf.put_u64_le(self.epoch); - self.server.pack2(buf); - buf.put_u64_le(self.commit_lsn); - buf.put_u64_le(self.flush_lsn); - buf.put_u64_le(self.restart_lsn); - } - fn unpack2(buf: &mut BytesMut) -> SafeKeeperInfo { - SafeKeeperInfo { - magic: buf.get_u32_le(), - format_version: buf.get_u32_le(), - epoch: buf.get_u64_le(), - server: ServerInfo::unpack(buf), - commit_lsn: buf.get_u64_le(), - flush_lsn: buf.get_u64_le(), - restart_lsn: buf.get_u64_le(), - } - } -} impl SafeKeeperInfo { fn new() -> SafeKeeperInfo { @@ -405,21 +276,6 @@ impl SafeKeeperInfo { } } -impl OldSerializer for HotStandbyFeedback { - fn pack2(&self, buf: &mut BytesMut) { - buf.put_u64_le(self.ts); - buf.put_u64_le(self.xmin); - buf.put_u64_le(self.catalog_xmin); - } - fn unpack2(buf: &mut BytesMut) -> HotStandbyFeedback { - HotStandbyFeedback { - ts: buf.get_u64_le(), - xmin: buf.get_u64_le(), - catalog_xmin: buf.get_u64_le(), - } - } -} - impl HotStandbyFeedback { fn parse(body: &Bytes) -> HotStandbyFeedback { HotStandbyFeedback { @@ -430,40 +286,6 @@ impl HotStandbyFeedback { } } -impl OldSerializer for SafeKeeperRequest { - fn pack2(&self, buf: &mut BytesMut) { - self.sender_id.pack2(buf); - buf.put_u64_le(self.begin_lsn); - buf.put_u64_le(self.end_lsn); - buf.put_u64_le(self.restart_lsn); - buf.put_u64_le(self.commit_lsn); - } - fn unpack2(buf: &mut BytesMut) -> SafeKeeperRequest { - SafeKeeperRequest { - sender_id: NodeId::unpack(buf), - begin_lsn: buf.get_u64_le(), - end_lsn: buf.get_u64_le(), - restart_lsn: buf.get_u64_le(), - commit_lsn: buf.get_u64_le(), - } - } -} - -impl OldSerializer for SafeKeeperResponse { - fn pack2(&self, buf: &mut BytesMut) { - buf.put_u64_le(self.epoch); - buf.put_u64_le(self.flush_lsn); - self.hs_feedback.pack2(buf); - } - fn unpack2(buf: &mut BytesMut) -> SafeKeeperResponse { - SafeKeeperResponse { - epoch: buf.get_u64_le(), - flush_lsn: buf.get_u64_le(), - hs_feedback: HotStandbyFeedback::unpack(buf), - } - } -} - lazy_static! { pub static ref TIMELINES: Mutex>> = Mutex::new(HashMap::new()); @@ -1301,55 +1123,3 @@ impl Connection { ) } } - -#[cfg(test)] -mod tests { - use super::*; - - fn test_unpack() { - let mut buffer = BytesMut::new(); - for ii in 7..299 { - buffer.put_u8(ii as u8); - } - - let res = T::unpack(&mut buffer); - - //eprintln!("{:?}", res); - - let mut buffer2 = BytesMut::new(); - res.pack(&mut buffer2); - } - - #[test] - fn test_x() { - let si = ServerInfo { - protocol_version: 168364039, - pg_version: 235736075, - node_id: NodeId { - uuid: 40027986537151223483757821949182545935, - term: 2242829044932683046, - }, - system_id: 3327364263599220775, - timeline_id: ZTimelineId::from([ - 47u8, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, - ]), - wal_end: 5063528411713060927, - timeline: 1246316615, - wal_seg_size: 1313688651, - }; - - let mut buffer2 = BytesMut::new(); - si.pack(&mut buffer2); - } - - #[test] - fn test_pack_unpack() { - test_unpack::(); - test_unpack::(); - test_unpack::(); - test_unpack::(); - test_unpack::(); - test_unpack::(); - test_unpack::(); - } -}