diff --git a/Cargo.lock b/Cargo.lock index 9233976f47..0688947f1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1195,6 +1195,7 @@ dependencies = [ "hyper", "lazy_static", "log", + "nix", "postgres", "postgres-protocol", "postgres-types", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 100e6551c2..f0f1230dfa 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -38,6 +38,7 @@ const_format = "0.2.21" tracing = "0.1.27" signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } url = "2" +nix = "0.23" postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 50a06f069c..dc0968250b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,23 +22,23 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; +use nix::poll::*; use serde::Serialize; use std::fs; use std::fs::OpenOptions; use std::io::prelude::*; -use std::io::Error; +use std::io::{Error, ErrorKind}; +use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::process::Stdio; +use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; use std::sync::Mutex; use std::time::Duration; use std::time::Instant; -use tokio::io::AsyncBufReadExt; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::process::{ChildStdin, ChildStdout, Command}; -use tokio::time::timeout; use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; +use zenith_utils::nonblock::set_nonblock; use zenith_utils::zid::ZTenantId; use crate::relish::*; @@ -139,7 +139,6 @@ pub struct PostgresRedoManager { tenantid: ZTenantId, conf: &'static PageServerConf, - runtime: tokio::runtime::Runtime, process: Mutex>, } @@ -215,20 +214,23 @@ impl WalRedoManager for PostgresRedoManager { // launch the WAL redo process on first use if process_guard.is_none() { - let p = self - .runtime - .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?; + let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?; *process_guard = Some(p); } let process = process_guard.as_mut().unwrap(); - result = self - .runtime - .block_on(self.handle_apply_request_postgres(process, &request)); + result = self.handle_apply_request_postgres(process, &request); WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); end_time = Instant::now(); WAL_REDO_TIME.observe(end_time.duration_since(lock_time).as_secs_f64()); + + // If something went wrong, don't try to reuse the process. Kill it, and + // next request will launch a new one. + if result.is_err() { + let process = process_guard.take().unwrap(); + process.kill(); + } } result @@ -240,17 +242,8 @@ impl PostgresRedoManager { /// Create a new PostgresRedoManager. /// pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager { - // We block on waiting for requests on the walredo request channel, but - // use async I/O to communicate with the child process. Initialize the - // runtime for the async part. - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - // The actual process is launched lazily, on first request. PostgresRedoManager { - runtime, tenantid, conf, process: Mutex::new(None), @@ -260,7 +253,7 @@ impl PostgresRedoManager { /// /// Process one request for WAL redo using wal-redo postgres /// - async fn handle_apply_request_postgres( + fn handle_apply_request_postgres( &self, process: &mut PostgresRedoProcess, request: &WalRedoRequest, @@ -278,7 +271,7 @@ impl PostgresRedoManager { if let RelishTag::Relation(rel) = request.rel { // Relational WAL records are applied using wal-redo-postgres let buf_tag = BufferTag { rel, blknum }; - apply_result = process.apply_wal_records(buf_tag, base_img, records).await; + apply_result = process.apply_wal_records(buf_tag, base_img, records); let duration = start.elapsed(); @@ -469,18 +462,17 @@ impl PostgresRedoManager { /// Handle to the Postgres WAL redo process /// struct PostgresRedoProcess { + child: Child, stdin: ChildStdin, stdout: ChildStdout, + stderr: ChildStderr, } impl PostgresRedoProcess { // // Start postgres binary in special WAL redo mode. // - async fn launch( - conf: &PageServerConf, - tenantid: &ZTenantId, - ) -> Result { + fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently. @@ -501,7 +493,6 @@ impl PostgresRedoProcess { .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) .output() - .await .expect("failed to execute initdb"); if !initdb.status.success() { @@ -538,47 +529,40 @@ impl PostgresRedoProcess { datadir.display() ); - let stdin = child.stdin.take().expect("failed to open child's stdin"); - let stderr = child.stderr.take().expect("failed to open child's stderr"); - let stdout = child.stdout.take().expect("failed to open child's stdout"); + let stdin = child.stdin.take().unwrap(); + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); - // This async block reads the child's stderr, and forwards it to the logger - let f_stderr = async { - let mut stderr_buffered = tokio::io::BufReader::new(stderr); + set_nonblock(stdin.as_raw_fd())?; + set_nonblock(stdout.as_raw_fd())?; + set_nonblock(stderr.as_raw_fd())?; - let mut line = String::new(); - loop { - let res = stderr_buffered.read_line(&mut line).await; - if res.is_err() { - debug!("could not convert line to utf-8"); - continue; - } - if res.unwrap() == 0 { - break; - } - error!("wal-redo-postgres: {}", line.trim()); - line.clear(); - } - Ok::<(), Error>(()) - }; - tokio::spawn(f_stderr); + Ok(PostgresRedoProcess { + child, + stdin, + stdout, + stderr, + }) + } - Ok(PostgresRedoProcess { stdin, stdout }) + fn kill(mut self) { + let _ = self.child.kill(); + if let Ok(exit_status) = self.child.wait() { + error!("wal-redo-postgres exited with code {}", exit_status); + } + drop(self); } // // Apply given WAL records ('records') over an old page image. Returns // new page image. // - async fn apply_wal_records( + fn apply_wal_records( &mut self, tag: BufferTag, base_img: Option, records: &[(Lsn, WALRecord)], ) -> Result { - let stdout = &mut self.stdout; - let stdin = &mut self.stdin; - // Serialize all the messages to send the WAL redo process first. // // This could be problematic if there are millions of records to replay, @@ -595,33 +579,89 @@ impl PostgresRedoProcess { build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); + // The input is now in 'writebuf'. Do a blind write first, writing as much as + // we can, before calling poll(). That skips one call to poll() if the stdin is + // already available for writing, which it almost certainly is because the + // process is idle. + let mut nwrite = self.stdin.write(&writebuf)?; + + // We expect the WAL redo process to respond with an 8k page image. We read it + // into this buffer. + let mut resultbuf = vec![0; pg_constants::BLCKSZ.into()]; + let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far + + // Prepare for calling poll() + let mut pollfds = [ + PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN), + PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN), + PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT), + ]; + // We do three things simultaneously: send the old base image and WAL records to // the child process's stdin, read the result from child's stdout, and forward any logging // information that the child writes to its stderr to the page server's log. - // - // 'f_stdin' handles writing the base image and WAL records to the child process. - // 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the - // tokio runtime in the 'launch' function already, forwards the logging. - let f_stdin = async { - // Send base image, if any. (If the record initializes the page, previous page - // version is not needed.) - timeout(TIMEOUT, stdin.write_all(&writebuf)).await??; - Ok::<(), Error>(()) - }; + while nresult < pg_constants::BLCKSZ.into() { + // If we have more data to write, wake up if 'stdin' becomes writeable or + // we have data to read. Otherwise only wake up if there's data to read. + let nfds = if nwrite < writebuf.len() { 3 } else { 2 }; + let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?; - // Read back new page image - let f_stdout = async { - let mut buf = [0u8; 8192]; + if n == 0 { + return Err(Error::new(ErrorKind::Other, "WAL redo timed out")); + } - timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; - Ok::<[u8; 8192], Error>(buf) - }; + // If we have some messages in stderr, forward them to the log. + let err_revents = pollfds[1].revents().unwrap(); + if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { + let mut errbuf: [u8; 16384] = [0; 16384]; + let n = self.stderr.read(&mut errbuf)?; - let res = tokio::try_join!(f_stdout, f_stdin)?; + // The message might not be split correctly into lines here. But this is + // good enough, the important thing is to get the message to the log. + if n > 0 { + error!( + "wal-redo-postgres: {}", + String::from_utf8_lossy(&errbuf[0..n]) + ); - let buf = res.0; + // To make sure we capture all log from the process if it fails, keep + // reading from the stderr, before checking the stdout. + continue; + } + } else if err_revents.contains(PollFlags::POLLHUP) { + return Err(Error::new( + ErrorKind::BrokenPipe, + "WAL redo process closed its stderr unexpectedly", + )); + } - Ok::(Bytes::from(std::vec::Vec::from(buf))) + // If we have more data to write and 'stdin' is writeable, do write. + if nwrite < writebuf.len() { + let in_revents = pollfds[2].revents().unwrap(); + if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { + nwrite += self.stdin.write(&writebuf[nwrite..])?; + } else if in_revents.contains(PollFlags::POLLHUP) { + // We still have more data to write, but the process closed the pipe. + return Err(Error::new( + ErrorKind::BrokenPipe, + "WAL redo process closed its stdin unexpectedly", + )); + } + } + + // If we have some data in stdout, read it to the result buffer. + let out_revents = pollfds[0].revents().unwrap(); + if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { + nresult += self.stdout.read(&mut resultbuf[nresult..])?; + } else if out_revents.contains(PollFlags::POLLHUP) { + return Err(Error::new( + ErrorKind::BrokenPipe, + "WAL redo process closed its stdout unexpectedly", + )); + } + } + + Ok(Bytes::from(resultbuf)) } } diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 912d8308e5..9520d01f53 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -43,3 +43,6 @@ pub mod accum; // Utility for binding TcpListeners with proper socket options. pub mod tcp_listener; + +// Utility for putting a raw file descriptor into non-blocking mode +pub mod nonblock; diff --git a/zenith_utils/src/nonblock.rs b/zenith_utils/src/nonblock.rs new file mode 100644 index 0000000000..8b1fd71ae6 --- /dev/null +++ b/zenith_utils/src/nonblock.rs @@ -0,0 +1,17 @@ +use nix::fcntl::{fcntl, OFlag, F_GETFL, F_SETFL}; +use std::os::unix::io::RawFd; + +/// Put a file descriptor into non-blocking mode +pub fn set_nonblock(fd: RawFd) -> Result<(), std::io::Error> { + let bits = fcntl(fd, F_GETFL)?; + + // Safety: If F_GETFL returns some unknown bits, they should be valid + // for passing back to F_SETFL, too. If we left them out, the F_SETFL + // would effectively clear them, which is not what we want. + let mut flags = unsafe { OFlag::from_bits_unchecked(bits) }; + flags |= OFlag::O_NONBLOCK; + + fcntl(fd, F_SETFL(flags))?; + + Ok(()) +}