working impl

This commit is contained in:
Christian Schwarz
2024-01-31 16:08:40 +00:00
parent 70b37cf88f
commit b1b8ca32c8

View File

@@ -58,6 +58,8 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use self::poison::Poison;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -73,14 +75,12 @@ pub(crate) struct BufferTag {
struct ProcessInput {
stdin: tokio::process::ChildStdin,
n_requests: usize,
poison: poison::Poison,
}
struct ProcessOutput {
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
poison: poison::Poison,
}
mod poison {
@@ -88,9 +88,10 @@ mod poison {
use tracing::warn;
pub struct Poison {
pub struct Poison<T> {
what: &'static str,
state: State,
data: T,
}
#[derive(Clone, Copy)]
@@ -100,17 +101,18 @@ mod poison {
Poisoned { at: Instant },
}
impl Poison {
pub fn new(what: &'static str) -> Self {
impl<T> Poison<T> {
pub fn new(what: &'static str, data: T) -> Self {
Self {
what,
state: State::Clean,
data,
}
}
pub fn check_and_arm(&mut self) -> Result<Guard, Error> {
pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
match self.state {
State::Clean => {
*self = State::Armed;
self.state = State::Armed;
Ok(Guard(self))
}
State::Armed => unreachable!("transient state"),
@@ -122,23 +124,30 @@ mod poison {
}
}
struct Guard<'a>(&'a mut Poison);
pub struct Guard<'a, T>(&'a mut Poison<T>);
impl<'a> Guard<'a> {
pub fn disarm(mut self) {
impl<'a, T> Guard<'a, T> {
pub fn data(&self) -> &T {
&self.0.data
}
pub fn data_mut(&mut self) -> &mut T {
&mut self.0.data
}
pub fn disarm(self) {
match self.0.state {
State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
State::Armed => {
*self = State::Clean;
self.0.state = State::Clean;
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state")
unreachable!("we fail check_and_arm() if it's in that state: {at:?}")
}
}
}
}
impl<'a> Drop for Guard<'a> {
impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
match self.0.state {
State::Clean => {
@@ -148,18 +157,18 @@ mod poison {
// still armed => poison it
let at = Instant::now();
self.0.state = State::Poisoned { at };
warn!(at=%at, "poisoning {}", self.0.what);
warn!(at=?at, "poisoning {}", self.0.what);
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state")
unreachable!("we fail check_and_arm() if it's in that state: {at:?}")
}
}
}
}
#[derive(thiserror::Error, Debug)]
enum Error {
#[error("poisoned at {at}: {what}")]
pub enum Error {
#[error("poisoned at {at:?}: {what}")]
Poisoned { what: &'static str, at: Instant },
}
}
@@ -728,8 +737,8 @@ struct WalRedoProcess {
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: tokio::sync::Mutex<ProcessOutput>,
stdin: tokio::sync::Mutex<ProcessInput>,
stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -828,17 +837,21 @@ impl WalRedoProcess {
conf,
tenant_shard_id,
child: Some(child),
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin,
n_requests: 0,
poison: poison::Poison::clean("stdin"),
}),
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
poison: poison::Poison::clean("stdout"),
}),
stdin: tokio::sync::Mutex::new(Poison::new(
"stdin",
ProcessInput {
stdin,
n_requests: 0,
},
)),
stdout: tokio::sync::Mutex::new(Poison::new(
"stdout",
ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
},
)),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
@@ -901,17 +914,20 @@ impl WalRedoProcess {
}
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
let mut input = self.stdin.lock().await;
let poison = input.poison.check_and_arm()?;
input
.stdin
.write_all(writebuf)
.await
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
poison.disarm();
drop(input);
let request_no = {
let mut lock_guard = self.stdin.lock().await;
let mut poison_guard = lock_guard.check_and_arm()?;
let input = poison_guard.data_mut();
input
.stdin
.write_all(writebuf)
.await
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
poison_guard.disarm();
request_no
};
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
@@ -925,8 +941,9 @@ impl WalRedoProcess {
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().await;
let poison = output.poison.check_and_arm()?;
let mut lock_guard = self.stdout.lock().await;
let mut poison_guard = lock_guard.check_and_arm()?;
let output = poison_guard.data_mut();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
@@ -984,6 +1001,7 @@ impl WalRedoProcess {
break;
}
}
poison_guard.disarm();
Ok(res)
}