From c5ca7d0c686e5c9286472375db650ccf97224215 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 27 Jan 2023 18:36:24 +0200 Subject: [PATCH] Implement asynchronous pipe for communication with walredo process (#3368) Co-authored-by: Christian Schwarz --- pageserver/src/walredo.rs | 329 +++++++++++++++++++++++++++----------- 1 file changed, 236 insertions(+), 93 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index fd0524016f..c943bf0a27 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -22,16 +22,18 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; use nix::poll::*; use serde::Serialize; +use std::collections::VecDeque; use std::fs::OpenOptions; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::ops::{Deref, DerefMut}; +use std::os::fd::RawFd; use std::os::unix::io::AsRawFd; use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::process::Stdio; use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; use std::time::Duration; use std::time::Instant; use std::{fs, io}; @@ -90,6 +92,20 @@ pub trait WalRedoManager: Send + Sync { ) -> Result; } +struct ProcessInput { + child: NoLeakChild, + stdin: ChildStdin, + stderr_fd: RawFd, + stdout_fd: RawFd, + n_requests: usize, +} + +struct ProcessOutput { + stdout: ChildStdout, + pending_responses: VecDeque>, + n_processed_responses: usize, +} + /// /// This is the real implementation that uses a Postgres process to /// perform WAL replay. 
Only one thread can use the process at a time, @@ -101,7 +117,9 @@ pub struct PostgresRedoManager { tenant_id: TenantId, conf: &'static PageServerConf, - process: Mutex>, + stdout: Mutex>, + stdin: Mutex>, + stderr: Mutex>, } /// Can this request be served by neon redo functions @@ -209,16 +227,17 @@ impl PostgresRedoManager { PostgresRedoManager { tenant_id, conf, - process: Mutex::new(None), + stdin: Mutex::new(None), + stdout: Mutex::new(None), + stderr: Mutex::new(None), } } /// Launch process pre-emptively. Should not be needed except for benchmarking. - pub fn launch_process(&mut self, pg_version: u32) -> anyhow::Result<()> { - let inner = self.process.get_mut().unwrap(); - if inner.is_none() { - let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?; - *inner = Some(p); + pub fn launch_process(&self, pg_version: u32) -> anyhow::Result<()> { + let mut proc = self.stdin.lock().unwrap(); + if proc.is_none() { + self.launch(&mut proc, pg_version)?; } Ok(()) } @@ -241,22 +260,19 @@ impl PostgresRedoManager { let start_time = Instant::now(); - let mut process_guard = self.process.lock().unwrap(); + let mut proc = self.stdin.lock().unwrap(); let lock_time = Instant::now(); // launch the WAL redo process on first use - if process_guard.is_none() { - let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?; - *process_guard = Some(p); + if proc.is_none() { + self.launch(&mut proc, pg_version)?; } - let process = process_guard.as_mut().unwrap(); - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); // Relational WAL records are applied using wal-redo-postgres let buf_tag = BufferTag { rel, blknum }; - let result = process - .apply_wal_records(buf_tag, base_img, records, wal_redo_timeout) + let result = self + .apply_wal_records(proc, buf_tag, base_img, records, wal_redo_timeout) .map_err(WalRedoError::IoError); let end_time = Instant::now(); @@ -295,8 +311,22 @@ impl PostgresRedoManager { 
base_img_lsn, lsn ); - let process = process_guard.take().unwrap(); - process.kill(); + // self.stdin only holds stdout & stderr as_raw_fd(). + // Dropping it as part of take() doesn't close them. + // The owning objects (ChildStdout and ChildStderr) are stored in + // self.stdout and self.stderr, respectively. + // We intentionally keep them open here to avoid a race between + // currently running `apply_wal_records()` and a `launch()` call + // after we return here. + // The currently running `apply_wal_records()` must not read from + // the newly launched process. + // By keeping self.stdout and self.stderr open here, `launch()` will + // get other file descriptors for the new child's stdout and stderr, + // and hence the current `apply_wal_records()` calls will observe + // `output.stdout.as_raw_fd() != stdout_fd` . + if let Some(proc) = self.stdin.lock().unwrap().take() { + proc.child.kill_and_wait(); + } } result } @@ -595,32 +625,23 @@ impl CloseFileDescriptors for C { } } -/// -/// Handle to the Postgres WAL redo process -/// -struct PostgresRedoProcess { - tenant_id: TenantId, - child: NoLeakChild, - stdin: ChildStdin, - stdout: ChildStdout, - stderr: ChildStderr, -} - -impl PostgresRedoProcess { +impl PostgresRedoManager { // // Start postgres binary in special WAL redo mode. // - #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))] + #[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))] fn launch( - conf: &PageServerConf, - tenant_id: TenantId, + &self, + input: &mut MutexGuard>, pg_version: u32, - ) -> Result { + ) -> Result<(), Error> { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently.
let datadir = path_with_suffix_extension( - conf.tenant_path(&tenant_id).join("wal-redo-datadir"), + self.conf + .tenant_path(&self.tenant_id) + .join("wal-redo-datadir"), TEMP_FILE_SUFFIX, ); @@ -634,10 +655,12 @@ impl PostgresRedoProcess { ) })?; } - let pg_bin_dir_path = conf + let pg_bin_dir_path = self + .conf .pg_bin_dir(pg_version) .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?; - let pg_lib_dir_path = conf + let pg_lib_dir_path = self + .conf .pg_lib_dir(pg_version) .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?; @@ -723,27 +746,31 @@ impl PostgresRedoProcess { // all fallible operations post-spawn are complete, so get rid of the guard let child = scopeguard::ScopeGuard::into_inner(child); - Ok(PostgresRedoProcess { - tenant_id, + **input = Some(ProcessInput { child, + stdout_fd: stdout.as_raw_fd(), + stderr_fd: stderr.as_raw_fd(), stdin, + n_requests: 0, + }); + + *self.stdout.lock().unwrap() = Some(ProcessOutput { stdout, - stderr, - }) + pending_responses: VecDeque::new(), + n_processed_responses: 0, + }); + *self.stderr.lock().unwrap() = Some(stderr); + + Ok(()) } - #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))] - fn kill(self) { - self.child.kill_and_wait(); - } - - // // Apply given WAL records ('records') over an old page image. Returns // new page image. // - #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))] + #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))] fn apply_wal_records( - &mut self, + &self, + mut input: MutexGuard>, tag: BufferTag, base_img: Option, records: &[(Lsn, NeonWalRecord)], @@ -780,33 +807,23 @@ impl PostgresRedoProcess { build_get_page_msg(tag, &mut writebuf); WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - // The input is now in 'writebuf'. Do a blind write first, writing as much as - // we can, before calling poll(). 
That skips one call to poll() if the stdin is - // already available for writing, which it almost certainly is because the - // process is idle. - let mut nwrite = self.stdin.write(&writebuf)?; - - // We expect the WAL redo process to respond with an 8k page image. We read it - // into this buffer. - let mut resultbuf = vec![0; BLCKSZ.into()]; - let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far + let proc = input.as_mut().unwrap(); + let mut nwrite = 0usize; + let stdout_fd = proc.stdout_fd; // Prepare for calling poll() let mut pollfds = [ - PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN), - PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN), - PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT), + PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT), + PollFd::new(proc.stderr_fd, PollFlags::POLLIN), + PollFd::new(stdout_fd, PollFlags::POLLIN), ]; - // We do three things simultaneously: send the old base image and WAL records to - // the child process's stdin, read the result from child's stdout, and forward any logging + // We do two things simultaneously: send the old base image and WAL records to + // the child process's stdin and forward any logging // information that the child writes to its stderr to the page server's log. - while nresult < BLCKSZ.into() { - // If we have more data to write, wake up if 'stdin' becomes writeable or - // we have data to read. Otherwise only wake up if there's data to read. 
- let nfds = if nwrite < writebuf.len() { 3 } else { 2 }; + while nwrite < writebuf.len() { let n = loop { - match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) { + match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) { Err(e) if e == nix::errno::Errno::EINTR => continue, res => break res, } @@ -820,14 +837,16 @@ impl PostgresRedoProcess { let err_revents = pollfds[1].revents().unwrap(); if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { let mut errbuf: [u8; 16384] = [0; 16384]; - let n = self.stderr.read(&mut errbuf)?; + let mut stderr_guard = self.stderr.lock().unwrap(); + let stderr = stderr_guard.as_mut().unwrap(); + let len = stderr.read(&mut errbuf)?; // The message might not be split correctly into lines here. But this is // good enough, the important thing is to get the message to the log. - if n > 0 { + if len > 0 { error!( "wal-redo-postgres: {}", - String::from_utf8_lossy(&errbuf[0..n]) + String::from_utf8_lossy(&errbuf[0..len]) ); // To make sure we capture all log from the process if it fails, keep @@ -841,33 +860,157 @@ impl PostgresRedoProcess { )); } - // If we have more data to write and 'stdin' is writeable, do write. - if nwrite < writebuf.len() { - let in_revents = pollfds[2].revents().unwrap(); - if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { - nwrite += self.stdin.write(&writebuf[nwrite..])?; - } else if in_revents.contains(PollFlags::POLLHUP) { - // We still have more data to write, but the process closed the pipe. - return Err(Error::new( - ErrorKind::BrokenPipe, - "WAL redo process closed its stdin unexpectedly", - )); - } - } - - // If we have some data in stdout, read it to the result buffer. 
- let out_revents = pollfds[0].revents().unwrap(); - if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - nresult += self.stdout.read(&mut resultbuf[nresult..])?; - } else if out_revents.contains(PollFlags::POLLHUP) { + // If 'stdin' is writeable, do write. + let in_revents = pollfds[0].revents().unwrap(); + if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { + nwrite += proc.stdin.write(&writebuf[nwrite..])?; + } else if in_revents.contains(PollFlags::POLLHUP) { + // We still have more data to write, but the process closed the pipe. return Err(Error::new( ErrorKind::BrokenPipe, - "WAL redo process closed its stdout unexpectedly", + "WAL redo process closed its stdin unexpectedly", )); } } + let request_no = proc.n_requests; + proc.n_requests += 1; + drop(input); - Ok(Bytes::from(resultbuf)) + // To improve walredo performance we separate sending requests and receiving + // responses. They are protected by different mutexes (output and input). + // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process + // then there is no guarantee that T1 will be granted the output mutex first. + // To address this issue we maintain the number of sent requests, the number of processed + // responses and a ring buffer with pending responses. After sending a request + // (under the input mutex), a thread remembers its request number. Then it releases + // the input mutex, locks the output mutex and fetches into the ring buffer all responses up to + // its stored request number. Then it takes the corresponding element from the + // pending responses ring buffer and truncates all empty elements from the front, + // advancing the processed responses number. + + let mut output_guard = self.stdout.lock().unwrap(); + let output = output_guard.as_mut().unwrap(); + if output.stdout.as_raw_fd() != stdout_fd { + // If the stdout file descriptor has changed, it means that the walredo process has crashed and been restarted.
+ // Since ProcessInput and ProcessOutput are protected by different mutexes, + // it can happen that we send a request to one process and wait for the response from another. + // To prevent such a situation we compare stdout file descriptors. + // Since the old stdout pipe is destroyed only after the new one is created, + // the new pipe cannot reuse the same file descriptor, so this check is safe. + // + // Cross-read this with the comment in apply_batch_postgres if result.is_err(). + // That's where we kill the child process. + return Err(Error::new( + ErrorKind::BrokenPipe, + "WAL redo process closed its stdout unexpectedly", + )); + } + let n_processed_responses = output.n_processed_responses; + while n_processed_responses + output.pending_responses.len() <= request_no { + // We expect the WAL redo process to respond with an 8k page image. We read it + // into this buffer. + let mut resultbuf = vec![0; BLCKSZ.into()]; + let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far + while nresult < BLCKSZ.into() { + // We do two things simultaneously: read the response from stdout + // and forward any logging information that the child writes to its stderr to the page server's log. + let n = loop { + match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) { + Err(e) if e == nix::errno::Errno::EINTR => continue, + res => break res, + } + }?; + + if n == 0 { + return Err(Error::new(ErrorKind::Other, "WAL redo timed out")); + } + + // If we have some messages in stderr, forward them to the log. + let err_revents = pollfds[1].revents().unwrap(); + if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { + let mut errbuf: [u8; 16384] = [0; 16384]; + let mut stderr_guard = self.stderr.lock().unwrap(); + let stderr = stderr_guard.as_mut().unwrap(); + let len = stderr.read(&mut errbuf)?; + + // The message might not be split correctly into lines here.
But this is + // good enough, the important thing is to get the message to the log. + if len > 0 { + error!( + "wal-redo-postgres: {}", + String::from_utf8_lossy(&errbuf[0..len]) + ); + + // To make sure we capture all log from the process if it fails, keep + // reading from the stderr, before checking the stdout. + continue; + } + } else if err_revents.contains(PollFlags::POLLHUP) { + return Err(Error::new( + ErrorKind::BrokenPipe, + "WAL redo process closed its stderr unexpectedly", + )); + } + + // If we have some data in stdout, read it to the result buffer. + let out_revents = pollfds[2].revents().unwrap(); + if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { + nresult += output.stdout.read(&mut resultbuf[nresult..])?; + } else if out_revents.contains(PollFlags::POLLHUP) { + return Err(Error::new( + ErrorKind::BrokenPipe, + "WAL redo process closed its stdout unexpectedly", + )); + } + } + output + .pending_responses + .push_back(Some(Bytes::from(resultbuf))); + } + // Replace our request's response with None in `pending_responses`. + // Then make space in the ring buffer by clearing out any sequence of contiguous + // `None`'s from the front of `pending_responses`.
+ // NB: We can't just pop_front() because the front may hold other requests' responses: another + // requester might have grabbed the output mutex before us: + // T1: grab input mutex + // T1: send request_no 23 + // T1: release input mutex + // T2: grab input mutex + // T2: send request_no 24 + // T2: release input mutex + // T2: grab output mutex + // T2: n_processed_responses + output.pending_responses.len() <= request_no + // 23 0 24 + // T2: enters poll loop that reads stdout + // T2: put response for 23 into pending_responses + // T2: put response for 24 into pending_responses + // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back + // T2: takes its response_24 + // pending_responses now looks like this: Front Some(response_23) None Back + // T2: does the while loop below + // pending_responses now looks like this: Front Some(response_23) None Back + // T2: releases output mutex + // T1: grabs output mutex + // T1: n_processed_responses + output.pending_responses.len() > request_no + // 23 2 23 + // T1: skips poll loop that reads stdout + // T1: takes its response_23 + // pending_responses now looks like this: Front None None Back + // T1: does the while loop below + // pending_responses now looks like this: Front Back + // n_processed_responses now has value 25 + let res = output.pending_responses[request_no - n_processed_responses] + .take() + .expect("we own this request_no, nobody else is supposed to take it"); + while let Some(front) = output.pending_responses.front() { + if front.is_none() { + output.pending_responses.pop_front(); + output.n_processed_responses += 1; + } else { + break; + } + } + Ok(res) } }