From 7403d55013e44aa7f1a7420676cdcb1fde95335e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 4 Dec 2023 17:06:41 +0100 Subject: [PATCH] walredo: stderr cleanup & make explicitly cancel safe (#6031) # Problem I need walredo to be cancellation-safe for https://github.com/neondatabase/neon/pull/6000#discussion_r1412049728 # Solution We are only `async fn` because of `wait_for(stderr_logger_task_done).await`, added in #5560 . The `stderr_logger_cancel` and `stderr_logger_task_done` were there out of precaution that the stderr logger task might for some reason not stop when the walredo process terminates. That hasn't been a problem in practice. So, simplify things: - remove `stderr_logger_cancel` and the `wait_for(...stderr_logger_task_done...)` - use `tokio::process::ChildStderr` in the stderr logger task - add metrics to track number of running stderr logger tasks so in case I'm wrong here, we can use these metrics to identify the issue (not planning to put them into a dashboard or anything) --- pageserver/src/metrics.rs | 17 +++++++ pageserver/src/tenant.rs | 6 +++ pageserver/src/walredo.rs | 93 +++++++++++++-------------------------- 3 files changed, 53 insertions(+), 63 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index d2684691e0..0cfbfcdf2f 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1385,6 +1385,8 @@ pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy = pub(crate) struct WalRedoProcessCounters { pub(crate) started: IntCounter, pub(crate) killed_by_cause: enum_map::EnumMap, + pub(crate) active_stderr_logger_tasks_started: IntCounter, + pub(crate) active_stderr_logger_tasks_finished: IntCounter, } #[derive(Debug, enum_map::Enum, strum_macros::IntoStaticStr)] @@ -1408,6 +1410,19 @@ impl Default for WalRedoProcessCounters { &["cause"], ) .unwrap(); + + let active_stderr_logger_tasks_started = register_int_counter!( + "pageserver_walredo_stderr_logger_tasks_started_total", + "Number of active walredo stderr logger tasks that have started", + ) + .unwrap(); + + let active_stderr_logger_tasks_finished = register_int_counter!( + "pageserver_walredo_stderr_logger_tasks_finished_total", + "Number of active walredo stderr logger tasks that have finished", + ) + .unwrap(); + Self { started, killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| { @@ -1415,6 +1430,8 @@ impl Default for WalRedoProcessCounters { let cause_str: &'static str = cause.into(); killed.with_label_values(&[cause_str]) })), + active_stderr_logger_tasks_started, + active_stderr_logger_tasks_finished, } } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 422cb671fe..f67a4174af 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -312,6 +312,9 @@ impl WalRedoManager { } } + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn request_redo( &self, key: crate::repository::Key, @@ -3893,6 +3896,9 @@ pub(crate) mod harness { pub(crate) struct TestRedoManager; impl TestRedoManager { + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn request_redo( &self, key: Key, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index edce158e75..94e95fd3b3 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -34,7 +34,6 @@ use std::process::{Child, ChildStdin, ChildStdout, Command}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::Duration; use std::time::Instant; -use tokio_util::sync::CancellationToken; use tracing::*; use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock}; @@ -124,7 +123,9 @@ impl PostgresRedoManager { /// The WAL redo is handled by a separate thread, so this just sends a request /// to the thread and waits for response. /// - /// CANCEL SAFETY: NOT CANCEL SAFE. + /// # Cancel-Safety + /// + /// This method is cancellation-safe. pub async fn request_redo( &self, key: Key, @@ -157,7 +158,6 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) - .await }; img = Some(result?); @@ -178,7 +178,6 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, ) - .await } } } @@ -216,7 +215,7 @@ impl PostgresRedoManager { /// Process one request for WAL redo using wal-redo postgres /// #[allow(clippy::too_many_arguments)] - async fn apply_batch_postgres( + fn apply_batch_postgres( &self, key: Key, lsn: Lsn, @@ -332,12 +331,7 @@ impl PostgresRedoManager { // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here, // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads. // This probably needs revisiting at some later point. - let mut wait_done = proc.stderr_logger_task_done.clone(); drop(proc); - wait_done - .wait_for(|v| *v) - .await - .expect("we use scopeguard to ensure we always send `true` to the channel before dropping the sender"); } else if n_attempts != 0 { info!(n_attempts, "retried walredo succeeded"); } @@ -649,8 +643,6 @@ struct WalRedoProcess { child: Option, stdout: Mutex, stdin: Mutex, - stderr_logger_cancel: CancellationToken, - stderr_logger_task_done: tokio::sync::watch::Receiver, /// Counter to separate same sized walredo inputs failing at the same millisecond. #[cfg(feature = "testing")] dump_sequence: AtomicUsize, @@ -699,6 +691,8 @@ impl WalRedoProcess { let stdin = child.stdin.take().unwrap(); let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); + let stderr = tokio::process::ChildStderr::from_std(stderr) + .context("convert to tokio::ChildStderr")?; macro_rules! set_nonblock_or_log_err { ($file:ident) => {{ let res = set_nonblock($file.as_raw_fd()); @@ -710,69 +704,45 @@ impl WalRedoProcess { } set_nonblock_or_log_err!(stdin)?; set_nonblock_or_log_err!(stdout)?; - set_nonblock_or_log_err!(stderr)?; - - let mut stderr = tokio::io::unix::AsyncFd::new(stderr).context("AsyncFd::with_interest")?; // all fallible operations post-spawn are complete, so get rid of the guard let child = scopeguard::ScopeGuard::into_inner(child); - let stderr_logger_cancel = CancellationToken::new(); - let (stderr_logger_task_done_tx, stderr_logger_task_done_rx) = - tokio::sync::watch::channel(false); - tokio::spawn({ - let stderr_logger_cancel = stderr_logger_cancel.clone(); + tokio::spawn( async move { scopeguard::defer! { debug!("wal-redo-postgres stderr_logger_task finished"); - let _ = stderr_logger_task_done_tx.send(true); + crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc(); } debug!("wal-redo-postgres stderr_logger_task started"); - loop { - // NB: we purposefully don't do a select! for the cancellation here. - // The cancellation would likely cause us to miss stderr messages. - // We can rely on this to return from .await because when we SIGKILL - // the child, the writing end of the stderr pipe gets closed. - match stderr.readable_mut().await { - Ok(mut guard) => { - let mut errbuf = [0; 16384]; - let res = guard.try_io(|fd| { - use std::io::Read; - fd.get_mut().read(&mut errbuf) - }); - match res { - Ok(Ok(0)) => { - // it closed the stderr pipe - break; - } - Ok(Ok(n)) => { - // The message might not be split correctly into lines here. But this is - // good enough, the important thing is to get the message to the log. - let output = String::from_utf8_lossy(&errbuf[0..n]).to_string(); - error!(output, "received output"); - }, - Ok(Err(e)) => { - error!(error = ?e, "read() error, waiting for cancellation"); - stderr_logger_cancel.cancelled().await; - error!(error = ?e, "read() error, cancellation complete"); - break; - } - Err(e) => { - let _e: tokio::io::unix::TryIoError = e; - // the read() returned WouldBlock, that's expected - } - } + crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc(); + + use tokio::io::AsyncBufReadExt; + let mut stderr_lines = tokio::io::BufReader::new(stderr); + let mut buf = Vec::new(); + let res = loop { + buf.clear(); + // TODO we don't trust the process to cap its stderr length. + // Currently it can do unbounded Vec allocation. + match stderr_lines.read_until(b'\n', &mut buf).await { + Ok(0) => break Ok(()), // eof + Ok(num_bytes) => { + let output = String::from_utf8_lossy(&buf[..num_bytes]); + error!(%output, "received output"); } Err(e) => { - error!(error = ?e, "read() error, waiting for cancellation"); - stderr_logger_cancel.cancelled().await; - error!(error = ?e, "read() error, cancellation complete"); - break; + break Err(e); } } + }; + match res { + Ok(()) => (), + Err(e) => { + error!(error=?e, "failed to read from walredo stderr"); + } } }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_id, %pg_version)) - }); + ); Ok(Self { conf, @@ -787,8 +757,6 @@ impl WalRedoProcess { pending_responses: VecDeque::new(), n_processed_responses: 0, }), - stderr_logger_cancel, - stderr_logger_task_done: stderr_logger_task_done_rx, #[cfg(feature = "testing")] dump_sequence: AtomicUsize::default(), }) @@ -1029,7 +997,6 @@ impl Drop for WalRedoProcess { .take() .expect("we only do this once") .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop); - self.stderr_logger_cancel.cancel(); // no way to wait for stderr_logger_task from Drop because that is async only } }