From 69e92372c8057b6f0e8d3a23be3f14933e91c259 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 22 May 2023 11:09:27 +0300 Subject: [PATCH] refactor: inline None into MaybeDone::NotStarted --- libs/utils/src/shared_retryable.rs | 60 ++++++++++++++---------------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/libs/utils/src/shared_retryable.rs b/libs/utils/src/shared_retryable.rs index 0bae05b2c2..f2a74e40c4 100644 --- a/libs/utils/src/shared_retryable.rs +++ b/libs/utils/src/shared_retryable.rs @@ -46,13 +46,13 @@ use std::sync::Arc; /// Example moved as a test `service_example`. #[derive(Clone)] pub struct SharedRetryable { - inner: Arc>>>, + inner: Arc>>, } impl Default for SharedRetryable { fn default() -> Self { Self { - inner: Arc::new(tokio::sync::Mutex::new(None)), + inner: Arc::new(tokio::sync::Mutex::new(MaybeDone::default())), } } } @@ -173,18 +173,16 @@ where { let mut g = self.inner.lock().await; - let maybe_rx = match g.as_ref() { - Some(MaybeDone::Done(Ok(t))) => return Ok(Ok(t.to_owned())), - Some(MaybeDone::Done(Err(e))) if e.is_permanent() => { - return Ok(Err(E2::from(e.to_owned()))) - } - Some(MaybeDone::Pending(weak)) => { + let maybe_rx = match &*g { + MaybeDone::Done(Ok(t)) => return Ok(Ok(t.to_owned())), + MaybeDone::Done(Err(e)) if e.is_permanent() => return Ok(Err(E2::from(e.to_owned()))), + MaybeDone::Pending(weak) => { // failure to upgrade can mean only one thing: there was an unexpected // panic which we consider as a transient retryable error. weak.upgrade() } - Some(MaybeDone::Done(Err(_retryable))) => None, - None => None, + MaybeDone::Done(Err(_retryable)) => None, + MaybeDone::NotStarted => None, }; let (strong, maybe_fut) = match maybe_rx { @@ -214,29 +212,29 @@ where fn make_run_and_complete( &self, fut: impl Future> + Send + 'static, - g: &mut tokio::sync::MutexGuard<'_, Option>>>, + g: &mut tokio::sync::MutexGuard<'_, MaybeDone>>, ) -> ( Arc>>, impl Future + Send + 'static, ) { #[cfg(debug_assertions)] match &**g { - Some(MaybeDone::Pending(weak)) => { + MaybeDone::Pending(weak) => { assert!( weak.upgrade().is_none(), "when starting a restart, should no longer have an upgradeable channel" ); } - Some(MaybeDone::Done(Err(err))) => { + MaybeDone::Done(Err(err)) => { assert!( !err.is_permanent(), "when restarting, the err must be transient" ); } - Some(MaybeDone::Done(Ok(_))) => { + MaybeDone::Done(Ok(_)) => { panic!("unexpected restart after a completion on MaybeDone"); } - None => {} + MaybeDone::NotStarted => {} } self.make_run_and_complete_any(fut, g) @@ -282,13 +280,13 @@ where let (rx, maybe_fut) = { let mut g = self.inner.lock().await; - let maybe_rx = match g.as_ref() { - Some(MaybeDone::Done(v)) => return Ok(v.to_owned()), - Some(MaybeDone::Pending(weak)) => { + let maybe_rx = match &*g { + MaybeDone::Done(v) => return Ok(v.to_owned()), + MaybeDone::Pending(weak) => { // see comment in try_restart weak.upgrade() } - None => None, + MaybeDone::NotStarted => None, }; let (strong, maybe_fut) = match maybe_rx { @@ -328,7 +326,7 @@ where fn make_run_and_complete_any( &self, fut: impl Future + Send + 'static, - g: &mut tokio::sync::MutexGuard<'_, Option>>, + g: &mut tokio::sync::MutexGuard<'_, MaybeDone>, ) -> ( Arc>, impl Future + Send + 'static, @@ -336,7 +334,7 @@ where let (tx, rx) = tokio::sync::broadcast::channel(1); let strong = Arc::new(rx); - **g = Some(MaybeDone::Pending(Arc::downgrade(&strong))); + **g = MaybeDone::Pending(Arc::downgrade(&strong)); let retry = { let strong = strong.clone(); @@ -345,7 +343,7 @@ where #[cfg(debug_assertions)] match &**g { - Some(MaybeDone::Pending(weak)) => { + MaybeDone::Pending(weak) => { let rx = weak.upgrade().expect("holding the weak and strong locally"); assert!(Arc::ptr_eq(&strong, &rx)); } @@ -368,7 +366,7 @@ where { let mut g = self.inner.lock().await; - MaybeDone::complete(&mut *g, &strong, res.clone()); + g.complete(&strong, res.clone()); // make the weak un-upgradeable by dropping the final alive // reference to it. it is final Arc because the Arc never escapes @@ -408,21 +406,19 @@ where /// /// On successful execution the one executing task will set this to `Done` variant, with the actual /// resulting value. -#[derive(Debug)] +#[derive(Debug, Default)] pub enum MaybeDone { Pending(std::sync::Weak>), Done(V), + #[default] + NotStarted, } impl MaybeDone { - fn complete( - this: &mut Option>, - _strong: &Arc>, - outcome: V, - ) { + fn complete(&mut self, _strong: &Arc>, outcome: V) { #[cfg(debug_assertions)] - match this { - Some(MaybeDone::Pending(weak)) => { + match self { + MaybeDone::Pending(weak) => { let same = weak .upgrade() // we don't yet have Receiver::same_channel @@ -433,7 +429,7 @@ impl MaybeDone { other => panic!("unexpected MaybeDone: {other:?}"), } - *this = Some(MaybeDone::Done(outcome)); + *self = MaybeDone::Done(outcome); } }