wal_service: remove manual output buffering

Serialize objects directly to the stream. This allows us to remove a
bunch of buffer management code, along with the NewSerializer trait that
was a temporary bridge between the old code and the new.
Eric Seppanen
2021-05-13 13:40:21 -07:00
committed by Eric Seppanen
parent 78dcf2207e
commit 2148ae78ab

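For readers skimming the diff below: the substance of the change is replacing "serialize into a scratch buffer, then write the buffer to the socket" with "serialize straight into the socket". The following sketch only illustrates that shift and is not code from this repository; SerIntoSketch, old_style, and new_style are hypothetical names, and bincode stands in for whatever little-endian serializer zenith_utils::bin_ser actually provides.

    // Illustrative sketch only: `SerIntoSketch`, `old_style`, and `new_style` are
    // hypothetical, and bincode stands in for the repo's own serializer.
    use anyhow::Result;
    use serde::Serialize;
    use std::io::Write;

    trait SerIntoSketch: Serialize {
        // Serialize straight into any writer (e.g. a TcpStream); no staging buffer.
        fn ser_into<W: Write>(&self, w: &mut W) -> Result<()> {
            bincode::serialize_into(w, self)?; // bincode's default encoding is little-endian
            Ok(())
        }
    }
    impl<T: Serialize> SerIntoSketch for T {}

    // Before: encode the message into an output buffer, then flush the buffer.
    fn old_style<T: Serialize, W: Write>(msg: &T, outbuf: &mut Vec<u8>, stream: &mut W) -> Result<()> {
        outbuf.clear();
        bincode::serialize_into(&mut *outbuf, msg)?;
        stream.write_all(outbuf)?;
        Ok(())
    }

    // After: serialize the message directly into the stream.
    fn new_style<T: SerIntoSketch, W: Write>(msg: &T, stream: &mut W) -> Result<()> {
        msg.ser_into(stream)
    }

With the direct-to-stream shape, the per-connection outbuf and the start_sending/send helpers removed in the diff have nothing left to do.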

@@ -4,15 +4,13 @@
 //!
 use anyhow::{bail, Result};
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
-use bytes::{Buf, BufMut, BytesMut};
 use fs2::FileExt;
 use log::*;
 use postgres::{Client, NoTls};
-use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use serde::{Deserialize, Serialize};
 use std::cmp::{max, min};
 use std::fs::{self, File, OpenOptions};
 use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
-use std::mem;
 use std::net::{SocketAddr, TcpListener, TcpStream};
 use std::str;
 use std::sync::Arc;
@@ -190,31 +188,20 @@ impl SharedState {
             }
             self.control_file = Some(file);
-            const SIZE: usize = mem::size_of::<SafeKeeperInfo>();
-            let mut buf = [0u8; SIZE];
-            if self
-                .control_file
-                .as_mut()
-                .unwrap()
-                .read_exact(&mut buf)
-                .is_ok()
-            {
-                let mut input = BytesMut::new();
-                input.extend_from_slice(&buf);
-                let my_info = SafeKeeperInfo::unpack(&mut input);
+            let cfile_ref = self.control_file.as_mut().unwrap();
+            let my_info = SafeKeeperInfo::des_from(cfile_ref)?;
-                if my_info.magic != SK_MAGIC {
-                    bail!("Invalid control file magic: {}", my_info.magic);
-                }
-                if my_info.format_version != SK_FORMAT_VERSION {
-                    bail!(
-                        "Incompatible format version: {} vs. {}",
-                        my_info.format_version,
-                        SK_FORMAT_VERSION
-                    );
-                }
-                self.info = my_info;
+            if my_info.magic != SK_MAGIC {
+                bail!("Invalid control file magic: {}", my_info.magic);
+            }
+            if my_info.format_version != SK_FORMAT_VERSION {
+                bail!(
+                    "Incompatible format version: {} vs. {}",
+                    my_info.format_version,
+                    SK_FORMAT_VERSION
+                );
+            }
+            self.info = my_info;
         }
         Err(e) => {
             panic!(
@@ -227,12 +214,9 @@ impl SharedState {
     }
     pub fn save_control_file(&mut self, sync: bool) -> Result<()> {
-        let mut buf = BytesMut::new();
-        self.info.pack(&mut buf);
         let file = self.control_file.as_mut().unwrap();
         file.seek(SeekFrom::Start(0))?;
-        file.write_all(&buf[..])?;
+        self.info.ser_into(file)?;
         if sync {
             file.sync_all()?;
         }
@@ -249,30 +233,10 @@ pub struct Connection {
     pub stream_out: TcpStream,
     /// The cached result of socket.peer_addr()
     pub peer_addr: SocketAddr,
-    /// output buffer
-    outbuf: BytesMut,
     /// wal acceptor configuration
     pub conf: WalAcceptorConf,
 }
-/// Serde adapter for BytesMut
-///
-// It's not clear whether this will be needed in the long term.
-// If so, it should probably move to `zenith_utils::bin_ser`
-trait NewSerializer: Serialize + DeserializeOwned {
-    fn pack(&self, buf: &mut BytesMut) {
-        let mut buf_w = buf.writer();
-        self.ser_into(&mut buf_w).unwrap();
-    }
-    fn unpack(buf: &mut BytesMut) -> Self {
-        let mut buf_r = buf.reader();
-        Self::des_from(&mut buf_r).unwrap()
-    }
-}
-impl<T> NewSerializer for T where T: Serialize + DeserializeOwned {}
 impl SafeKeeperInfo {
     fn new() -> SafeKeeperInfo {
         SafeKeeperInfo {
@@ -342,7 +306,6 @@ impl Connection {
             stream_in: BufReader::new(socket.try_clone()?),
             stream_out: socket,
             peer_addr,
-            outbuf: BytesMut::with_capacity(10 * 1024),
             conf,
         };
         Ok(conn)
@@ -371,8 +334,8 @@ impl Connection {
         Ok(())
     }
-    fn read_req<T: NewSerializer>(&mut self) -> Result<T> {
-        // NewSerializer is always little-endian.
+    fn read_req<T: LeSer>(&mut self) -> Result<T> {
+        // As the trait bound implies, this always encodes little-endian.
         Ok(T::des_from(&mut self.stream_in)?)
     }
@@ -445,18 +408,14 @@ impl Connection {
         my_info.server.timeline = timeline;
         /* Report my identifier to proxy */
-        self.start_sending();
-        my_info.pack(&mut self.outbuf);
-        self.send()?;
+        my_info.ser_into(&mut self.stream_out)?;
         /* Wait for vote request */
         let prop = self.read_req::<RequestVote>()?;
         /* This is Paxos check which should ensure that only one master can perform commits */
         if prop.node_id < my_info.server.node_id {
             /* Send my node-id to inform proxy that it's candidate was rejected */
-            self.start_sending();
-            my_info.server.node_id.pack(&mut self.outbuf);
-            self.send()?;
+            my_info.server.node_id.ser_into(&mut self.stream_out)?;
             bail!(
                 "Reject connection attempt with term {} because my term is {}",
                 prop.node_id.term,
@@ -472,9 +431,7 @@ impl Connection {
         let wal_seg_size = server_info.wal_seg_size as usize;
         /* Acknowledge the proposed candidate by returning it to the proxy */
-        self.start_sending();
-        prop.node_id.pack(&mut self.outbuf);
-        self.send()?;
+        prop.node_id.ser_into(&mut self.stream_out)?;
         // Need to establish replication channel with page server.
         // Add far as replication in postgres is initiated by receiver, we should use callme mechanism
@@ -555,9 +512,7 @@ impl Connection {
             flush_lsn: end_pos,
             hs_feedback: self.timeline.get().get_hs_feedback(),
         };
-        self.start_sending();
-        resp.pack(&mut self.outbuf);
-        self.send()?;
+        resp.ser_into(&mut self.stream_out)?;
         /*
          * Ping wal sender that new data is available.
@@ -570,20 +525,6 @@ impl Connection {
         Ok(())
     }
-    ///
-    /// Reset output buffer to start accumulating data of new message
-    ///
-    fn start_sending(&mut self) {
-        self.outbuf.clear();
-    }
-    ///
-    /// Send buffered messages
-    ///
-    fn send(&mut self) -> Result<()> {
-        Ok(self.stream_out.write_all(&self.outbuf)?)
-    }
     fn write_wal_file(
         &self,
         startpos: Lsn,