diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 4e4f79ab6b..298944f063 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -60,6 +60,10 @@ pub mod tracing_span_assert; pub mod rate_limit; +/// Primitive for coalescing operations into a single task which will not be cancelled by for +/// example external http client closing the connection. +pub mod shared_retryable; + mod failpoint_macro_helpers { /// use with fail::cfg("$name", "return(2000)") @@ -96,6 +100,7 @@ mod failpoint_macro_helpers { tracing::info!("failpoint {:?}: sleep done", name); } } + pub use failpoint_macro_helpers::failpoint_sleep_helper; /// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages diff --git a/libs/utils/src/shared_retryable.rs b/libs/utils/src/shared_retryable.rs new file mode 100644 index 0000000000..7174fe9d99 --- /dev/null +++ b/libs/utils/src/shared_retryable.rs @@ -0,0 +1,645 @@ +use std::sync::Arc; + +/// Container using which many request handlers can come together and join a single task to +/// completion instead of racing each other and their own cancellation. +/// +/// In a picture: +/// +/// ```text +/// SharedRetryable::try_restart Spawned task completes with only one concurrent attempt +/// \ / +/// request handler 1 ---->|--X +/// request handler 2 ---->|-------| +/// request handler 3 ---->|-------| +/// \------>/ +/// (X = cancelled during await) +/// ``` +/// +/// Implementation is cancel safe. Implementation and internal structure are hurt by the inability +/// to just spawn the task, but this is needed for pageserver usage. +/// +/// Implementation exposes a fully decomposed [`SharedRetryable::try_restart`] which requires the +/// caller to do the spawning before awaiting for the result. If the caller is dropped while this +/// happens, a new attempt will be required, and all concurrent awaiters will see a +/// [`RetriedTaskPanicked`] error. +/// +/// There is another "family of APIs" [`SharedRetryable::attempt_spawn`] for infallible futures. It is +/// just provided for completeness, and it does not have a fully decomposed version like +/// `try_restart`. +/// +/// For `try_restart_*` family of APIs, there is a concept of two leveled results. The inner level +/// is returned by the executed future. It needs to be `Clone`. Most errors are not `Clone`, so +/// implementation advice is to log the happened error, and not propagate more than a label as the +/// "inner error" which will be used to build an outer error. The outer error will also have to be +/// convertable from [`RetriedTaskPanicked`] to absorb that case as well. +/// +/// ## Example +/// +/// A shared service value completes the infallible work once, even if called concurrently by +/// multiple cancellable tasks. +/// +/// ``` +/// use utils::shared_retryable::{SharedRetryable, Retryable, RetriedTaskPanicked}; +/// use std::sync::Arc; +/// +/// #[derive(Debug, Clone, Copy)] +/// enum OneLevelError { +/// TaskPanicked +/// } +/// +/// impl Retryable for OneLevelError { +/// fn is_permanent(&self) -> bool { +/// // for a single level errors, this wording is weird +/// !matches!(self, OneLevelError::TaskPanicked) +/// } +/// } +/// +/// impl From for OneLevelError { +/// fn from(_: RetriedTaskPanicked) -> Self { +/// OneLevelError::TaskPanicked +/// } +/// } +/// +/// #[derive(Clone, Default)] +/// struct Service(SharedRetryable>); +/// +/// impl Service { +/// async fn work(&self, completions: Arc) -> Result { +/// self.0.try_restart_spawn( +/// || async move { +/// // give time to cancel some of the tasks +/// tokio::time::sleep(std::time::Duration::from_secs(1)).await; +/// completions.fetch_add(1, std::sync::atomic::Ordering::Relaxed); +/// Self::work_once().await +/// } +/// ) +/// .await +/// } +/// +/// async fn work_once() -> Result { +/// Ok(42) +/// } +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let svc = Service::default(); +/// +/// let mut js = tokio::task::JoinSet::new(); +/// +/// let barrier = Arc::new(tokio::sync::Barrier::new(10 + 1)); +/// let completions = Arc::new(std::sync::atomic::AtomicUsize::new(0)); +/// +/// let handles = (0..10).map(|_| js.spawn({ +/// let svc = svc.clone(); +/// let barrier = barrier.clone(); +/// let completions = completions.clone(); +/// async move { +/// // make sure all tasks are ready to start at the same time +/// barrier.wait().await; +/// // after successfully starting the work, any of the futures could get cancelled +/// svc.work(completions).await +/// } +/// })).collect::>(); +/// +/// barrier.wait().await; +/// +/// tokio::time::sleep(std::time::Duration::from_millis(100)).await; +/// +/// handles[5].abort(); +/// +/// let mut cancellations = 0; +/// +/// while let Some(res) = js.join_next().await { +/// // all complete with the same result +/// match res { +/// Ok(res) => assert_eq!(res.unwrap(), 42), +/// Err(je) => { +/// // except for the one task we cancelled; it's cancelling +/// // does not interfere with the result +/// assert!(je.is_cancelled()); +/// cancellations += 1; +/// assert_eq!(cancellations, 1, "only 6th task was aborted"); +/// } +/// } +/// } +/// +/// // there will be at most one terminal completion +/// assert_eq!(completions.load(std::sync::atomic::Ordering::Relaxed), 1); +/// } +/// ``` +#[derive(Clone)] +pub struct SharedRetryable { + inner: Arc>>>, +} + +impl Default for SharedRetryable { + fn default() -> Self { + Self { + inner: Arc::new(tokio::sync::Mutex::new(None)), + } + } +} + +/// Determine if an error is transient or permanent. +pub trait Retryable { + fn is_permanent(&self) -> bool { + true + } +} + +/// Retried task panicked, was cancelled, or never spawned (see [`SharedRetryable::try_restart`]). +#[derive(Debug, PartialEq, Eq)] +pub struct RetriedTaskPanicked; + +impl SharedRetryable> +where + T: Clone + std::fmt::Debug + Send + 'static, + E1: Clone + Retryable + std::fmt::Debug + Send + 'static, +{ + /// Restart a previously failed operation unless it already completed with a terminal result. + /// + /// Many tasks can call this function and and get the terminal result from an earlier attempt + /// or start a new attempt, or join an existing one. + /// + /// Compared to `Self::try_restart`, this method also spawns the future to run, which would + /// otherwise have to be done manually. + pub async fn try_restart_spawn(&self, retry_with: F) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future> + Send + 'static, + E2: From + From + Send + 'static, + { + let (recv, maybe_fut) = self.try_restart(retry_with).await; + + if let Some(fut) = maybe_fut { + // top level function, we must spawn, pageserver cannot use this + tokio::spawn(fut); + } + + recv.await + } + + /// Restart a previously failed operation unless it already completed with a terminal result. + /// + /// Many tasks can call this function and get the terminal result from an earlier attempt or + /// start a new attempt, or join an existing one. + /// + /// If a task calling this method is cancelled, at worst, a new attempt which is immediatedly + /// deemed as having panicked will happen, but without a panic ever happening. + /// + /// Returns one future for waiting for the result and possibly another which needs to be + /// spawned when `Some`. Spawning has to happen before waiting is started, otherwise the first + /// future will never make progress. + /// + /// This complication exists because on pageserver we cannot use `tokio::spawn` directly + /// at this time. + pub async fn try_restart( + &self, + retry_with: F, + ) -> ( + impl std::future::Future> + Send + 'static, + Option + Send + 'static>, + ) + where + F: FnOnce() -> Fut, + Fut: std::future::Future> + Send + 'static, + E2: From + Send + 'static, + E2: From, + { + use futures::future::Either; + + match self.decide_to_retry_or_join(retry_with).await { + Ok(terminal) => (Either::Left(async move { terminal }), None), + Err((rx, maybe_fut)) => { + let recv = Self::make_oneshot_alike_receiver(rx); + + (Either::Right(recv), maybe_fut) + } + } + } + + async fn decide_to_retry_or_join( + &self, + retry_with: F, + ) -> Result< + Result, + ( + tokio::sync::broadcast::Receiver>, + Option + Send + 'static>, + ), + > + where + F: FnOnce() -> Fut, + Fut: std::future::Future> + Send + 'static, + E2: From, + E2: From, + { + let mut g = self.inner.lock().await; + + let maybe_rx = match g.as_ref() { + Some(MaybeDone::Done(Ok(t))) => return Ok(Ok(t.to_owned())), + Some(MaybeDone::Done(Err(e))) if e.is_permanent() => { + return Ok(Err(E2::from(e.to_owned()))) + } + Some(MaybeDone::Pending(weak)) => { + // failure to upgrade can mean only one thing: there was an unexpected + // panic which we consider as a transient retryable error. + weak.upgrade() + } + Some(MaybeDone::Done(Err(_retryable))) => None, + None => None, + }; + + let (strong, maybe_fut) = match maybe_rx { + Some(strong) => (strong, None), + None => { + // new attempt + // panic safety: invoke the factory before configuring the pending value + let fut = retry_with(); + + let (strong, fut) = self.make_run_and_complete(fut, &mut g); + (strong, Some(fut)) + } + }; + + // important: the Arc is not held after unlocking + // important: we resubscribe before lock is released to be sure to get a message which + // is sent once receiver is dropped + let rx = strong.resubscribe(); + drop(strong); + Err((rx, maybe_fut)) + } + + /// Spawn and configure a new attempt. + /// + /// Returns an `Arc>` which is valid until the attempt completes, and the future + /// which will run to completion outside the lifecycle of the caller. + fn make_run_and_complete( + &self, + fut: Fut, + g: &mut tokio::sync::MutexGuard<'_, Option>>>, + ) -> ( + Arc>>, + impl std::future::Future + Send + 'static, + ) + where + Fut: std::future::Future> + Send + 'static, + { + #[cfg(debug_assertions)] + match &**g { + Some(MaybeDone::Pending(weak)) => { + assert!( + weak.upgrade().is_none(), + "when starting a restart, should no longer have an upgradeable channel" + ); + } + Some(MaybeDone::Done(Err(err))) => { + assert!( + !err.is_permanent(), + "when restarting, the err must be transient" + ); + } + Some(MaybeDone::Done(Ok(_))) => { + panic!("unexpected restart after a completion on MaybeDone"); + } + None => {} + } + + self.make_run_and_complete_any(fut, g) + } + + /// Oneshot alike as in it's a future which will be consumed by an `await`. + /// + /// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple + /// times. + fn make_oneshot_alike_receiver( + mut rx: tokio::sync::broadcast::Receiver>, + ) -> impl std::future::Future> + Send + 'static + where + E2: From, + E2: From, + { + use tokio::sync::broadcast::error::RecvError; + + async move { + match rx.recv().await { + Ok(Ok(t)) => Ok(t), + Ok(Err(e)) => Err(E2::from(e)), + Err(RecvError::Closed | RecvError::Lagged(_)) => { + // lagged doesn't mean anything with 1 send, but whatever, handle it the same + // this case should only ever happen if a panick happened in the `fut`. + Err(E2::from(RetriedTaskPanicked)) + } + } + } + } +} + +impl SharedRetryable +where + V: std::fmt::Debug + Clone + Send + 'static, +{ + /// Attempt to run once a spawned future to completion. + /// + /// Any previous attempt which panicked will be retried, but the `RetriedTaskPanicked` will be + /// returned when the most recent attempt panicked. + pub async fn attempt_spawn(&self, attempt_with: F) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future + Send + 'static, + { + let (rx, maybe_fut) = { + let mut g = self.inner.lock().await; + + let maybe_rx = match g.as_ref() { + Some(MaybeDone::Done(v)) => return Ok(v.to_owned()), + Some(MaybeDone::Pending(weak)) => { + // see comment in try_restart + weak.upgrade() + } + None => None, + }; + + let (strong, maybe_fut) = match maybe_rx { + Some(strong) => (strong, None), + None => { + let fut = attempt_with(); + + let (strong, fut) = self.make_run_and_complete_any(fut, &mut g); + (strong, Some(fut)) + } + }; + + // see try_restart for important notes + let rx = strong.resubscribe(); + drop(strong); + (rx, maybe_fut) + }; + + if let Some(fut) = maybe_fut { + // this is a top level function, need to spawn directly + // from pageserver one wouldn't use this but more piecewise functions + tokio::spawn(fut); + } + + let recv = Self::make_oneshot_alike_receiver_any(rx); + + recv.await + } + + /// Configure a new attempt, but leave spawning it to the caller. + /// + /// Forgetting the returned future is outside of scope of any correctness guarantees; all of + /// the waiters will then be deadlocked, and the MaybeDone will forever be pending. Dropping + /// and not running the future will lead to busy looping behaviour. + /// + /// Also returns an `Arc>` which is valid until the attempt completes. + fn make_run_and_complete_any( + &self, + fut: Fut, + g: &mut tokio::sync::MutexGuard<'_, Option>>, + ) -> ( + Arc>, + impl std::future::Future + Send + 'static, + ) + where + Fut: std::future::Future + Send + 'static, + { + let (tx, rx) = tokio::sync::broadcast::channel(1); + let strong = Arc::new(rx); + + **g = Some(MaybeDone::Pending(Arc::downgrade(&strong))); + + let retry = { + let strong = strong.clone(); + let this = self.clone(); + async move { this.run_and_complete(fut, tx, strong).await } + }; + + #[cfg(debug_assertions)] + match &**g { + Some(MaybeDone::Pending(weak)) => { + let rx = weak.upgrade().expect("holding the weak and strong locally"); + assert!(Arc::ptr_eq(&strong, &rx)); + } + _ => unreachable!("MaybeDone::pending must be set after spawn_and_run_complete_any"), + } + + (strong, retry) + } + + /// Run the actual attempt, and communicate the response via both: + /// - setting the `MaybeDone::Done` + /// - the broadcast channel + async fn run_and_complete( + &self, + fut: Fut, + tx: tokio::sync::broadcast::Sender, + strong: Arc>, + ) where + Fut: std::future::Future, + { + let res = fut.await; + + { + let mut g = self.inner.lock().await; + MaybeDone::complete(&mut *g, &strong, res.clone()); + + // make the weak un-upgradeable by dropping the final alive + // reference to it. it is final Arc because the Arc never escapes + // the critical section. + Arc::try_unwrap(strong).expect("expected this to be the only Arc>"); + } + + // now no one can get the Pending(weak) value to upgrade and they only see + // the Done(res). + // + // send the result value to listeners, if any + drop(tx.send(res)); + } + + fn make_oneshot_alike_receiver_any( + mut rx: tokio::sync::broadcast::Receiver, + ) -> impl std::future::Future> + Send + 'static { + use tokio::sync::broadcast::error::RecvError; + + async move { + match rx.recv().await { + Ok(t) => Ok(t), + Err(RecvError::Closed | RecvError::Lagged(_)) => { + // lagged doesn't mean anything with 1 send, but whatever, handle it the same + // this case should only ever happen if a panick happened in the `fut`. + Err(RetriedTaskPanicked) + } + } + } + } +} + +/// MaybeDone handles synchronization for multiple requests and the single actual task. +/// +/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a +/// useful `recv().await`, where useful means "value" or "disconnect" arrives. If upgrade fails, +/// this means that "disconnect" has happened in the past. +/// +/// On successful execution the one executing task will set this to `Done` variant, with the actual +/// resulting value. +#[derive(Debug)] +pub enum MaybeDone { + Pending(std::sync::Weak>), + Done(V), +} + +impl MaybeDone { + fn complete( + this: &mut Option>, + _expected_rx: &Arc>, + outcome: V, + ) { + #[cfg(debug_assertions)] + match this { + Some(MaybeDone::Pending(weak)) => { + let same = weak + .upgrade() + // we don't yet have Receiver::same_channel + .map(|rx| Arc::ptr_eq(&_expected_rx, &rx)) + .unwrap_or(false); + assert!(same, "different channel had been replaced or dropped"); + } + other => panic!("unexpected MaybeDone: {other:?}"), + } + + *this = Some(MaybeDone::Done(outcome)); + } +} + +#[cfg(test)] +mod tests { + + use super::{RetriedTaskPanicked, Retryable, SharedRetryable}; + + #[derive(Debug)] + enum OuterError { + AttemptPanicked, + Unlucky, + } + + #[derive(Clone, Debug)] + enum InnerError { + Unlucky, + } + + impl Retryable for InnerError { + fn is_permanent(&self) -> bool { + false + } + } + + impl From for OuterError { + fn from(_: InnerError) -> Self { + OuterError::Unlucky + } + } + + impl From for OuterError { + fn from(_: RetriedTaskPanicked) -> Self { + OuterError::AttemptPanicked + } + } + + #[tokio::test] + async fn restartable_until_permanent() { + let shr = SharedRetryable::>::default(); + + let res = shr + .try_restart_spawn(|| async move { todo!("really unlucky") }) + .await; + + assert!(matches!(res, Err(OuterError::AttemptPanicked))); + + let res = shr + .try_restart_spawn(|| async move { Err(InnerError::Unlucky) }) + .await; + + assert!(matches!(res, Err(OuterError::Unlucky))); + + let res = shr.try_restart_spawn(|| async move { Ok(42) }).await; + + assert!(matches!(res, Ok::(42))); + + let res = shr + .try_restart_spawn(|| async move { panic!("rerun should clone Ok(42)") }) + .await; + + assert!(matches!(res, Ok::(42))); + } + + /// Demonstration of the SharedRetryable::attempt + #[tokio::test] + async fn attemptable_until_no_panic() { + let shr = SharedRetryable::::default(); + + let res = shr + .attempt_spawn(|| async move { panic!("should not interfere") }) + .await; + + assert!(matches!(res, Err(RetriedTaskPanicked)), "{res:?}"); + + let res = shr.attempt_spawn(|| async move { 42 }).await; + + assert_eq!(res, Ok(42)); + + let res = shr + .attempt_spawn(|| async move { panic!("should not be called") }) + .await; + + assert_eq!(res, Ok(42)); + } + + #[tokio::test] + async fn cancelling_spawner_is_fine() { + let shr = SharedRetryable::>::default(); + + let (recv1, maybe_fut) = shr + .try_restart(|| async move { panic!("should not have been called") }) + .await; + let should_be_spawned = maybe_fut.unwrap(); + + let (recv2, maybe_fut) = shr + .try_restart(|| async move { + panic!("should never be called because waiting on should_be_spawned") + }) + .await; + assert!( + matches!(maybe_fut, None), + "only the first one should had created the future" + ); + + let mut recv1 = std::pin::pin!(recv1); + let mut recv2 = std::pin::pin!(recv2); + + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {}, + _ = &mut recv1 => unreachable!("should not have completed because should_be_spawned not spawned"), + _ = &mut recv2 => unreachable!("should not have completed because should_be_spawned not spawned"), + } + + drop(should_be_spawned); + + let res = recv1.await; + assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}"); + + let res = recv2.await; + assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}"); + + // but we can still reach a terminal state if the api is not misused or the + // should_be_spawned winner is not cancelled + + let recv1 = shr.try_restart_spawn::<_, _, OuterError>(|| async move { Ok(42) }); + let recv2 = shr.try_restart_spawn::<_, _, OuterError>(|| async move { Ok(43) }); + + assert_eq!(recv1.await.unwrap(), 42); + assert_eq!(recv2.await.unwrap(), 42, "43 should never be returned"); + } +}