diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 65b09d0949..c110a85192 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -4,15 +4,13 @@ //! use anyhow::{bail, Result}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use bytes::{Buf, BufMut, BytesMut}; use fs2::FileExt; use log::*; use postgres::{Client, NoTls}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use std::cmp::{max, min}; use std::fs::{self, File, OpenOptions}; use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write}; -use std::mem; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::str; use std::sync::Arc; @@ -190,31 +188,20 @@ impl SharedState { } self.control_file = Some(file); - const SIZE: usize = mem::size_of::(); - let mut buf = [0u8; SIZE]; - if self - .control_file - .as_mut() - .unwrap() - .read_exact(&mut buf) - .is_ok() - { - let mut input = BytesMut::new(); - input.extend_from_slice(&buf); - let my_info = SafeKeeperInfo::unpack(&mut input); + let cfile_ref = self.control_file.as_mut().unwrap(); + let my_info = SafeKeeperInfo::des_from(cfile_ref)?; - if my_info.magic != SK_MAGIC { - bail!("Invalid control file magic: {}", my_info.magic); - } - if my_info.format_version != SK_FORMAT_VERSION { - bail!( - "Incompatible format version: {} vs. {}", - my_info.format_version, - SK_FORMAT_VERSION - ); - } - self.info = my_info; + if my_info.magic != SK_MAGIC { + bail!("Invalid control file magic: {}", my_info.magic); } + if my_info.format_version != SK_FORMAT_VERSION { + bail!( + "Incompatible format version: {} vs. {}", + my_info.format_version, + SK_FORMAT_VERSION + ); + } + self.info = my_info; } Err(e) => { panic!( @@ -227,12 +214,9 @@ impl SharedState { } pub fn save_control_file(&mut self, sync: bool) -> Result<()> { - let mut buf = BytesMut::new(); - self.info.pack(&mut buf); - let file = self.control_file.as_mut().unwrap(); file.seek(SeekFrom::Start(0))?; - file.write_all(&buf[..])?; + self.info.ser_into(file)?; if sync { file.sync_all()?; } @@ -249,30 +233,10 @@ pub struct Connection { pub stream_out: TcpStream, /// The cached result of socket.peer_addr() pub peer_addr: SocketAddr, - /// output buffer - outbuf: BytesMut, /// wal acceptor configuration pub conf: WalAcceptorConf, } -/// Serde adapter for BytesMut -/// -// It's not clear whether this will be needed in the long term. -// If so, it should probably move to `zenith_utils::bin_ser` -trait NewSerializer: Serialize + DeserializeOwned { - fn pack(&self, buf: &mut BytesMut) { - let mut buf_w = buf.writer(); - self.ser_into(&mut buf_w).unwrap(); - } - - fn unpack(buf: &mut BytesMut) -> Self { - let mut buf_r = buf.reader(); - Self::des_from(&mut buf_r).unwrap() - } -} - -impl NewSerializer for T where T: Serialize + DeserializeOwned {} - impl SafeKeeperInfo { fn new() -> SafeKeeperInfo { SafeKeeperInfo { @@ -342,7 +306,6 @@ impl Connection { stream_in: BufReader::new(socket.try_clone()?), stream_out: socket, peer_addr, - outbuf: BytesMut::with_capacity(10 * 1024), conf, }; Ok(conn) @@ -371,8 +334,8 @@ impl Connection { Ok(()) } - fn read_req(&mut self) -> Result { - // NewSerializer is always little-endian. + fn read_req(&mut self) -> Result { + // As the trait bound implies, this always encodes little-endian. Ok(T::des_from(&mut self.stream_in)?) } @@ -445,18 +408,14 @@ impl Connection { my_info.server.timeline = timeline; /* Report my identifier to proxy */ - self.start_sending(); - my_info.pack(&mut self.outbuf); - self.send()?; + my_info.ser_into(&mut self.stream_out)?; /* Wait for vote request */ let prop = self.read_req::()?; /* This is Paxos check which should ensure that only one master can perform commits */ if prop.node_id < my_info.server.node_id { /* Send my node-id to inform proxy that it's candidate was rejected */ - self.start_sending(); - my_info.server.node_id.pack(&mut self.outbuf); - self.send()?; + my_info.server.node_id.ser_into(&mut self.stream_out)?; bail!( "Reject connection attempt with term {} because my term is {}", prop.node_id.term, @@ -472,9 +431,7 @@ impl Connection { let wal_seg_size = server_info.wal_seg_size as usize; /* Acknowledge the proposed candidate by returning it to the proxy */ - self.start_sending(); - prop.node_id.pack(&mut self.outbuf); - self.send()?; + prop.node_id.ser_into(&mut self.stream_out)?; // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver, we should use callme mechanism @@ -555,9 +512,7 @@ impl Connection { flush_lsn: end_pos, hs_feedback: self.timeline.get().get_hs_feedback(), }; - self.start_sending(); - resp.pack(&mut self.outbuf); - self.send()?; + resp.ser_into(&mut self.stream_out)?; /* * Ping wal sender that new data is available. @@ -570,20 +525,6 @@ impl Connection { Ok(()) } - /// - /// Reset output buffer to start accumulating data of new message - /// - fn start_sending(&mut self) { - self.outbuf.clear(); - } - - /// - /// Send buffered messages - /// - fn send(&mut self) -> Result<()> { - Ok(self.stream_out.write_all(&self.outbuf)?) - } - fn write_wal_file( &self, startpos: Lsn,