diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4dae183aea..8b74515efc 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3668,17 +3668,21 @@ pub(crate) mod harness { static LOG_HANDLE: OnceCell<()> = OnceCell::new(); + pub(crate) fn setup_logging() { + LOG_HANDLE.get_or_init(|| { + logging::init( + logging::LogFormat::Test, + // enable it in case the tests exercise code paths that use + // debug_assert_current_span_has_tenant_and_timeline_id + logging::TracingErrorLayerEnablement::EnableWithRustLogFilter, + ) + .expect("Failed to init test logging") + }); + } + impl TenantHarness { pub fn create(test_name: &'static str) -> anyhow::Result { - LOG_HANDLE.get_or_init(|| { - logging::init( - logging::LogFormat::Test, - // enable it in case in case the tests exercise code paths that use - // debug_assert_current_span_has_tenant_and_timeline_id - logging::TracingErrorLayerEnablement::EnableWithRustLogFilter, - ) - .expect("Failed to init test logging") - }); + setup_logging(); let repo_dir = PageServerConf::test_repo_dir(test_name); let _ = fs::remove_dir_all(&repo_dir); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 7e61a1dc37..cffd912e16 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -27,13 +27,14 @@ use std::collections::VecDeque; use std::io; use std::io::prelude::*; use std::ops::{Deref, DerefMut}; -use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::io::AsRawFd; use std::os::unix::prelude::CommandExt; use std::process::Stdio; -use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; +use std::process::{Child, ChildStdin, ChildStdout, Command}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::Duration; use std::time::Instant; +use tokio_util::sync::CancellationToken; use tracing::*; use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock}; @@ -47,7 +48,6 @@ use crate::metrics::{ }; use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block}; use crate::repository::Key; -use crate::task_mgr::BACKGROUND_RUNTIME; use crate::walrecord::NeonWalRecord; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; @@ -72,8 +72,6 @@ pub(crate) struct BufferTag { struct ProcessInput { stdin: ChildStdin, - stderr_fd: RawFd, - stdout_fd: RawFd, n_requests: usize, } @@ -121,6 +119,7 @@ impl PostgresRedoManager { /// The WAL redo is handled by a separate thread, so this just sends a request /// to the thread and waits for response. /// + /// CANCEL SAFETY: NOT CANCEL SAFE. pub async fn request_redo( &self, key: Key, @@ -153,6 +152,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await }; img = Some(result?); @@ -173,6 +173,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) + .await } } } @@ -194,7 +195,7 @@ impl PostgresRedoManager { /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - fn apply_batch_postgres( + async fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -283,19 +284,20 @@ impl PostgresRedoManager { ); // Avoid concurrent callers hitting the same issue. // We can't prevent it from happening because we want to enable parallelism. - let mut guard = self.redo_process.write().unwrap(); - match &*guard { - Some(current_field_value) => { - if Arc::ptr_eq(current_field_value, &proc) { - // We're the first to observe an error from `proc`, it's our job to take it out of rotation. - *guard = None; + { + let mut guard = self.redo_process.write().unwrap(); + match &*guard { + Some(current_field_value) => { + if Arc::ptr_eq(current_field_value, &proc) { + // We're the first to observe an error from `proc`, it's our job to take it out of rotation. + *guard = None; + } + } + None => { + // Another thread was faster to observe the error, and already took the process out of rotation. } } - None => { - // Another thread was faster to observe the error, and already took the process out of rotation. - } } - drop(guard); // NB: there may still be other concurrent threads using `proc`. // The last one will send SIGKILL when the underlying Arc reaches refcount 0. // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep @@ -308,7 +310,12 @@ impl PostgresRedoManager { // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. // This probably needs revisiting at some later point. + let mut wait_done = proc.stderr_logger_task_done.clone(); drop(proc); + wait_done + .wait_for(|v| *v) + .await + .expect("we use scopeguard to ensure we always send `true` to the channel before dropping the sender"); } else if n_attempts != 0 { info!(n_attempts, "retried walredo succeeded"); } @@ -619,7 +626,8 @@ struct WalRedoProcess { child: Option, stdout: Mutex, stdin: Mutex, - stderr: Mutex, + stderr_logger_cancel: CancellationToken, + stderr_logger_task_done: tokio::sync::watch::Receiver, /// Counter to separate same sized walredo inputs failing at the same millisecond. #[cfg(feature = "testing")] dump_sequence: AtomicUsize, @@ -668,7 +676,6 @@ impl WalRedoProcess { let stdin = child.stdin.take().unwrap(); let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); - macro_rules! set_nonblock_or_log_err { ($file:ident) => {{ let res = set_nonblock($file.as_raw_fd()); @@ -682,16 +689,73 @@ impl WalRedoProcess { set_nonblock_or_log_err!(stdout)?; set_nonblock_or_log_err!(stderr)?; + let mut stderr = tokio::io::unix::AsyncFd::new(stderr).context("AsyncFd::with_interest")?; + // all fallible operations post-spawn are complete, so get rid of the guard let child = scopeguard::ScopeGuard::into_inner(child); + let stderr_logger_cancel = CancellationToken::new(); + let (stderr_logger_task_done_tx, stderr_logger_task_done_rx) = + tokio::sync::watch::channel(false); + tokio::spawn({ + let stderr_logger_cancel = stderr_logger_cancel.clone(); + async move { + scopeguard::defer! { + debug!("wal-redo-postgres stderr_logger_task finished"); + let _ = stderr_logger_task_done_tx.send(true); + } + debug!("wal-redo-postgres stderr_logger_task started"); + loop { + // NB: we purposefully don't do a select! for the cancellation here. + // The cancellation would likely cause us to miss stderr messages. + // We can rely on this to return from .await because when we SIGKILL + // the child, the writing end of the stderr pipe gets closed. + match stderr.readable_mut().await { + Ok(mut guard) => { + let mut errbuf = [0; 16384]; + let res = guard.try_io(|fd| { + use std::io::Read; + fd.get_mut().read(&mut errbuf) + }); + match res { + Ok(Ok(0)) => { + // it closed the stderr pipe + break; + } + Ok(Ok(n)) => { + // The message might not be split correctly into lines here. But this is + // good enough, the important thing is to get the message to the log. + let output = String::from_utf8_lossy(&errbuf[0..n]).to_string(); + error!(output, "received output"); + }, + Ok(Err(e)) => { + error!(error = ?e, "read() error, waiting for cancellation"); + stderr_logger_cancel.cancelled().await; + error!(error = ?e, "read() error, cancellation complete"); + break; + } + Err(e) => { + let _e: tokio::io::unix::TryIoError = e; + // the read() returned WouldBlock, that's expected + } + } + } + Err(e) => { + error!(error = ?e, "read() error, waiting for cancellation"); + stderr_logger_cancel.cancelled().await; + error!(error = ?e, "read() error, cancellation complete"); + break; + } + } + } + }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version)) + }); + Ok(Self { conf, tenant_id, child: Some(child), stdin: Mutex::new(ProcessInput { - stdout_fd: stdout.as_raw_fd(), - stderr_fd: stderr.as_raw_fd(), stdin, n_requests: 0, }), @@ -700,7 +764,8 @@ impl WalRedoProcess { pending_responses: VecDeque::new(), n_processed_responses: 0, }), - stderr: Mutex::new(stderr), + stderr_logger_cancel, + stderr_logger_task_done: stderr_logger_task_done_rx, #[cfg(feature = "testing")] dump_sequence: AtomicUsize::default(), }) @@ -774,19 +839,11 @@ impl WalRedoProcess { let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small. let mut nwrite = 0usize; - // Prepare for calling poll() - let mut pollfds = [ - PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT), - PollFd::new(proc.stderr_fd, PollFlags::POLLIN), - PollFd::new(proc.stdout_fd, PollFlags::POLLIN), - ]; + let mut stdin_pollfds = [PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT)]; - // We do two things simultaneously: send the old base image and WAL records to - // the child process's stdin and forward any logging - // information that the child writes to its stderr to the page server's log. while nwrite < writebuf.len() { let n = loop { - match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) { + match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) { Err(nix::errno::Errno::EINTR) => continue, res => break res, } @@ -796,31 +853,8 @@ impl WalRedoProcess { anyhow::bail!("WAL redo timed out"); } - // If we have some messages in stderr, forward them to the log. - let err_revents = pollfds[1].revents().unwrap(); - if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - let mut errbuf: [u8; 16384] = [0; 16384]; - let mut stderr = self.stderr.lock().unwrap(); - let len = stderr.read(&mut errbuf)?; - - // The message might not be split correctly into lines here. But this is - // good enough, the important thing is to get the message to the log. - if len > 0 { - error!( - "wal-redo-postgres: {}", - String::from_utf8_lossy(&errbuf[0..len]) - ); - - // To make sure we capture all log from the process if it fails, keep - // reading from the stderr, before checking the stdout. - continue; - } - } else if err_revents.contains(PollFlags::POLLHUP) { - anyhow::bail!("WAL redo process closed its stderr unexpectedly"); - } - // If 'stdin' is writeable, do write. - let in_revents = pollfds[0].revents().unwrap(); + let in_revents = stdin_pollfds[0].revents().unwrap(); if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { nwrite += proc.stdin.write(&writebuf[nwrite..])?; } else if in_revents.contains(PollFlags::POLLHUP) { @@ -845,6 +879,7 @@ impl WalRedoProcess { // advancing processed responses number. let mut output = self.stdout.lock().unwrap(); + let mut stdout_pollfds = [PollFd::new(output.stdout.as_raw_fd(), PollFlags::POLLIN)]; let n_processed_responses = output.n_processed_responses; while n_processed_responses + output.pending_responses.len() <= request_no { // We expect the WAL redo process to respond with an 8k page image. We read it @@ -855,7 +890,10 @@ impl WalRedoProcess { // We do two things simultaneously: reading response from stdout // and forward any logging information that the child writes to its stderr to the page server's log. let n = loop { - match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) { + match nix::poll::poll( + &mut stdout_pollfds[..], + wal_redo_timeout.as_millis() as i32, + ) { Err(nix::errno::Errno::EINTR) => continue, res => break res, } @@ -865,31 +903,8 @@ impl WalRedoProcess { anyhow::bail!("WAL redo timed out"); } - // If we have some messages in stderr, forward them to the log. - let err_revents = pollfds[1].revents().unwrap(); - if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - let mut errbuf: [u8; 16384] = [0; 16384]; - let mut stderr = self.stderr.lock().unwrap(); - let len = stderr.read(&mut errbuf)?; - - // The message might not be split correctly into lines here. But this is - // good enough, the important thing is to get the message to the log. - if len > 0 { - error!( - "wal-redo-postgres: {}", - String::from_utf8_lossy(&errbuf[0..len]) - ); - - // To make sure we capture all log from the process if it fails, keep - // reading from the stderr, before checking the stdout. - continue; - } - } else if err_revents.contains(PollFlags::POLLHUP) { - anyhow::bail!("WAL redo process closed its stderr unexpectedly"); - } - // If we have some data in stdout, read it to the result buffer. - let out_revents = pollfds[2].revents().unwrap(); + let out_revents = stdout_pollfds[0].revents().unwrap(); if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { nresult += output.stdout.read(&mut resultbuf[nresult..])?; } else if out_revents.contains(PollFlags::POLLHUP) { @@ -985,6 +1000,8 @@ impl Drop for WalRedoProcess { .take() .expect("we only do this once") .kill_and_wait(); + self.stderr_logger_cancel.cancel(); + // no way to wait for stderr_logger_task from Drop because that is async only } } @@ -1066,7 +1083,7 @@ impl Drop for NoLeakChild { // Offload the kill+wait of the child process into the background. // If someone stops the runtime, we'll leak the child process. // We can ignore that case because we only stop the runtime on pageserver exit. - BACKGROUND_RUNTIME.spawn(async move { + tokio::runtime::Handle::current().spawn(async move { tokio::task::spawn_blocking(move || { // Intentionally don't inherit the tracing context from whoever is dropping us. // This thread here is going to outlive of our dropper. @@ -1199,6 +1216,22 @@ mod tests { assert_eq!(page, crate::ZERO_PAGE); } + #[tokio::test] + async fn test_stderr() { + let h = RedoHarness::new().unwrap(); + h + .manager + .request_redo( + Key::from_i128(0), + Lsn::INVALID, + None, + short_records(), + 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */ + ) + .await + .unwrap_err(); + } + #[allow(clippy::octal_escapes)] fn short_records() -> Vec<(Lsn, NeonWalRecord)> { vec![ @@ -1227,6 +1260,8 @@ mod tests { impl RedoHarness { fn new() -> anyhow::Result { + crate::tenant::harness::setup_logging(); + let repo_dir = camino_tempfile::tempdir()?; let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 2d73972eba..38e17985ac 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1631,7 +1631,7 @@ class NeonPageserver(PgProtocol): ".*took more than expected to complete.*", # these can happen during shutdown, but it should not be a reason to fail a test ".*completed, took longer than expected.*", - '.*registered custom resource manager "neon".*', + '.*registered custom resource manager \\\\"neon\\\\".*', # AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these # and it is not a failure of our code when it happens. ".*DeleteObjects.*We encountered an internal error. Please try again.*",