diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 3127f8a712..9bb44cd855 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -73,12 +73,95 @@ pub(crate) struct BufferTag { struct ProcessInput { stdin: tokio::process::ChildStdin, n_requests: usize, + poison: poison::Poison, } struct ProcessOutput { stdout: tokio::process::ChildStdout, pending_responses: VecDeque>, n_processed_responses: usize, + poison: poison::Poison, +} + +mod poison { + use std::time::Instant; + + use tracing::warn; + + pub struct Poison { + what: &'static str, + state: State, + } + + #[derive(Clone, Copy)] + enum State { + Clean, + Armed, + Poisoned { at: Instant }, + } + + impl Poison { + pub fn new(what: &'static str) -> Self { + Self { + what, + state: State::Clean, + } + } + pub fn check_and_arm(&mut self) -> Result { + match self.state { + State::Clean => { + *self = State::Armed; + Ok(Guard(self)) + } + State::Armed => unreachable!("transient state"), + State::Poisoned { at } => Err(Error::Poisoned { + what: self.what, + at, + }), + } + } + } + + struct Guard<'a>(&'a mut Poison); + + impl<'a> Guard<'a> { + pub fn disarm(mut self) { + match self.0.state { + State::Clean => unreachable!("we set it to Armed in check_and_arm()"), + State::Armed => { + *self = State::Clean; + } + State::Poisoned { at } => { + unreachable!("we fail check_and_arm() if it's in that state") + } + } + } + } + + impl<'a> Drop for Guard<'a> { + fn drop(&mut self) { + match self.0.state { + State::Clean => { + // set by disarm() + } + State::Armed => { + // still armed => poison it + let at = Instant::now(); + self.0.state = State::Poisoned { at }; + warn!(at=%at, "poisoning {}", self.0.what); + } + State::Poisoned { at } => { + unreachable!("we fail check_and_arm() if it's in that state") + } + } + } + } + + #[derive(thiserror::Error, Debug)] + enum Error { + #[error("poisoned at {at}: {what}")] + Poisoned { what: &'static str, at: Instant }, + } } /// @@ -748,11 +831,13 @@ impl WalRedoProcess { stdin: tokio::sync::Mutex::new(ProcessInput { stdin, n_requests: 0, + poison: poison::Poison::clean("stdin"), }), stdout: tokio::sync::Mutex::new(ProcessOutput { stdout, pending_responses: VecDeque::new(), n_processed_responses: 0, + poison: poison::Poison::clean("stdout"), }), #[cfg(feature = "testing")] dump_sequence: AtomicUsize::default(), @@ -817,6 +902,7 @@ impl WalRedoProcess { async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result { let mut input = self.stdin.lock().await; + let poison = input.poison.check_and_arm()?; input .stdin .write_all(writebuf) @@ -824,6 +910,7 @@ impl WalRedoProcess { .context("write to walredo stdin")?; let request_no = input.n_requests; input.n_requests += 1; + poison.disarm(); drop(input); // To improve walredo performance we separate sending requests and receiving @@ -839,6 +926,7 @@ impl WalRedoProcess { // advancing processed responses number. let mut output = self.stdout.lock().await; + let poison = output.poison.check_and_arm()?; let n_processed_responses = output.n_processed_responses; while n_processed_responses + output.pending_responses.len() <= request_no { // We expect the WAL redo process to respond with an 8k page image. We read it