wal_service: remove manual serialization code

Commit to serde for serialization of data structures.
This commit is contained in:
Eric Seppanen
2021-05-07 01:51:58 -07:00
parent 0cbb3798da
commit 4788248e11

View File

@@ -1,8 +1,7 @@
//
// WAL service listens for client connections and
// receive WAL from wal_proposer and send it to WAL receivers
//
///
/// WAL service listens for client connections and
/// receive WAL from wal_proposer and send it to WAL receivers
///
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use fs2::FileExt;
@@ -205,43 +204,23 @@ struct Connection {
conf: WalAcceptorConf,
}
/// Customer serializer API (TODO: use protobuf?)
trait NewSerializer:
OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug
{
/// Serde adapter for BytesMut
///
// It's not clear whether this will be needed in the long term.
// If so, it should probably move to `zenith_utils::bin_ser`
trait NewSerializer: Serialize + DeserializeOwned {
fn pack(&self, buf: &mut BytesMut) {
let mut buf1 = BytesMut::with_capacity(buf.capacity());
self.pack2(&mut buf1);
let buf2 = BytesMut::with_capacity(buf.capacity());
let mut buf2w = buf2.writer();
self.ser_into(&mut buf2w).unwrap();
let buf2 = buf2w.into_inner();
assert_eq!(buf1, buf2);
self.pack2(buf);
let mut buf_w = buf.writer();
self.ser_into(&mut buf_w).unwrap();
}
fn unpack(buf: &mut BytesMut) -> Self {
let buf1: &[u8] = buf;
let res1 = Self::des_from(buf1).unwrap();
let res2 = Self::unpack2(buf);
assert_eq!(res1, res2);
res2
let buf_r = buf.reader();
Self::des_from(buf_r).unwrap()
}
}
impl<T> NewSerializer for T where
T: OldSerializer + Serialize + DeserializeOwned + PartialEq + std::fmt::Debug
{
}
trait OldSerializer {
fn pack2(&self, buf: &mut BytesMut);
fn unpack2(buf: &mut BytesMut) -> Self;
}
//
// Implementations
//
impl<T> NewSerializer for T where T: Serialize + DeserializeOwned {}
/// Report and return IO error
macro_rules! io_error {
@@ -257,20 +236,6 @@ fn parse_hex_str(s: &str) -> Result<u64> {
}
}
impl OldSerializer for NodeId {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u128_le(self.uuid);
buf.put_u64(self.term); // use big endian to provide compatibility with memcmp
}
fn unpack2(buf: &mut BytesMut) -> NodeId {
NodeId {
uuid: buf.get_u128_le(),
term: buf.get_u64(), // use big endian to provide compatibility with memcmp
}
}
}
// For use with serde's `serialize_with` attribute
fn serialize_u64_flip_endian<S>(x: &u64, s: S) -> std::result::Result<S::Ok, S::Error>
where
@@ -287,100 +252,6 @@ where
let x: u64 = serde::de::Deserialize::deserialize(deserializer)?;
Ok(u64::from_be(x))
}
// impl Serialize for NodeId {
// fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
// where
// S: serde::Serializer,
// {
// use serde::ser::SerializeStruct;
// // 2 is the number of fields in the struct.
// let mut state = serializer.serialize_struct("NodeId", 2)?;
// state.serialize_field("uuid", &self.uuid)?;
// // Flip endian-ness of `term`
// state.serialize_field("term", &u64::from_be(self.term))?;
// state.end()
// }
// }
// // FIXME: this is a bit quick and dirty; it won't work for
// // any serde data formats besides `bincode`.
// impl<'de> Deserialize<'de> for NodeId {
// fn deserialize<D>(deserializer: D) -> std::result::Result<NodeId, D::Error>
// where
// D: serde::Deserializer<'de>,
// {
// let uuid: u128 = Deserialize::deserialize(deserializer)?;
// let term: u64 = Deserialize::deserialize(deserializer)?;
// let term = u64::from_be(term);
// Ok(NodeId { uuid, term })
// }
// }
impl OldSerializer for ServerInfo {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u32_le(self.protocol_version);
buf.put_u32_le(self.pg_version);
self.node_id.pack2(buf);
buf.put_u64_le(self.system_id);
buf.put_slice(&self.timeline_id.as_arr()); // FIXME make timeline use serde
buf.put_u64_le(self.wal_end);
buf.put_u32_le(self.timeline);
buf.put_u32_le(self.wal_seg_size);
}
fn unpack2(buf: &mut BytesMut) -> ServerInfo {
ServerInfo {
protocol_version: buf.get_u32_le(),
pg_version: buf.get_u32_le(),
node_id: NodeId::unpack(buf),
system_id: buf.get_u64_le(),
timeline_id: ZTimelineId::get_from_buf(buf), // FIXME make timeline use serde
wal_end: buf.get_u64_le(),
timeline: buf.get_u32_le(),
wal_seg_size: buf.get_u32_le(),
}
}
}
impl OldSerializer for RequestVote {
fn pack2(&self, buf: &mut BytesMut) {
self.node_id.pack2(buf);
buf.put_u64_le(self.vcl);
buf.put_u64_le(self.epoch);
}
fn unpack2(buf: &mut BytesMut) -> RequestVote {
RequestVote {
node_id: NodeId::unpack(buf),
vcl: buf.get_u64_le(),
epoch: buf.get_u64_le(),
}
}
}
impl OldSerializer for SafeKeeperInfo {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u32_le(self.magic);
buf.put_u32_le(self.format_version);
buf.put_u64_le(self.epoch);
self.server.pack2(buf);
buf.put_u64_le(self.commit_lsn);
buf.put_u64_le(self.flush_lsn);
buf.put_u64_le(self.restart_lsn);
}
fn unpack2(buf: &mut BytesMut) -> SafeKeeperInfo {
SafeKeeperInfo {
magic: buf.get_u32_le(),
format_version: buf.get_u32_le(),
epoch: buf.get_u64_le(),
server: ServerInfo::unpack(buf),
commit_lsn: buf.get_u64_le(),
flush_lsn: buf.get_u64_le(),
restart_lsn: buf.get_u64_le(),
}
}
}
impl SafeKeeperInfo {
fn new() -> SafeKeeperInfo {
@@ -405,21 +276,6 @@ impl SafeKeeperInfo {
}
}
impl OldSerializer for HotStandbyFeedback {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u64_le(self.ts);
buf.put_u64_le(self.xmin);
buf.put_u64_le(self.catalog_xmin);
}
fn unpack2(buf: &mut BytesMut) -> HotStandbyFeedback {
HotStandbyFeedback {
ts: buf.get_u64_le(),
xmin: buf.get_u64_le(),
catalog_xmin: buf.get_u64_le(),
}
}
}
impl HotStandbyFeedback {
fn parse(body: &Bytes) -> HotStandbyFeedback {
HotStandbyFeedback {
@@ -430,40 +286,6 @@ impl HotStandbyFeedback {
}
}
impl OldSerializer for SafeKeeperRequest {
fn pack2(&self, buf: &mut BytesMut) {
self.sender_id.pack2(buf);
buf.put_u64_le(self.begin_lsn);
buf.put_u64_le(self.end_lsn);
buf.put_u64_le(self.restart_lsn);
buf.put_u64_le(self.commit_lsn);
}
fn unpack2(buf: &mut BytesMut) -> SafeKeeperRequest {
SafeKeeperRequest {
sender_id: NodeId::unpack(buf),
begin_lsn: buf.get_u64_le(),
end_lsn: buf.get_u64_le(),
restart_lsn: buf.get_u64_le(),
commit_lsn: buf.get_u64_le(),
}
}
}
impl OldSerializer for SafeKeeperResponse {
fn pack2(&self, buf: &mut BytesMut) {
buf.put_u64_le(self.epoch);
buf.put_u64_le(self.flush_lsn);
self.hs_feedback.pack2(buf);
}
fn unpack2(buf: &mut BytesMut) -> SafeKeeperResponse {
SafeKeeperResponse {
epoch: buf.get_u64_le(),
flush_lsn: buf.get_u64_le(),
hs_feedback: HotStandbyFeedback::unpack(buf),
}
}
}
lazy_static! {
pub static ref TIMELINES: Mutex<HashMap<ZTimelineId, Arc<Timeline>>> =
Mutex::new(HashMap::new());
@@ -1301,55 +1123,3 @@ impl Connection {
)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_unpack<T: NewSerializer>() {
let mut buffer = BytesMut::new();
for ii in 7..299 {
buffer.put_u8(ii as u8);
}
let res = T::unpack(&mut buffer);
//eprintln!("{:?}", res);
let mut buffer2 = BytesMut::new();
res.pack(&mut buffer2);
}
#[test]
fn test_x() {
let si = ServerInfo {
protocol_version: 168364039,
pg_version: 235736075,
node_id: NodeId {
uuid: 40027986537151223483757821949182545935,
term: 2242829044932683046,
},
system_id: 3327364263599220775,
timeline_id: ZTimelineId::from([
47u8, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62,
]),
wal_end: 5063528411713060927,
timeline: 1246316615,
wal_seg_size: 1313688651,
};
let mut buffer2 = BytesMut::new();
si.pack(&mut buffer2);
}
#[test]
fn test_pack_unpack() {
test_unpack::<ServerInfo>();
test_unpack::<RequestVote>();
test_unpack::<SafeKeeperInfo>();
test_unpack::<HotStandbyFeedback>();
test_unpack::<SafeKeeperRequest>();
test_unpack::<SafeKeeperResponse>();
test_unpack::<NodeId>();
}
}