walredo: make `poison::Poison` own the data it protects.

`Poison` becomes `Poison<T>` and stores the protected value in a private
`data` field. `check_and_arm()` now hands out a public `Guard<'a, T>`, and the
value is only reachable through `Guard::data()` / `Guard::data_mut()`.
`ProcessInput` and `ProcessOutput` lose their `poison` fields and are wrapped
in `Poison<..>` inside the existing tokio mutexes instead.

NOTE(review): this patch was reconstructed from a copy whose line breaks and
`<...>` type arguments had been stripped. Indentation and blank lines were
rebuilt to match the hunk line counts. The following type arguments are
inferred rather than recovered and must be confirmed against the tree before
applying: `VecDeque<Option<Bytes>>`, `Option<NoLeakChild>`,
`anyhow::Result<Bytes>`, and `Result<Guard<T>, Error>`.

diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 9bb44cd855..dd0d703a85 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -58,6 +58,8 @@ use postgres_ffi::v14::nonrelfile_utils::{
 };
 use postgres_ffi::BLCKSZ;
 
+use self::poison::Poison;
+
 ///
 /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
 ///
@@ -73,14 +75,12 @@ pub(crate) struct BufferTag {
 struct ProcessInput {
     stdin: tokio::process::ChildStdin,
     n_requests: usize,
-    poison: poison::Poison,
 }
 
 struct ProcessOutput {
     stdout: tokio::process::ChildStdout,
     pending_responses: VecDeque<Option<Bytes>>,
     n_processed_responses: usize,
-    poison: poison::Poison,
 }
 
 mod poison {
@@ -88,9 +88,10 @@ mod poison {
 
     use tracing::warn;
 
-    pub struct Poison {
+    pub struct Poison<T> {
         what: &'static str,
         state: State,
+        data: T,
     }
 
     #[derive(Clone, Copy)]
@@ -100,17 +101,18 @@ mod poison {
         Poisoned { at: Instant },
     }
 
-    impl Poison {
-        pub fn new(what: &'static str) -> Self {
+    impl<T> Poison<T> {
+        pub fn new(what: &'static str, data: T) -> Self {
             Self {
                 what,
                 state: State::Clean,
+                data,
             }
         }
-        pub fn check_and_arm(&mut self) -> Result<Guard, Error> {
+        pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
             match self.state {
                 State::Clean => {
-                    *self = State::Armed;
+                    self.state = State::Armed;
                     Ok(Guard(self))
                 }
                 State::Armed => unreachable!("transient state"),
@@ -122,23 +124,30 @@ mod poison {
         }
     }
 
-    struct Guard<'a>(&'a mut Poison);
+    pub struct Guard<'a, T>(&'a mut Poison<T>);
 
-    impl<'a> Guard<'a> {
-        pub fn disarm(mut self) {
+    impl<'a, T> Guard<'a, T> {
+        pub fn data(&self) -> &T {
+            &self.0.data
+        }
+        pub fn data_mut(&mut self) -> &mut T {
+            &mut self.0.data
+        }
+
+        pub fn disarm(self) {
             match self.0.state {
                 State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
                 State::Armed => {
-                    *self = State::Clean;
+                    self.0.state = State::Clean;
                 }
                 State::Poisoned { at } => {
-                    unreachable!("we fail check_and_arm() if it's in that state")
+                    unreachable!("we fail check_and_arm() if it's in that state: {at:?}")
                 }
             }
         }
     }
 
-    impl<'a> Drop for Guard<'a> {
+    impl<'a, T> Drop for Guard<'a, T> {
         fn drop(&mut self) {
             match self.0.state {
                 State::Clean => {
@@ -148,18 +157,18 @@ mod poison {
                     // still armed => poison it
                     let at = Instant::now();
                     self.0.state = State::Poisoned { at };
-                    warn!(at=%at, "poisoning {}", self.0.what);
+                    warn!(at=?at, "poisoning {}", self.0.what);
                 }
                 State::Poisoned { at } => {
-                    unreachable!("we fail check_and_arm() if it's in that state")
+                    unreachable!("we fail check_and_arm() if it's in that state: {at:?}")
                 }
             }
         }
     }
 
     #[derive(thiserror::Error, Debug)]
-    enum Error {
-        #[error("poisoned at {at}: {what}")]
+    pub enum Error {
+        #[error("poisoned at {at:?}: {what}")]
         Poisoned { what: &'static str, at: Instant },
     }
 }
@@ -728,8 +737,8 @@ struct WalRedoProcess {
     tenant_shard_id: TenantShardId,
     // Some() on construction, only becomes None on Drop.
     child: Option<NoLeakChild>,
-    stdout: tokio::sync::Mutex<ProcessOutput>,
-    stdin: tokio::sync::Mutex<ProcessInput>,
+    stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
+    stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
     /// Counter to separate same sized walredo inputs failing at the same millisecond.
     #[cfg(feature = "testing")]
     dump_sequence: AtomicUsize,
@@ -828,17 +837,21 @@ impl WalRedoProcess {
             conf,
             tenant_shard_id,
             child: Some(child),
-            stdin: tokio::sync::Mutex::new(ProcessInput {
-                stdin,
-                n_requests: 0,
-                poison: poison::Poison::clean("stdin"),
-            }),
-            stdout: tokio::sync::Mutex::new(ProcessOutput {
-                stdout,
-                pending_responses: VecDeque::new(),
-                n_processed_responses: 0,
-                poison: poison::Poison::clean("stdout"),
-            }),
+            stdin: tokio::sync::Mutex::new(Poison::new(
+                "stdin",
+                ProcessInput {
+                    stdin,
+                    n_requests: 0,
+                },
+            )),
+            stdout: tokio::sync::Mutex::new(Poison::new(
+                "stdout",
+                ProcessOutput {
+                    stdout,
+                    pending_responses: VecDeque::new(),
+                    n_processed_responses: 0,
+                },
+            )),
             #[cfg(feature = "testing")]
             dump_sequence: AtomicUsize::default(),
         })
@@ -901,17 +914,20 @@ impl WalRedoProcess {
     }
 
     async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
-        let mut input = self.stdin.lock().await;
-        let poison = input.poison.check_and_arm()?;
-        input
-            .stdin
-            .write_all(writebuf)
-            .await
-            .context("write to walredo stdin")?;
-        let request_no = input.n_requests;
-        input.n_requests += 1;
-        poison.disarm();
-        drop(input);
+        let request_no = {
+            let mut lock_guard = self.stdin.lock().await;
+            let mut poison_guard = lock_guard.check_and_arm()?;
+            let input = poison_guard.data_mut();
+            input
+                .stdin
+                .write_all(writebuf)
+                .await
+                .context("write to walredo stdin")?;
+            let request_no = input.n_requests;
+            input.n_requests += 1;
+            poison_guard.disarm();
+            request_no
+        };
 
         // To improve walredo performance we separate sending requests and receiving
         // responses. Them are protected by different mutexes (output and input).
@@ -925,8 +941,9 @@ impl WalRedoProcess {
         // pending responses ring buffer and truncate all empty elements from the front,
         // advancing processed responses number.
 
-        let mut output = self.stdout.lock().await;
-        let poison = output.poison.check_and_arm()?;
+        let mut lock_guard = self.stdout.lock().await;
+        let mut poison_guard = lock_guard.check_and_arm()?;
+        let output = poison_guard.data_mut();
         let n_processed_responses = output.n_processed_responses;
         while n_processed_responses + output.pending_responses.len() <= request_no {
             // We expect the WAL redo process to respond with an 8k page image. We read it
@@ -984,6 +1001,7 @@ impl WalRedoProcess {
                 break;
             }
         }
+        poison_guard.disarm();
         Ok(res)
     }
 