wal_service: add BufReader

If we try to read a few bytes at a time, we will perform a lot more
syscalls than necessary. Wrap the socket in a BufReader, which will
buffer bytes as needed.
This commit is contained in:
Eric Seppanen
2021-05-11 15:59:58 -07:00
committed by Eric Seppanen
parent e3e593f571
commit cedc2eb5c2

View File

@@ -14,9 +14,9 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::mem;
use std::net::{TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::str;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
@@ -185,8 +185,12 @@ pub struct Timeline {
#[derive(Debug)]
struct Connection {
timeline: Option<Arc<Timeline>>,
/// Postgres connection
stream: TcpStream,
/// Postgres connection, buffered input
stream_in: BufReader<TcpStream>,
/// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain.
stream_out: TcpStream,
/// The cached result of socket.peer_addr()
peer_addr: SocketAddr,
/// input buffer
inbuf: BytesMut,
/// output buffer
@@ -263,17 +267,28 @@ pub fn thread_main(conf: WalAcceptorConf) {
main_loop(&conf).unwrap();
}
/// This is run by main_loop, inside a background thread.
///
/// This is only a separate function to make a convenient place to collect
/// all errors for logging. Our caller can log errors in a single place.
fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> {
socket.set_nodelay(true)?;
let mut conn = Connection::new(socket, conf)?;
conn.run()?;
Ok(())
}
/// Accept incoming TCP connections and spawn them into a background thread.
fn main_loop(conf: &WalAcceptorConf) -> Result<()> {
let listener = TcpListener::bind(conf.listen_addr)?;
loop {
match listener.accept() {
Ok((socket, peer_addr)) => {
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true)?;
let mut conn = Connection::new(socket, &conf);
let conf = conf.clone();
thread::spawn(move || {
if let Err(err) = conn.run() {
error!("error: {}", err);
if let Err(err) = handle_socket(socket, conf) {
error!("socket error: {}", err);
}
});
}
@@ -425,16 +440,20 @@ impl Timeline {
}
impl Connection {
pub fn new(socket: TcpStream, conf: &WalAcceptorConf) -> Connection {
Connection {
pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result<Connection> {
let peer_addr = socket.peer_addr()?;
let conn = Connection {
timeline: None,
stream: socket,
stream_in: BufReader::new(socket.try_clone()?),
stream_out: socket,
peer_addr,
inbuf: BytesMut::with_capacity(10 * 1024),
outbuf: BytesMut::with_capacity(10 * 1024),
init_done: false,
appname: None,
conf: conf.clone(),
}
conf,
};
Ok(conn)
}
fn timeline(&self) -> Arc<Timeline> {
@@ -443,7 +462,7 @@ impl Connection {
fn run(&mut self) -> Result<()> {
self.inbuf.resize(4, 0u8);
self.stream.read_exact(&mut self.inbuf[0..4])?;
self.stream_in.read_exact(&mut self.inbuf[0..4])?;
let startup_pkg_len = BigEndian::read_u32(&self.inbuf[0..4]);
if startup_pkg_len == 0 {
self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor
@@ -456,7 +475,7 @@ impl Connection {
fn read_req<T: NewSerializer>(&mut self) -> Result<T> {
let size = mem::size_of::<T>();
self.inbuf.resize(size, 0u8);
self.stream.read_exact(&mut self.inbuf[0..size])?;
self.stream_in.read_exact(&mut self.inbuf[0..size])?;
Ok(T::unpack(&mut self.inbuf))
}
@@ -503,9 +522,7 @@ impl Connection {
let server_info = self.read_req::<ServerInfo>()?;
info!(
"Start handshake with wal_proposer {} sysid {} timeline {}",
self.stream.peer_addr()?,
server_info.system_id,
server_info.timeline_id,
self.peer_addr, server_info.system_id, server_info.timeline_id,
);
// FIXME: also check that the system identifier matches
self.set_timeline(server_info.timeline_id)?;
@@ -582,8 +599,7 @@ impl Connection {
info!(
"Start streaming from timeline {} address {:?}",
server_info.timeline_id,
self.stream.peer_addr()?
server_info.timeline_id, self.peer_addr,
);
// Main loop
@@ -611,7 +627,7 @@ impl Connection {
/* Receive message body */
self.inbuf.resize(rec_size, 0u8);
self.stream.read_exact(&mut self.inbuf[0..rec_size])?;
self.stream_in.read_exact(&mut self.inbuf[0..rec_size])?;
/* Save message in file */
self.write_wal_file(start_pos, timeline, wal_seg_size, &self.inbuf[0..rec_size])?;
@@ -675,7 +691,7 @@ impl Connection {
return Ok(Some(message));
}
if read_into(&mut self.stream, &mut self.inbuf)? == 0 {
if read_into(&mut self.stream_in, &mut self.inbuf)? == 0 {
if self.inbuf.is_empty() {
return Ok(None);
} else {
@@ -708,14 +724,14 @@ impl Connection {
/// Send buffered messages
///
fn send(&mut self) -> Result<()> {
Ok(self.stream.write_all(&self.outbuf)?)
Ok(self.stream_out.write_all(&self.outbuf)?)
}
///
/// Send WAL to replica or WAL receiver using standard libpq replication protocol
///
fn send_wal(&mut self) -> Result<()> {
info!("WAL sender to {:?} is started", self.stream.peer_addr()?);
info!("WAL sender to {:?} is started", self.peer_addr);
loop {
self.start_sending();
match self.read_message()? {
@@ -756,7 +772,7 @@ impl Connection {
}
}
}
info!("WAL sender to {:?} is finished", self.stream.peer_addr()?);
info!("WAL sender to {:?} is finished", self.peer_addr);
Ok(())
}
@@ -878,9 +894,10 @@ impl Connection {
// Temporarily set this stream into nonblocking mode.
// FIXME: This seems like a dirty hack.
// Should this task be done on a background thread?
self.stream.set_nonblocking(true).unwrap();
let read_result = self.stream.read(&mut self.inbuf);
self.stream.set_nonblocking(false).unwrap();
// FIXME: set_nonblocking plus BufReader seems questionable.
self.stream_in.get_ref().set_nonblocking(true).unwrap();
let read_result = self.stream_in.read(&mut self.inbuf);
self.stream_in.get_ref().set_nonblocking(false).unwrap();
match read_result {
Ok(0) => break,
@@ -950,7 +967,7 @@ impl Connection {
BigEndian::write_u64(&mut self.outbuf[14..22], end_pos.0);
BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp());
self.stream.write_all(&self.outbuf[0..msg_size])?;
self.stream_out.write_all(&self.outbuf[0..msg_size])?;
start_pos += send_size as u64;
debug!("Sent WAL to page server up to {}", end_pos);