walredo: fix EGAGAIN/"os error 11" false page reconstruction failures (#5560)

Stacked atop https://github.com/neondatabase/neon/pull/5559

Before this PR, there was the following race condition:

```
T1: polls for writeable stdin
T1: writes to stdin
T1: enters poll for stdout/stderr
T2: enters poll for stdin write
WALREDO: writes to stderr
KERNEL: wakes up T1 and T2
Tx: reads stderr and prints it
Ty: reads stderr and gets EAGAIN
(valid values for (x, y) are (1, 2) or (2, 1))
```

The concrete symptom that we observed repeatedly was with PG16,
which started logging `registered custom resource manager`
to stderr always, during startup, thereby giving us repeated
opportunity to hit above race condition. PG14 and PG15 didn't log
anything to stderr, hence we could have only hit this race condition
if there was an actual error happening.

This PR fixes the race by moving the reading of stderr into a tokio
task. It exits when the stderr is closed by the child process, which
in turn happens when the child exits, either by itself or because
we killed it.

The downside is that the async scheduling can reorder the log messages,
which can be seen in the new `test_stderr`, which runs in a
single-threaded runtime. I included the output below.

Overall I think we should move the entire walredo to async, as Joonas
proposed many months ago. This PR's asyncification is just the first
step to resolve these
false page reconstruction errors.

After this is fixed, we should stop printing that annoying stderr
message
on walredo startup; it causes noise in the pageserver logs.
That work is tracked in #5399 .

```
2023-10-13T19:05:21.878858Z ERROR apply_wal_records{tenant_id=d546fb76ba529195392fb4d19e243991 pid=753986}: failed to write out the walredo errored input: No such file or directory (os error 2) target=walredo-1697223921878-1132-0.walredo length=1132
2023-10-13T19:05:21.878932Z DEBUG postgres applied 2 WAL records (1062 bytes) in 114666 us to reconstruct page image at LSN 0/0
2023-10-13T19:05:21.878942Z ERROR error applying 2 WAL records 0/16A9388..0/16D4080 (1062 bytes) to base image with LSN 0/0 to reconstruct page image at LSN 0/0 n_attempts=0: apply_wal_records

Caused by:
    WAL redo process closed its stdout unexpectedly
2023-10-13T19:05:21.879027Z  INFO kill_and_wait_impl{pid=753986}: wait successful exit_status=signal: 11 (SIGSEGV) (core dumped)
2023-10-13T19:05:21.879079Z DEBUG wal-redo-postgres-stderr{pid=753986 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task started
2023-10-13T19:05:21.879104Z ERROR wal-redo-postgres-stderr{pid=753986 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: received output output="2023-10-13 19:05:21.769 GMT [753986] LOG:  registered custom resource manager \"neon\" with ID 134\n"
2023-10-13T19:05:21.879116Z DEBUG wal-redo-postgres-stderr{pid=753986 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task finished
2023-10-13T19:05:22.004439Z ERROR apply_wal_records{tenant_id=d546fb76ba529195392fb4d19e243991 pid=754000}: failed to write out the walredo errored input: No such file or directory (os error 2) target=walredo-1697223922004-1132-0.walredo length=1132
2023-10-13T19:05:22.004493Z DEBUG postgres applied 2 WAL records (1062 bytes) in 125344 us to reconstruct page image at LSN 0/0
2023-10-13T19:05:22.004501Z ERROR error applying 2 WAL records 0/16A9388..0/16D4080 (1062 bytes) to base image with LSN 0/0 to reconstruct page image at LSN 0/0 n_attempts=1: apply_wal_records

Caused by:
    WAL redo process closed its stdout unexpectedly
2023-10-13T19:05:22.004588Z  INFO kill_and_wait_impl{pid=754000}: wait successful exit_status=signal: 11 (SIGSEGV) (core dumped)
2023-10-13T19:05:22.004624Z DEBUG wal-redo-postgres-stderr{pid=754000 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task started
2023-10-13T19:05:22.004653Z ERROR wal-redo-postgres-stderr{pid=754000 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: received output output="2023-10-13 19:05:21.884 GMT [754000] LOG:  registered custom resource manager \"neon\" with ID 134\n"
2023-10-13T19:05:22.004666Z DEBUG wal-redo-postgres-stderr{pid=754000 tenant_id=d546fb76ba529195392fb4d19e243991 pg_version=16}: wal-redo-postgres stderr_logger_task finished
```
This commit is contained in:
Christian Schwarz
2023-10-23 10:00:13 +02:00
committed by GitHub
parent b1a1126152
commit 11e523f503
3 changed files with 130 additions and 91 deletions

View File

@@ -3668,17 +3668,21 @@ pub(crate) mod harness {
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
pub(crate) fn setup_logging() {
LOG_HANDLE.get_or_init(|| {
logging::init(
logging::LogFormat::Test,
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
});
}
impl TenantHarness {
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
LOG_HANDLE.get_or_init(|| {
logging::init(
logging::LogFormat::Test,
// enable it in case in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
)
.expect("Failed to init test logging")
});
setup_logging();
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);

View File

@@ -27,13 +27,14 @@ use std::collections::VecDeque;
use std::io;
use std::io::prelude::*;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
@@ -47,7 +48,6 @@ use crate::metrics::{
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::walrecord::NeonWalRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
@@ -72,8 +72,6 @@ pub(crate) struct BufferTag {
struct ProcessInput {
stdin: ChildStdin,
stderr_fd: RawFd,
stdout_fd: RawFd,
n_requests: usize,
}
@@ -121,6 +119,7 @@ impl PostgresRedoManager {
/// The WAL redo is handled by a separate thread, so this just sends a request
/// to the thread and waits for response.
///
/// CANCEL SAFETY: NOT CANCEL SAFE.
pub async fn request_redo(
&self,
key: Key,
@@ -153,6 +152,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -173,6 +173,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -194,7 +195,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -283,19 +284,20 @@ impl PostgresRedoManager {
);
// Avoid concurrent callers hitting the same issue.
// We can't prevent it from happening because we want to enable parallelism.
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
{
let mut guard = self.redo_process.write().unwrap();
match &*guard {
Some(current_field_value) => {
if Arc::ptr_eq(current_field_value, &proc) {
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
*guard = None;
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
None => {
// Another thread was faster to observe the error, and already took the process out of rotation.
}
}
drop(guard);
// NB: there may still be other concurrent threads using `proc`.
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
// NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
@@ -308,7 +310,12 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
let mut wait_done = proc.stderr_logger_task_done.clone();
drop(proc);
wait_done
.wait_for(|v| *v)
.await
.expect("we use scopeguard to ensure we always send `true` to the channel before dropping the sender");
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
@@ -619,7 +626,8 @@ struct WalRedoProcess {
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stderr: Mutex<ChildStderr>,
stderr_logger_cancel: CancellationToken,
stderr_logger_task_done: tokio::sync::watch::Receiver<bool>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -668,7 +676,6 @@ impl WalRedoProcess {
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
@@ -682,16 +689,73 @@ impl WalRedoProcess {
set_nonblock_or_log_err!(stdout)?;
set_nonblock_or_log_err!(stderr)?;
let mut stderr = tokio::io::unix::AsyncFd::new(stderr).context("AsyncFd::with_interest")?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let stderr_logger_cancel = CancellationToken::new();
let (stderr_logger_task_done_tx, stderr_logger_task_done_rx) =
tokio::sync::watch::channel(false);
tokio::spawn({
let stderr_logger_cancel = stderr_logger_cancel.clone();
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
let _ = stderr_logger_task_done_tx.send(true);
}
debug!("wal-redo-postgres stderr_logger_task started");
loop {
// NB: we purposefully don't do a select! for the cancellation here.
// The cancellation would likely cause us to miss stderr messages.
// We can rely on this to return from .await because when we SIGKILL
// the child, the writing end of the stderr pipe gets closed.
match stderr.readable_mut().await {
Ok(mut guard) => {
let mut errbuf = [0; 16384];
let res = guard.try_io(|fd| {
use std::io::Read;
fd.get_mut().read(&mut errbuf)
});
match res {
Ok(Ok(0)) => {
// it closed the stderr pipe
break;
}
Ok(Ok(n)) => {
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
let output = String::from_utf8_lossy(&errbuf[0..n]).to_string();
error!(output, "received output");
},
Ok(Err(e)) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
}
Err(e) => {
let _e: tokio::io::unix::TryIoError = e;
// the read() returned WouldBlock, that's expected
}
}
}
Err(e) => {
error!(error = ?e, "read() error, waiting for cancellation");
stderr_logger_cancel.cancelled().await;
error!(error = ?e, "read() error, cancellation complete");
break;
}
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version))
});
Ok(Self {
conf,
tenant_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
stdin,
n_requests: 0,
}),
@@ -700,7 +764,8 @@ impl WalRedoProcess {
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
stderr: Mutex::new(stderr),
stderr_logger_cancel,
stderr_logger_task_done: stderr_logger_task_done_rx,
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
@@ -774,19 +839,11 @@ impl WalRedoProcess {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
];
let mut stdin_pollfds = [PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT)];
// We do two things simultaneously: send the old base image and WAL records to
// the child process's stdin and forward any logging
// information that the child writes to its stderr to the page server's log.
while nwrite < writebuf.len() {
let n = loop {
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
@@ -796,31 +853,8 @@ impl WalRedoProcess {
anyhow::bail!("WAL redo timed out");
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr = self.stderr.lock().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stderr unexpectedly");
}
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
} else if in_revents.contains(PollFlags::POLLHUP) {
@@ -845,6 +879,7 @@ impl WalRedoProcess {
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let mut stdout_pollfds = [PollFd::new(output.stdout.as_raw_fd(), PollFlags::POLLIN)];
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
@@ -855,7 +890,10 @@ impl WalRedoProcess {
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
@@ -865,31 +903,8 @@ impl WalRedoProcess {
anyhow::bail!("WAL redo timed out");
}
// If we have some messages in stderr, forward them to the log.
let err_revents = pollfds[1].revents().unwrap();
if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr = self.stderr.lock().unwrap();
let len = stderr.read(&mut errbuf)?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
if len > 0 {
error!(
"wal-redo-postgres: {}",
String::from_utf8_lossy(&errbuf[0..len])
);
// To make sure we capture all log from the process if it fails, keep
// reading from the stderr, before checking the stdout.
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stderr unexpectedly");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
} else if out_revents.contains(PollFlags::POLLHUP) {
@@ -985,6 +1000,8 @@ impl Drop for WalRedoProcess {
.take()
.expect("we only do this once")
.kill_and_wait();
self.stderr_logger_cancel.cancel();
// no way to wait for stderr_logger_task from Drop because that is async only
}
}
@@ -1066,7 +1083,7 @@ impl Drop for NoLeakChild {
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
BACKGROUND_RUNTIME.spawn(async move {
tokio::runtime::Handle::current().spawn(async move {
tokio::task::spawn_blocking(move || {
// Intentionally don't inherit the tracing context from whoever is dropping us.
// This thread here is going to outlive of our dropper.
@@ -1199,6 +1216,22 @@ mod tests {
assert_eq!(page, crate::ZERO_PAGE);
}
#[tokio::test]
async fn test_stderr() {
let h = RedoHarness::new().unwrap();
h
.manager
.request_redo(
Key::from_i128(0),
Lsn::INVALID,
None,
short_records(),
16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
)
.await
.unwrap_err();
}
#[allow(clippy::octal_escapes)]
fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
vec![
@@ -1227,6 +1260,8 @@ mod tests {
impl RedoHarness {
fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));

View File

@@ -1631,7 +1631,7 @@ class NeonPageserver(PgProtocol):
".*took more than expected to complete.*",
# these can happen during shutdown, but it should not be a reason to fail a test
".*completed, took longer than expected.*",
'.*registered custom resource manager "neon".*',
'.*registered custom resource manager \\\\"neon\\\\".*',
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
# and it is not a failure of our code when it happens.
".*DeleteObjects.*We encountered an internal error. Please try again.*",