WIP poison

This commit is contained in:
Christian Schwarz
2024-01-31 15:57:34 +00:00
parent 4160d407fb
commit 70b37cf88f

View File

@@ -73,12 +73,95 @@ pub(crate) struct BufferTag {
struct ProcessInput {
stdin: tokio::process::ChildStdin,
n_requests: usize,
poison: poison::Poison,
}
struct ProcessOutput {
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
poison: poison::Poison,
}
mod poison {
use std::time::Instant;
use tracing::warn;
pub struct Poison {
what: &'static str,
state: State,
}
#[derive(Clone, Copy)]
enum State {
Clean,
Armed,
Poisoned { at: Instant },
}
impl Poison {
pub fn new(what: &'static str) -> Self {
Self {
what,
state: State::Clean,
}
}
pub fn check_and_arm(&mut self) -> Result<Guard, Error> {
match self.state {
State::Clean => {
*self = State::Armed;
Ok(Guard(self))
}
State::Armed => unreachable!("transient state"),
State::Poisoned { at } => Err(Error::Poisoned {
what: self.what,
at,
}),
}
}
}
struct Guard<'a>(&'a mut Poison);
impl<'a> Guard<'a> {
pub fn disarm(mut self) {
match self.0.state {
State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
State::Armed => {
*self = State::Clean;
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state")
}
}
}
}
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
match self.0.state {
State::Clean => {
// set by disarm()
}
State::Armed => {
// still armed => poison it
let at = Instant::now();
self.0.state = State::Poisoned { at };
warn!(at=%at, "poisoning {}", self.0.what);
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state")
}
}
}
}
#[derive(thiserror::Error, Debug)]
enum Error {
#[error("poisoned at {at}: {what}")]
Poisoned { what: &'static str, at: Instant },
}
}
///
@@ -748,11 +831,13 @@ impl WalRedoProcess {
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin,
n_requests: 0,
poison: poison::Poison::clean("stdin"),
}),
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
poison: poison::Poison::clean("stdout"),
}),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
@@ -817,6 +902,7 @@ impl WalRedoProcess {
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
let mut input = self.stdin.lock().await;
let poison = input.poison.check_and_arm()?;
input
.stdin
.write_all(writebuf)
@@ -824,6 +910,7 @@ impl WalRedoProcess {
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
poison.disarm();
drop(input);
// To improve walredo performance we separate sending requests and receiving
@@ -839,6 +926,7 @@ impl WalRedoProcess {
// advancing processed responses number.
let mut output = self.stdout.lock().await;
let poison = output.poison.check_and_arm()?;
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it