Use poll() in communication with WAL redo process.

The tokio futures added some overhead, so switch to plain non-blocking
I/O with poll(). In a simple pgbench test on my laptop (select-only
queries, scale-factor 1 `pgbench -P1 -T50 -S`), this gives about 10%
improvement, from about 4300 TPS to 4800 TPS.
This commit is contained in:
Heikki Linnakangas
2021-11-04 10:39:04 +02:00
parent 3a0111c75e
commit b38e841f2d
5 changed files with 136 additions and 74 deletions

1
Cargo.lock generated
View File

@@ -1195,6 +1195,7 @@ dependencies = [
"hyper",
"lazy_static",
"log",
"nix",
"postgres",
"postgres-protocol",
"postgres-types",

View File

@@ -38,6 +38,7 @@ const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
url = "2"
nix = "0.23"
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }

View File

@@ -22,23 +22,23 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use lazy_static::lazy_static;
use log::*;
use nix::poll::*;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::Error;
use std::io::{Error, ErrorKind};
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{ChildStdin, ChildStdout, Command};
use tokio::time::timeout;
use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::nonblock::set_nonblock;
use zenith_utils::zid::ZTenantId;
use crate::relish::*;
@@ -139,7 +139,6 @@ pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
}
@@ -215,20 +214,23 @@ impl WalRedoManager for PostgresRedoManager {
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = self
.runtime
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
result = self
.runtime
.block_on(self.handle_apply_request_postgres(process, &request));
result = self.handle_apply_request_postgres(process, &request);
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64());
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if result.is_err() {
let process = process_guard.take().unwrap();
process.kill();
}
}
result
@@ -240,17 +242,8 @@ impl PostgresRedoManager {
/// Create a new PostgresRedoManager.
///
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
// The actual process is launched lazily, on first request.
PostgresRedoManager {
runtime,
tenantid,
conf,
process: Mutex::new(None),
@@ -260,7 +253,7 @@ impl PostgresRedoManager {
///
/// Process one request for WAL redo using wal-redo postgres
///
async fn handle_apply_request_postgres(
fn handle_apply_request_postgres(
&self,
process: &mut PostgresRedoProcess,
request: &WalRedoRequest,
@@ -278,7 +271,7 @@ impl PostgresRedoManager {
if let RelishTag::Relation(rel) = request.rel {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
apply_result = process.apply_wal_records(buf_tag, base_img, records);
let duration = start.elapsed();
@@ -469,18 +462,17 @@ impl PostgresRedoManager {
/// Handle to the Postgres WAL redo process
///
struct PostgresRedoProcess {
child: Child,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
async fn launch(
conf: &PageServerConf,
tenantid: &ZTenantId,
) -> Result<PostgresRedoProcess, Error> {
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
@@ -501,7 +493,6 @@ impl PostgresRedoProcess {
.env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
.output()
.await
.expect("failed to execute initdb");
if !initdb.status.success() {
@@ -538,47 +529,40 @@ impl PostgresRedoProcess {
datadir.display()
);
let stdin = child.stdin.take().expect("failed to open child's stdin");
let stderr = child.stderr.take().expect("failed to open child's stderr");
let stdout = child.stdout.take().expect("failed to open child's stdout");
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
// This async block reads the child's stderr, and forwards it to the logger
let f_stderr = async {
let mut stderr_buffered = tokio::io::BufReader::new(stderr);
set_nonblock(stdin.as_raw_fd())?;
set_nonblock(stdout.as_raw_fd())?;
set_nonblock(stderr.as_raw_fd())?;
let mut line = String::new();
loop {
let res = stderr_buffered.read_line(&mut line).await;
if res.is_err() {
debug!("could not convert line to utf-8");
continue;
}
if res.unwrap() == 0 {
break;
}
error!("wal-redo-postgres: {}", line.trim());
line.clear();
}
Ok::<(), Error>(())
};
tokio::spawn(f_stderr);
Ok(PostgresRedoProcess {
child,
stdin,
stdout,
stderr,
})
}
Ok(PostgresRedoProcess { stdin, stdout })
fn kill(mut self) {
let _ = self.child.kill();
if let Ok(exit_status) = self.child.wait() {
error!("wal-redo-postgres exited with code {}", exit_status);
}
drop(self);
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
async fn apply_wal_records(
fn apply_wal_records(
&mut self,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, WALRecord)],
) -> Result<Bytes, std::io::Error> {
let stdout = &mut self.stdout;
let stdin = &mut self.stdin;
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -595,33 +579,89 @@ impl PostgresRedoProcess {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
// The input is now in 'writebuf'. Do a blind write first, writing as much as
// we can, before calling poll(). That skips one call to poll() if the stdin is
// already available for writing, which it almost certainly is because the
// process is idle.
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
];
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// information that the child writes to its stderr to the page server's log.
//
// 'f_stdin' handles writing the base image and WAL records to the child process.
// 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
// tokio runtime in the 'launch' function already, forwards the logging.
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(TIMEOUT, stdin.write_all(&writebuf)).await??;
Ok::<(), Error>(())
};
while nresult < pg_constants::BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
// Read back new page image
let f_stdout = async {
let mut buf = [0u8; 8192];
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
Ok::<[u8; 8192], Error>(buf)
};
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let n = self.stderr.read(&mut errbuf)?;
let res = tokio::try_join!(f_stdout, f_stdin)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
);
let buf = res.0;
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
Ok(Bytes::from(resultbuf))
}
}

View File

@@ -43,3 +43,6 @@ pub mod accum;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;
// Utility for putting a raw file descriptor into non-blocking mode
pub mod nonblock;

View File

@@ -0,0 +1,17 @@
use nix::fcntl::{fcntl, OFlag, F_GETFL, F_SETFL};
use std::os::unix::io::RawFd;
/// Put a file descriptor into non-blocking mode
pub fn set_nonblock(fd: RawFd) -> Result<(), std::io::Error> {
let bits = fcntl(fd, F_GETFL)?;
// Safety: If F_GETFL returns some unknown bits, they should be valid
// for passing back to F_SETFL, too. If we left them out, the F_SETFL
// would effectively clear them, which is not what we want.
let mut flags = unsafe { OFlag::from_bits_unchecked(bits) };
flags |= OFlag::O_NONBLOCK;
fcntl(fd, F_SETFL(flags))?;
Ok(())
}