walredo: fix race condition where shutdown kills the wrong process (#5557)

Before this PR, the following race condition existed:

```
T1: does the apply_wal_records() call and gets back an error
T2: does the apply_wal_records() call and gets back an error
T2: does the kill_and_shutdown
T2: new loop iteration
T2: launches new walredo process
T1: does the kill_and_shutdown of the new process
```

That last step is wrong: T2 already did the kill_and_shutdown of the old
process, so T1 ends up killing the new process that T2 just launched.
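
Before the fix, the error path boiled down to the pattern below (a minimal,
self-contained sketch; the `Manager`, `Child`, and `handle_error` names are
illustrative stand-ins, not the actual pageserver types):

```rust
use std::sync::Mutex;

struct Child;
impl Child {
    fn kill_and_wait(self) { /* SIGKILL + wait(2) in the real code */ }
}

struct Manager {
    // pre-fix: the currently installed walredo process, behind a Mutex
    stdin: Mutex<Option<Child>>,
}

impl Manager {
    // Any thread that sees an error takes whatever process is currently
    // installed and kills it -- even if another thread already killed the
    // failed process and launched a replacement. That replacement is what
    // T1 kills in the timeline above.
    fn handle_error(&self) {
        if let Some(proc) = self.stdin.lock().unwrap().take() {
            proc.kill_and_wait();
        }
    }
}
```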

The symptom of this race condition was that T2 would observe an error
when it tried to use the process after T1 killed it.
For example (but not limited to):
`POLLHUP` / `"WAL redo process closed its stderr unexpectedly"`.

The fix in this PR is the following:

* Use Arc to represent walredo processes.
  The Arc lives at least as long as the walredo process.
* Use Arc::ptr_eq to determine whether to kill the process or not
  (see the sketch below).
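
A minimal, self-contained sketch of that pattern (the `Process`, `Manager`,
and `retire_on_error` names are illustrative, not the actual code):

```rust
use std::sync::{Arc, RwLock};

struct Process; // stand-in for WalRedoProcess

struct Manager {
    redo_process: RwLock<Option<Arc<Process>>>,
}

impl Manager {
    // Called by a thread that got an error from `proc` (its own clone of the
    // Arc). Only the thread whose clone still matches the installed Arc takes
    // the process out of rotation; later threads see `None` or a different
    // Arc (the freshly launched replacement) and leave it alone. The process
    // itself gets killed when the last Arc clone is dropped.
    fn retire_on_error(&self, proc: &Arc<Process>) {
        let mut guard = self.redo_process.write().unwrap();
        match &*guard {
            Some(current) => {
                if Arc::ptr_eq(current, proc) {
                    // We are the first to observe the error: remove it.
                    *guard = None;
                }
            }
            None => {
                // Another thread already retired (and maybe replaced) it.
            }
        }
    }
}
```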

The price is an additional RwLock to protect the new `redo_process`
field that holds the Arc. That could perhaps become an atomic pointer
swap some day. But let's get one race fixed without risking introducing
a new one.

The use of Arc/drop is also not super great here because it now allows
for an unlimited number of to-be-killed processes to exist concurrently.
See the various `NB` comments above `drop(proc)` for why it's "ok" right
now due to the blocking `wait` inside `drop`.
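
For reference, the blocking wait happens in the `Drop` impl this PR adds
(quoted from the diff below, with an explanatory comment added):

```rust
impl Drop for WalRedoProcess {
    fn drop(&mut self) {
        // kill_and_wait() sends SIGKILL and then blocks in wait(2), so the
        // thread dropping the last Arc clone pays for the cleanup instead of
        // deferring it to the background.
        self.child
            .take()
            .expect("we only do this once")
            .kill_and_wait();
    }
}
```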

Note: an earlier fix attempt was
https://github.com/neondatabase/neon/pull/5545
where `apply_batch_postgres` would compare `stdout_fd` for equality.
That's incorrect because the kernel can reuse the file descriptor when
T2 launches the new process.
Details:
https://github.com/neondatabase/neon/pull/5545#pullrequestreview-1676589373
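
A standalone illustration of that descriptor reuse (a hypothetical demo, not
part of this PR; it assumes a Unix-like kernel that hands out the lowest free
file descriptor number):

```rust
use std::os::unix::io::AsRawFd;
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    // Spawn a child with a piped stdout and note the fd number of our read
    // end of that pipe.
    let mut old = Command::new("cat")
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .spawn()?;
    let old_fd = old.stdout.as_ref().unwrap().as_raw_fd();
    old.kill()?;
    let _ = old.wait()?; // reap the child
    drop(old); // closes our read end, freeing that fd number

    // Spawn a replacement: the kernel hands out the lowest free descriptor,
    // so the new pipe's read end frequently gets the very same number.
    let mut new = Command::new("cat")
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .spawn()?;
    let new_fd = new.stdout.as_ref().unwrap().as_raw_fd();
    println!("old stdout fd = {old_fd}, new stdout fd = {new_fd}");
    new.kill()?;
    let _ = new.wait()?;
    Ok(())
}
```
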
Author: Christian Schwarz (committed by GitHub)
Date: 2023-10-17 09:55:39 +02:00
Parent: 3666df6342
Commit: b08a0ee186


@@ -26,13 +26,12 @@ use serde::Serialize;
use std::collections::VecDeque;
use std::io;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::{Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
@@ -93,7 +92,6 @@ pub trait WalRedoManager: Send + Sync {
}
struct ProcessInput {
child: NoLeakChild,
stdin: ChildStdin,
stderr_fd: RawFd,
stdout_fd: RawFd,
@@ -116,13 +114,7 @@ struct ProcessOutput {
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
stdout: Mutex<Option<ProcessOutput>>,
stdin: Mutex<Option<ProcessInput>>,
stderr: Mutex<Option<ChildStderr>>,
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
}
/// Can this request be served by neon redo functions
@@ -215,11 +207,7 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
stdin: Mutex::new(None),
stdout: Mutex::new(None),
stderr: Mutex::new(None),
redo_process: RwLock::new(None),
}
}
@@ -242,20 +230,38 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut n_attempts = 0u32;
loop {
let mut proc = self.stdin.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if proc.is_none() {
self.launch(&mut proc, pg_version)
.context("launch process")?;
}
let proc: Arc<WalRedoProcess> = {
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
// "upgrade" to write lock to launch the process
drop(proc_guard);
let mut proc_guard = self.redo_process.write().unwrap();
match &*proc_guard {
None => {
let proc = Arc::new(
WalRedoProcess::launch(self.conf, self.tenant_id, pg_version)
.context("launch walredo process")?,
);
*proc_guard = Some(Arc::clone(&proc));
proc
}
Some(proc) => Arc::clone(proc),
}
}
Some(proc) => Arc::clone(proc),
}
};
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = self
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
let result = proc
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
.context("apply_wal_records");
let end_time = Instant::now();
@@ -296,22 +302,34 @@ impl PostgresRedoManager {
n_attempts,
e,
);
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
// The owning objects (ChildStdout and ChildStderr) are stored in
// self.stdout and self.stderr, respectively.
// We intentionally keep them open here to avoid a race between
// currently running `apply_wal_records()` and a `launch()` call
// after we return here.
// The currently running `apply_wal_records()` must not read from
// the newly launched process.
// By keeping self.stdout and self.stderr open here, `launch()` will
// get other file descriptors for the new child's stdout and stderr,
// and hence the current `apply_wal_records()` calls will observe
// `output.stdout.as_raw_fd() != stdout_fd` .
if let Some(proc) = self.stdin.lock().unwrap().take() {
proc.child.kill_and_wait();
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
drop(guard);
// NB: there may still be other concurrent threads using `proc`.
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
// NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
// holding the lock while waiting for the process to exit.
// NB: the drop impl blocks the current threads with a wait() system call for
// the child process. We dropped the `guard` above so that other threads aren't
// affected. But, it's good that the current thread _does_ block to wait.
// If we instead deferred the waiting into the background / to tokio, it could
// happen that if walredo always fails immediately, we spawn processes faster
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
drop(proc);
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -614,24 +632,32 @@ impl<C: CommandExt> CloseFileDescriptors for C {
}
}
impl PostgresRedoManager {
struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_id: TenantId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stderr: Mutex<ChildStderr>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
fn launch(
&self,
input: &mut MutexGuard<Option<ProcessInput>>,
conf: &'static PageServerConf,
tenant_id: TenantId,
pg_version: u32,
) -> Result<(), Error> {
let pg_bin_dir_path = self
.conf
.pg_bin_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
let pg_lib_dir_path = self
.conf
.pg_lib_dir(pg_version)
.map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
) -> anyhow::Result<Self> {
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
@@ -652,13 +678,8 @@ impl PostgresRedoManager {
// as close-on-exec by default, but that's not enough, since we use
// libraries that directly call libc open without setting that flag.
.close_fds()
.spawn_no_leak_child(self.tenant_id)
.map_err(|e| {
Error::new(
e.kind(),
format!("postgres --wal-redo command failed to start: {}", e),
)
})?;
.spawn_no_leak_child(tenant_id)
.context("spawn process")?;
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
@@ -685,36 +706,47 @@ impl PostgresRedoManager {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
**input = Some(ProcessInput {
child,
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
stdin,
n_requests: 0,
});
Ok(Self {
conf,
tenant_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
stderr: Mutex::new(stderr),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
}
*self.stdout.lock().unwrap() = Some(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
});
*self.stderr.lock().unwrap() = Some(stderr);
Ok(())
fn id(&self) -> u32 {
self.child
.as_ref()
.expect("must not call this during Drop")
.id()
}
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))]
fn apply_wal_records(
&self,
input: MutexGuard<Option<ProcessInput>>,
tag: BufferTag,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -757,18 +789,17 @@ impl PostgresRedoManager {
fn apply_wal_records0(
&self,
writebuf: &[u8],
mut input: MutexGuard<Option<ProcessInput>>,
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let proc = input.as_mut().unwrap();
let mut proc = { input }; // TODO: remove this legacy rename, but this keeps the patch small.
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(stdout_fd, PollFlags::POLLIN),
PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
];
// We do two things simultaneously: send the old base image and WAL records to
@@ -790,8 +821,7 @@ impl PostgresRedoManager {
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let mut stderr = self.stderr.lock().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
@@ -821,7 +851,7 @@ impl PostgresRedoManager {
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(input);
drop(proc);
// To improve walredo performance we separate sending requests and receiving
// responses. They are protected by different mutexes (output and input).
@@ -835,20 +865,7 @@ impl PostgresRedoManager {
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output_guard = self.stdout.lock().unwrap();
let output = output_guard.as_mut().unwrap();
if output.stdout.as_raw_fd() != stdout_fd {
// If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
// As far as ProcessInput and ProcessOutput are protected by different mutexes,
// it can happen that we send request to one process and waiting response from another.
// To prevent such situation we compare stdout file descriptors.
// As far as old stdout pipe is destroyed only after new one is created,
// it can not reuse the same file descriptor, so this check is safe.
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
let mut output = self.stdout.lock().unwrap();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
@@ -873,8 +890,7 @@ impl PostgresRedoManager {
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let mut stderr = self.stderr.lock().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
@@ -984,6 +1000,15 @@ impl PostgresRedoManager {
fn record_and_log(&self, _: &[u8]) {}
}
impl Drop for WalRedoProcess {
fn drop(&mut self) {
self.child
.take()
.expect("we only do this once")
.kill_and_wait();
}
}
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
struct NoLeakChild {