Implement asynchronous pipe for communication with walredo process (#3368)

Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Konstantin Knizhnik
2023-01-27 18:36:24 +02:00
committed by GitHub
parent 0ec84e2f1f
commit c5ca7d0c68

View File

@@ -22,16 +22,18 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::ops::{Deref, DerefMut};
use std::os::fd::RawFd;
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::sync::{Mutex, MutexGuard};
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
@@ -90,6 +92,20 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
struct ProcessInput {
child: NoLeakChild,
stdin: ChildStdin,
stderr_fd: RawFd,
stdout_fd: RawFd,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
@@ -101,7 +117,9 @@ pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
stdout: Mutex<Option<ProcessOutput>>,
stdin: Mutex<Option<ProcessInput>>,
stderr: Mutex<Option<ChildStderr>>,
}
/// Can this request be served by neon redo functions
@@ -209,16 +227,17 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
stdin: Mutex::new(None),
stdout: Mutex::new(None),
stderr: Mutex::new(None),
}
}
/// Launch process pre-emptively. Should not be needed except for benchmarking.
pub fn launch_process(&mut self, pg_version: u32) -> anyhow::Result<()> {
let inner = self.process.get_mut().unwrap();
if inner.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*inner = Some(p);
pub fn launch_process(&self, pg_version: u32) -> anyhow::Result<()> {
let mut proc = self.stdin.lock().unwrap();
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
}
Ok(())
}
@@ -241,22 +260,19 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap();
let mut proc = self.stdin.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*process_guard = Some(p);
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
}
let process = process_guard.as_mut().unwrap();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = process
.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout)
let result = self
.apply_wal_records(proc, buf_tag, base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let end_time = Instant::now();
@@ -295,8 +311,22 @@ impl PostgresRedoManager {
base_img_lsn,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
// The owning objects (ChildStdout and ChildStderr) are stored in
// self.stdout and self.stderr, respsectively.
// We intentionally keep them open here to avoid a race between
// currently running `apply_wal_records()` and a `launch()` call
// after we return here.
// The currently running `apply_wal_records()` must not read from
// the newly launched process.
// By keeping self.stdout and self.stderr open here, `launch()` will
// get other file descriptors for the new child's stdout and stderr,
// and hence the current `apply_wal_records()` calls will observe
// `output.stdout.as_raw_fd() != stdout_fd` .
if let Some(proc) = self.stdin.lock().unwrap().take() {
proc.child.kill_and_wait();
}
}
result
}
@@ -595,32 +625,23 @@ impl<C: CommandExt> CloseFileDescriptors for C {
}
}
///
/// Handle to the Postgres WAL redo process
///
struct PostgresRedoProcess {
tenant_id: TenantId,
child: NoLeakChild,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
}
impl PostgresRedoProcess {
impl PostgresRedoManager {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
#[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
fn launch(
conf: &PageServerConf,
tenant_id: TenantId,
&self,
input: &mut MutexGuard<Option<ProcessInput>>,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
) -> Result<(), Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = path_with_suffix_extension(
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
self.conf
.tenant_path(&self.tenant_id)
.join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
@@ -634,10 +655,12 @@ impl PostgresRedoProcess {
)
})?;
}
let pg_bin_dir_path = conf
let pg_bin_dir_path = self
.conf
.pg_bin_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
let pg_lib_dir_path = conf
let pg_lib_dir_path = self
.conf
.pg_lib_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
@@ -723,27 +746,31 @@ impl PostgresRedoProcess {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
Ok(PostgresRedoProcess {
tenant_id,
**input = Some(ProcessInput {
child,
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
stdin,
n_requests: 0,
});
*self.stdout.lock().unwrap() = Some(ProcessOutput {
stdout,
stderr,
})
pending_responses: VecDeque::new(),
n_processed_responses: 0,
});
*self.stderr.lock().unwrap() = Some(stderr);
Ok(())
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn kill(self) {
self.child.kill_and_wait();
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
fn apply_wal_records(
&mut self,
&self,
mut input: MutexGuard<Option<ProcessInput>>,
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
@@ -780,33 +807,23 @@ impl PostgresRedoProcess {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
// The input is now in 'writebuf'. Do a blind write first, writing as much as
// we can, before calling poll(). That skips one call to poll() if the stdin is
// already available for writing, which it almost certainly is because the
// process is idle.
let mut nwrite = self.stdin.write(&writebuf)?;
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(stdout_fd, PollFlags::POLLIN),
];
// We do three things simultaneously: send the old base image and WAL records to
// the child process's stdin, read the result from child's stdout, and forward any logging
// We do two things simultaneously: send the old base image and WAL records to
// the child process's stdin and forward any logging
// information that the child writes to its stderr to the page server's log.
while nresult < BLCKSZ.into() {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
while nwrite < writebuf.len() {
let n = loop {
match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) {
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
@@ -820,14 +837,16 @@ impl PostgresRedoProcess {
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let n = self.stderr.read(&mut errbuf)?;
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if n > 0 {
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..n])
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
@@ -841,33 +860,157 @@ impl PostgresRedoProcess {
));
}
// If we have more data to write and 'stdin' is writeable, do write.
if nwrite < writebuf.len() {
let in_revents = pollfds[2].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += self.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
}
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += self.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
"WAL redo process closed its stdin unexpectedly",
));
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(input);
Ok(Bytes::from(resultbuf))
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output_guard = self.stdout.lock().unwrap();
let output = output_guard.as_mut().unwrap();
if output.stdout.as_raw_fd() != stdout_fd {
// If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
// As far as ProcessInput and ProcessOutout are protected by different mutexes,
// it can happen that we send request to one process and waiting response from another.
// To prevent such situation we compare stdout file descriptors.
// As far as old stdout pipe is destroyed only after new one is created,
// it can not reuse the same file descriptor, so this check is safe.
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
res => break res,
}
}?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
Ok(res)
}
}