break wal_service into multiple pieces

The pieces are:
base Connection
SendWal
ReplicationHandler

There are lots of other changes here:
- Put the replication reader in a background thread; this gets rid
  of some hacks with nonblocking mode.
- Stop manually buffering input data; use BufReader instead.
- Use BytesMut a lot less; use Read/Write traits where possible.
This commit is contained in:
Eric Seppanen
2021-05-12 19:59:34 -07:00
committed by Eric Seppanen
parent cedc2eb5c2
commit 513696a485
4 changed files with 565 additions and 468 deletions

View File

@@ -167,7 +167,10 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
.name("WAL acceptor thread".into())
.spawn(|| {
// thread code
wal_service::thread_main(conf);
let thread_result = wal_service::thread_main(conf);
if let Err(e) = thread_result {
info!("wal_service thread terminated: {}", e);
}
})
.unwrap();
threads.push(wal_acceptor_thread);

View File

@@ -1,7 +1,7 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use pageserver::ZTimelineId;
use std::io;
use std::io::{self, Read};
use std::str;
use std::str::FromStr;
@@ -10,7 +10,6 @@ pub type SystemId = u64;
#[derive(Debug)]
pub enum FeMessage {
StartupMessage(FeStartupMessage),
Query(FeQueryMessage),
Terminate,
CopyData(FeCopyData),
@@ -51,28 +50,22 @@ pub enum StartupRequestCode {
}
impl FeStartupMessage {
pub fn parse(buf: &mut BytesMut) -> io::Result<Option<FeMessage>> {
pub fn read_from(reader: &mut impl Read) -> io::Result<Self> {
const MAX_STARTUP_PACKET_LENGTH: usize = 10000;
const CANCEL_REQUEST_CODE: u32 = (1234 << 16) | 5678;
const NEGOTIATE_SSL_CODE: u32 = (1234 << 16) | 5679;
const NEGOTIATE_GSS_CODE: u32 = (1234 << 16) | 5680;
if buf.len() < 4 {
return Ok(None);
}
let len = BigEndian::read_u32(&buf[0..4]) as usize;
let len = reader.read_u32::<BigEndian>()? as usize;
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid message length",
"FeStartupMessage: invalid message length",
));
}
if buf.len() < len {
return Ok(None);
}
let version = BigEndian::read_u32(&buf[4..8]);
let version = reader.read_u32::<BigEndian>()?;
let kind = match version {
CANCEL_REQUEST_CODE => StartupRequestCode::Cancel,
@@ -81,7 +74,11 @@ impl FeStartupMessage {
_ => StartupRequestCode::Normal,
};
let params_bytes = &buf[8..len];
// FIXME: A buffer pool would be nice, to avoid zeroing the buffer.
let params_len = len - 8;
let mut params_bytes = vec![0u8; params_len];
reader.read_exact(params_bytes.as_mut())?;
let params_str = str::from_utf8(&params_bytes).unwrap();
let params = params_str.split('\0');
let mut options = false;
@@ -109,13 +106,12 @@ impl FeStartupMessage {
));
}
buf.advance(len as usize);
Ok(Some(FeMessage::StartupMessage(FeStartupMessage {
Ok(FeStartupMessage {
version,
kind,
appname,
timelineid: timelineid.unwrap(),
})))
})
}
}
@@ -201,44 +197,28 @@ impl<'a> BeMessage<'a> {
}
impl FeMessage {
pub fn parse(buf: &mut BytesMut) -> io::Result<Option<FeMessage>> {
if buf.len() < 5 {
let to_read = 5 - buf.len();
buf.reserve(to_read);
return Ok(None);
}
let tag = buf[0];
let len = BigEndian::read_u32(&buf[1..5]);
pub fn read_from(reader: &mut impl Read) -> io::Result<FeMessage> {
let tag = reader.read_u8()?;
let len = reader.read_u32::<BigEndian>()?;
if len < 4 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid message length: parsing u32",
"FeMessage: invalid message length",
));
}
let total_len = len as usize + 1;
if buf.len() < total_len {
let to_read = total_len - buf.len();
buf.reserve(to_read);
return Ok(None);
}
let mut body = buf.split_to(total_len);
body.advance(5);
let body_len = (len - 4) as usize;
let mut body = vec![0u8; body_len];
reader.read_exact(&mut body)?;
match tag {
b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage {
body: body.freeze(),
}))),
b'd' => Ok(Some(FeMessage::CopyData(FeCopyData {
body: body.freeze(),
}))),
b'X' => Ok(Some(FeMessage::Terminate)),
b'Q' => Ok(FeMessage::Query(FeQueryMessage { body: body.into() })),
b'd' => Ok(FeMessage::CopyData(FeCopyData { body: body.into() })),
b'X' => Ok(FeMessage::Terminate),
tag => Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown message tag: {},'{:?}'", tag, buf),
format!("unknown message tag: {},'{:?}'", tag, body),
)),
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -93,9 +93,7 @@ pub trait BeSer: Serialize + DeserializeOwned {
}
/// Deserialize from a reader
///
/// tip: `&[u8]` implements `Read`
fn des_from<R: Read>(r: R) -> Result<Self, DeserializeError> {
fn des_from<R: Read>(r: &mut R) -> Result<Self, DeserializeError> {
be_coder().deserialize_from(r).or(Err(DeserializeError))
}
}
@@ -128,11 +126,56 @@ pub trait LeSer: Serialize + DeserializeOwned {
le_coder().deserialize(buf).or(Err(DeserializeError))
}
/// Deserialize from a reader
fn des_from<R: Read>(r: &mut R) -> Result<Self, DeserializeError> {
le_coder().deserialize_from(r).or(Err(DeserializeError))
}
}
/// Binary serialize/deserialize helper functions (Big Endian)
///
/// This version panics on every serialization/deserialization error.
/// That can be useful if you want to see a backtrace to find where the
/// error occurred.
pub trait LeSerPanic: Serialize + DeserializeOwned {
/// Serialize into a byte slice
fn ser_into_slice<W: Write>(&self, b: &mut [u8]) -> Result<(), SerializeError> {
// This is slightly awkward; we need a mutable reference to a mutable reference.
let mut w = b;
self.ser_into(&mut w)
}
/// Serialize into a borrowed writer
///
/// This is useful for most `Write` types except `&mut [u8]`, which
/// can more easily use [`ser_into_slice`](Self::ser_into_slice).
fn ser_into<W: Write>(&self, w: &mut W) -> Result<(), SerializeError> {
le_coder()
.serialize_into(w, &self)
.or_else(|e| panic!("ser_into failed: {}", e))
}
/// Serialize into a new heap-allocated buffer
fn ser(&self) -> Result<Vec<u8>, SerializeError> {
le_coder()
.serialize(&self)
.or_else(|e| panic!("ser failed: {}", e))
}
/// Deserialize from a byte slice
fn des(buf: &[u8]) -> Result<Self, DeserializeError> {
le_coder()
.deserialize(buf)
.or_else(|e| panic!("des failed: {}", e))
}
/// Deserialize from a reader
///
/// tip: `&[u8]` implements `Read`
fn des_from<R: Read>(r: R) -> Result<Self, DeserializeError> {
le_coder().deserialize_from(r).or(Err(DeserializeError))
fn des_from<R: Read>(r: &mut R) -> Result<Self, DeserializeError> {
le_coder()
.deserialize_from(r)
.or_else(|e| panic!("des_from failed: {}", e))
}
}
@@ -140,6 +183,8 @@ impl<T> BeSer for T where T: Serialize + DeserializeOwned {}
impl<T> LeSer for T where T: Serialize + DeserializeOwned {}
impl<T> LeSerPanic for T where T: Serialize + DeserializeOwned {}
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
@@ -205,13 +250,13 @@ mod tests {
assert_eq!(buf.into_inner(), SHORT1_ENC_BE_TRAILING);
// deserialize from a `Write` sink.
let buf = Cursor::new(SHORT2_ENC_BE);
let decoded = ShortStruct::des_from(buf).unwrap();
let mut buf = Cursor::new(SHORT2_ENC_BE);
let decoded = ShortStruct::des_from(&mut buf).unwrap();
assert_eq!(decoded, SHORT2);
// deserialize from a `Write` sink that terminates early.
let buf = Cursor::new([0u8; 4]);
ShortStruct::des_from(buf).unwrap_err();
let mut buf = Cursor::new([0u8; 4]);
ShortStruct::des_from(&mut buf).unwrap_err();
}
#[test]
@@ -234,13 +279,13 @@ mod tests {
assert_eq!(buf.into_inner(), SHORT1_ENC_LE_TRAILING);
// deserialize from a `Write` sink.
let buf = Cursor::new(SHORT2_ENC_LE);
let decoded = ShortStruct::des_from(buf).unwrap();
let mut buf = Cursor::new(SHORT2_ENC_LE);
let decoded = ShortStruct::des_from(&mut buf).unwrap();
assert_eq!(decoded, SHORT2);
// deserialize from a `Write` sink that terminates early.
let buf = Cursor::new([0u8; 4]);
ShortStruct::des_from(buf).unwrap_err();
let mut buf = Cursor::new([0u8; 4]);
ShortStruct::des_from(&mut buf).unwrap_err();
}
#[test]