From cedc2eb5c240696384f93167c304677a5070e9e9 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Tue, 11 May 2021 15:59:58 -0700 Subject: [PATCH] wal_service: add BufReader If we try to read a few bytes at a time, we will perform a lot more syscalls than necessary. Wrap the socket in a BufReader, which will buffer bytes as needed. --- walkeeper/src/wal_service.rs | 75 ++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index ec97deded4..f4c64d9ef9 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -14,9 +14,9 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; -use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; use std::mem; -use std::net::{TcpListener, TcpStream}; +use std::net::{SocketAddr, TcpListener, TcpStream}; use std::str; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -185,8 +185,12 @@ pub struct Timeline { #[derive(Debug)] struct Connection { timeline: Option>, - /// Postgres connection - stream: TcpStream, + /// Postgres connection, buffered input + stream_in: BufReader, + /// Postgres connection, output FIXME: To buffer, or not to buffer? flush() is a pain. + stream_out: TcpStream, + /// The cached result of socket.peer_addr() + peer_addr: SocketAddr, /// input buffer inbuf: BytesMut, /// output buffer @@ -263,17 +267,28 @@ pub fn thread_main(conf: WalAcceptorConf) { main_loop(&conf).unwrap(); } +/// This is run by main_loop, inside a background thread. +/// +/// This is only a separate function to make a convenient place to collect +/// all errors for logging. Our caller can log errors in a single place. +fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> { + socket.set_nodelay(true)?; + let mut conn = Connection::new(socket, conf)?; + conn.run()?; + Ok(()) +} + +/// Accept incoming TCP connections and spawn them into a background thread. fn main_loop(conf: &WalAcceptorConf) -> Result<()> { let listener = TcpListener::bind(conf.listen_addr)?; loop { match listener.accept() { Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr); - socket.set_nodelay(true)?; - let mut conn = Connection::new(socket, &conf); + let conf = conf.clone(); thread::spawn(move || { - if let Err(err) = conn.run() { - error!("error: {}", err); + if let Err(err) = handle_socket(socket, conf) { + error!("socket error: {}", err); } }); } @@ -425,16 +440,20 @@ impl Timeline { } impl Connection { - pub fn new(socket: TcpStream, conf: &WalAcceptorConf) -> Connection { - Connection { + pub fn new(socket: TcpStream, conf: WalAcceptorConf) -> Result { + let peer_addr = socket.peer_addr()?; + let conn = Connection { timeline: None, - stream: socket, + stream_in: BufReader::new(socket.try_clone()?), + stream_out: socket, + peer_addr, inbuf: BytesMut::with_capacity(10 * 1024), outbuf: BytesMut::with_capacity(10 * 1024), init_done: false, appname: None, - conf: conf.clone(), - } + conf, + }; + Ok(conn) } fn timeline(&self) -> Arc { @@ -443,7 +462,7 @@ impl Connection { fn run(&mut self) -> Result<()> { self.inbuf.resize(4, 0u8); - self.stream.read_exact(&mut self.inbuf[0..4])?; + self.stream_in.read_exact(&mut self.inbuf[0..4])?; let startup_pkg_len = BigEndian::read_u32(&self.inbuf[0..4]); if startup_pkg_len == 0 { self.receive_wal()?; // internal protocol between wal_proposer and wal_acceptor @@ -456,7 +475,7 @@ impl Connection { fn read_req(&mut self) -> Result { let size = mem::size_of::(); self.inbuf.resize(size, 0u8); - self.stream.read_exact(&mut self.inbuf[0..size])?; + self.stream_in.read_exact(&mut self.inbuf[0..size])?; Ok(T::unpack(&mut self.inbuf)) } @@ -503,9 +522,7 @@ impl Connection { let server_info = self.read_req::()?; info!( "Start handshake with wal_proposer {} sysid {} timeline {}", - self.stream.peer_addr()?, - server_info.system_id, - server_info.timeline_id, + self.peer_addr, server_info.system_id, server_info.timeline_id, ); // FIXME: also check that the system identifier matches self.set_timeline(server_info.timeline_id)?; @@ -582,8 +599,7 @@ impl Connection { info!( "Start streaming from timeline {} address {:?}", - server_info.timeline_id, - self.stream.peer_addr()? + server_info.timeline_id, self.peer_addr, ); // Main loop @@ -611,7 +627,7 @@ impl Connection { /* Receive message body */ self.inbuf.resize(rec_size, 0u8); - self.stream.read_exact(&mut self.inbuf[0..rec_size])?; + self.stream_in.read_exact(&mut self.inbuf[0..rec_size])?; /* Save message in file */ self.write_wal_file(start_pos, timeline, wal_seg_size, &self.inbuf[0..rec_size])?; @@ -675,7 +691,7 @@ impl Connection { return Ok(Some(message)); } - if read_into(&mut self.stream, &mut self.inbuf)? == 0 { + if read_into(&mut self.stream_in, &mut self.inbuf)? == 0 { if self.inbuf.is_empty() { return Ok(None); } else { @@ -708,14 +724,14 @@ impl Connection { /// Send buffered messages /// fn send(&mut self) -> Result<()> { - Ok(self.stream.write_all(&self.outbuf)?) + Ok(self.stream_out.write_all(&self.outbuf)?) } /// /// Send WAL to replica or WAL receiver using standard libpq replication protocol /// fn send_wal(&mut self) -> Result<()> { - info!("WAL sender to {:?} is started", self.stream.peer_addr()?); + info!("WAL sender to {:?} is started", self.peer_addr); loop { self.start_sending(); match self.read_message()? { @@ -756,7 +772,7 @@ impl Connection { } } } - info!("WAL sender to {:?} is finished", self.stream.peer_addr()?); + info!("WAL sender to {:?} is finished", self.peer_addr); Ok(()) } @@ -878,9 +894,10 @@ impl Connection { // Temporarily set this stream into nonblocking mode. // FIXME: This seems like a dirty hack. // Should this task be done on a background thread? - self.stream.set_nonblocking(true).unwrap(); - let read_result = self.stream.read(&mut self.inbuf); - self.stream.set_nonblocking(false).unwrap(); + // FIXME: set_nonblocking plus BufReader seems questionable. + self.stream_in.get_ref().set_nonblocking(true).unwrap(); + let read_result = self.stream_in.read(&mut self.inbuf); + self.stream_in.get_ref().set_nonblocking(false).unwrap(); match read_result { Ok(0) => break, @@ -950,7 +967,7 @@ impl Connection { BigEndian::write_u64(&mut self.outbuf[14..22], end_pos.0); BigEndian::write_u64(&mut self.outbuf[22..30], get_current_timestamp()); - self.stream.write_all(&self.outbuf[0..msg_size])?; + self.stream_out.write_all(&self.outbuf[0..msg_size])?; start_pos += send_size as u64; debug!("Sent WAL to page server up to {}", end_pos);