diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index cfb8052cf1..178a645967 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -21,7 +21,6 @@ use anyhow::Context; use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; -use nix::poll::*; use pageserver_api::shard::TenantShardId; use serde::Serialize; use std::collections::VecDeque; @@ -31,10 +30,11 @@ use std::ops::{Deref, DerefMut}; use std::os::unix::io::AsRawFd; use std::os::unix::prelude::CommandExt; use std::process::Stdio; -use std::process::{Child, ChildStdin, ChildStdout, Command}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use std::process::{Child, Command}; +use std::sync::{Arc, RwLock}; use std::time::Duration; use std::time::Instant; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::*; use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock}; @@ -73,12 +73,12 @@ pub(crate) struct BufferTag { } struct ProcessInput { - stdin: ChildStdin, + stdin: tokio::process::ChildStdin, n_requests: usize, } struct ProcessOutput { - stdout: ChildStdout, + stdout: tokio::process::ChildStdout, pending_responses: VecDeque>, n_processed_responses: usize, } @@ -157,6 +157,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await }; img = Some(result?); @@ -177,6 +178,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await } } } @@ -217,7 +219,7 @@ impl PostgresRedoManager { /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - fn apply_batch_postgres( + async fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -270,6 +272,7 @@ impl PostgresRedoManager { let buf_tag = BufferTag { rel, blknum }; let result = proc .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout) + .await .context("apply_wal_records"); let duration = started_at.elapsed(); @@ -647,8 +650,8 @@ struct WalRedoProcess { tenant_shard_id: TenantShardId, // 
Some() on construction, only becomes None on Drop. child: Option, - stdout: Mutex, - stdin: Mutex, + stdout: tokio::sync::Mutex, + stdin: tokio::sync::Mutex, /// Counter to separate same sized walredo inputs failing at the same millisecond. #[cfg(feature = "testing")] dump_sequence: AtomicUsize, @@ -754,12 +757,12 @@ impl WalRedoProcess { conf, tenant_shard_id, child: Some(child), - stdin: Mutex::new(ProcessInput { - stdin, + stdin: tokio::sync::Mutex::new(ProcessInput { + stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling n_requests: 0, }), - stdout: Mutex::new(ProcessOutput { - stdout, + stdout: tokio::sync::Mutex::new(ProcessOutput { + stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling pending_responses: VecDeque::new(), n_processed_responses: 0, }), @@ -779,15 +782,13 @@ impl WalRedoProcess { // new page image. // #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] - fn apply_wal_records( + async fn apply_wal_records( &self, tag: BufferTag, base_img: &Option, records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, ) -> anyhow::Result { - let input = self.stdin.lock().unwrap(); - // Serialize all the messages to send the WAL redo process first. 
// // This could be problematic if there are millions of records to replay, @@ -816,7 +817,7 @@ impl WalRedoProcess { build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout); + let res = self.apply_wal_records0(&writebuf, wal_redo_timeout).await; if res.is_err() { // not all of these can be caused by this particular input, however these are so rare @@ -827,38 +828,17 @@ impl WalRedoProcess { res } - fn apply_wal_records0( + async fn apply_wal_records0( &self, writebuf: &[u8], - input: MutexGuard, - wal_redo_timeout: Duration, + _wal_redo_timeout: Duration, // TODO respect ) -> anyhow::Result { + let input = self.stdin.lock().await; + let mut proc = { input }; // TODO: remove this legacy rename, but this keeps the patch small. - let mut nwrite = 0usize; - while nwrite < writebuf.len() { - let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)]; - let n = loop { - match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; + proc.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If 'stdin' is writeable, do write. - let in_revents = stdin_pollfds[0].revents().unwrap(); - if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { - nwrite += proc.stdin.write(&writebuf[nwrite..])?; - } - if in_revents.contains(PollFlags::POLLHUP) { - // We still have more data to write, but the process closed the pipe. - anyhow::bail!("WAL redo process closed its stdin unexpectedly"); - } - } let request_no = proc.n_requests; proc.n_requests += 1; drop(proc); @@ -875,40 +855,13 @@ impl WalRedoProcess { // pending responses ring buffer and truncate all empty elements from the front, // advancing processed responses number. 
- let mut output = self.stdout.lock().unwrap(); + let mut output = self.stdout.lock().await; let n_processed_responses = output.n_processed_responses; while n_processed_responses + output.pending_responses.len() <= request_no { // We expect the WAL redo process to respond with an 8k page image. We read it // into this buffer. let mut resultbuf = vec![0; BLCKSZ.into()]; - let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far - while nresult < BLCKSZ.into() { - let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)]; - // We do two things simultaneously: reading response from stdout - // and forward any logging information that the child writes to its stderr to the page server's log. - let n = loop { - match nix::poll::poll( - &mut stdout_pollfds[..], - wal_redo_timeout.as_millis() as i32, - ) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If we have some data in stdout, read it to the result buffer. - let out_revents = stdout_pollfds[0].revents().unwrap(); - if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - nresult += output.stdout.read(&mut resultbuf[nresult..])?; - } - if out_revents.contains(PollFlags::POLLHUP) { - anyhow::bail!("WAL redo process closed its stdout unexpectedly"); - } - } + output.stdout.read_exact(&mut resultbuf).await.unwrap(); output .pending_responses .push_back(Some(Bytes::from(resultbuf)));