From 96413743e4ab53007566659dd624be6cfe38b3e5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 31 Jan 2024 16:24:19 +0000 Subject: [PATCH] move `poison` to `utils` and document --- Cargo.lock | 14 +++++ Cargo.toml | 1 + libs/utils/Cargo.toml | 1 + libs/utils/src/lib.rs | 2 + libs/utils/src/poison.rs | 120 ++++++++++++++++++++++++++++++++++++++ pageserver/src/walredo.rs | 93 +---------------------------- 6 files changed, 139 insertions(+), 92 deletions(-) create mode 100644 libs/utils/src/poison.rs diff --git a/Cargo.lock b/Cargo.lock index 6e91363de8..a9b9f9c7bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5648,6 +5648,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-test" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89b3cbabd3ae862100094ae433e1def582cf86451b4e9bf83aa7ac1d8a7d719" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.20.0" @@ -6135,6 +6148,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-test", "tokio-util", "tracing", "tracing-error", diff --git a/Cargo.toml b/Cargo.toml index 8afab02b15..0254ea24e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,6 +157,7 @@ tokio-postgres-rustls = "0.10.0" tokio-rustls = "0.24" tokio-stream = "0.1" tokio-tar = "0.3" +tokio-test = "0.4.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.7" toml_edit = "0.19" diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 706b7a3187..7dee7e3963 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -66,6 +66,7 @@ criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true serde_assert.workspace = true +tokio-test.workspace = true [[bench]] name = "benchmarks" diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 890061dc59..fd4e068b39 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -87,6 +87,8 @@ pub mod failpoint_support; pub mod yielding_loop; +pub mod poison; + /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages /// /// we have several cases: diff --git a/libs/utils/src/poison.rs b/libs/utils/src/poison.rs new file mode 100644 index 0000000000..d0ac77df47 --- /dev/null +++ b/libs/utils/src/poison.rs @@ -0,0 +1,120 @@ +//! Protect a piece of state from reuse after it is left in an inconsistent state. +//! +//! # Example +//! +//! ``` +//! # tokio_test::block_on(async { +//! use utils::poison::Poison; +//! use std::time::Duration; +//! +//! struct State { +//! clean: bool, +//! } +//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true })); +//! +//! let mut mutex_guard = state.lock().await; +//! let mut poison_guard = mutex_guard.check_and_arm()?; +//! let state = poison_guard.data_mut(); +//! state.clean = false; +//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail. +//! tokio::time::sleep(Duration::from_secs(10)).await; +//! state.clean = true; +//! poison_guard.disarm(); +//! # Ok::<(), utils::poison::Error>(()) +//! # }); +//! ``` + +use std::time::Instant; + +use tracing::warn; + +pub struct Poison { + what: &'static str, + state: State, + data: T, +} + +#[derive(Clone, Copy)] +enum State { + Clean, + Armed, + Poisoned { at: Instant }, +} + +impl Poison { + /// We log `what` `warning!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed. + pub fn new(what: &'static str, data: T) -> Self { + Self { + what, + state: State::Clean, + data, + } + } + + /// Check for poisoning and return a [`Guard`] that provides access to the wrapped state. + pub fn check_and_arm(&mut self) -> Result, Error> { + match self.state { + State::Clean => { + self.state = State::Armed; + Ok(Guard(self)) + } + State::Armed => unreachable!("transient state"), + State::Poisoned { at } => Err(Error::Poisoned { + what: self.what, + at, + }), + } + } +} + +/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state. +/// Once modifications are done, use [`Self::disarm`]. +/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned +/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error. +pub struct Guard<'a, T>(&'a mut Poison); + +impl<'a, T> Guard<'a, T> { + pub fn data(&self) -> &T { + &self.0.data + } + pub fn data_mut(&mut self) -> &mut T { + &mut self.0.data + } + + pub fn disarm(self) { + match self.0.state { + State::Clean => unreachable!("we set it to Armed in check_and_arm()"), + State::Armed => { + self.0.state = State::Clean; + } + State::Poisoned { at } => { + unreachable!("we fail check_and_arm() if it's in that state: {at:?}") + } + } + } +} + +impl<'a, T> Drop for Guard<'a, T> { + fn drop(&mut self) { + match self.0.state { + State::Clean => { + // set by disarm() + } + State::Armed => { + // still armed => poison it + let at = Instant::now(); + self.0.state = State::Poisoned { at }; + warn!(at=?at, "poisoning {}", self.0.what); + } + State::Poisoned { at } => { + unreachable!("we fail check_and_arm() if it's in that state: {at:?}") + } + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("poisoned at {at:?}: {what}")] + Poisoned { what: &'static str, at: Instant }, +} diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index dd0d703a85..45a3fbb626 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -34,6 +34,7 @@ use std::time::Duration; use std::time::Instant; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::*; +use utils::poison::Poison; use utils::{bin_ser::BeSer, lsn::Lsn}; #[cfg(feature = "testing")] @@ -58,8 +59,6 @@ use postgres_ffi::v14::nonrelfile_utils::{ }; use postgres_ffi::BLCKSZ; -use self::poison::Poison; - /// /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. /// @@ -83,96 +82,6 @@ struct ProcessOutput { n_processed_responses: usize, } -mod poison { - use std::time::Instant; - - use tracing::warn; - - pub struct Poison { - what: &'static str, - state: State, - data: T, - } - - #[derive(Clone, Copy)] - enum State { - Clean, - Armed, - Poisoned { at: Instant }, - } - - impl Poison { - pub fn new(what: &'static str, data: T) -> Self { - Self { - what, - state: State::Clean, - data, - } - } - pub fn check_and_arm(&mut self) -> Result, Error> { - match self.state { - State::Clean => { - self.state = State::Armed; - Ok(Guard(self)) - } - State::Armed => unreachable!("transient state"), - State::Poisoned { at } => Err(Error::Poisoned { - what: self.what, - at, - }), - } - } - } - - pub struct Guard<'a, T>(&'a mut Poison); - - impl<'a, T> Guard<'a, T> { - pub fn data(&self) -> &T { - &self.0.data - } - pub fn data_mut(&mut self) -> &mut T { - &mut self.0.data - } - - pub fn disarm(self) { - match self.0.state { - State::Clean => unreachable!("we set it to Armed in check_and_arm()"), - State::Armed => { - self.0.state = State::Clean; - } - State::Poisoned { at } => { - unreachable!("we fail check_and_arm() if it's in that state: {at:?}") - } - } - } - } - - impl<'a, T> Drop for Guard<'a, T> { - fn drop(&mut self) { - match self.0.state { - State::Clean => { - // set by disarm() - } - State::Armed => { - // still armed => poison it - let at = Instant::now(); - self.0.state = State::Poisoned { at }; - warn!(at=?at, "poisoning {}", self.0.what); - } - State::Poisoned { at } => { - unreachable!("we fail check_and_arm() if it's in that state: {at:?}") - } - } - } - } - - #[derive(thiserror::Error, Debug)] - pub enum Error { - #[error("poisoned at {at:?}: {what}")] - Poisoned { what: &'static str, at: Instant }, - } -} - /// /// This is the real implementation that uses a Postgres process to /// perform WAL replay. Only one thread can use the process at a time,