refactor: inline None into MaybeDone::NotStarted

This commit is contained in:
Joonas Koivunen
2023-05-22 11:09:27 +03:00
parent c8b35324f6
commit 69e92372c8

View File

@@ -46,13 +46,13 @@ use std::sync::Arc;
/// Example moved as a test `service_example`.
#[derive(Clone)]
pub struct SharedRetryable<V> {
inner: Arc<tokio::sync::Mutex<Option<MaybeDone<V>>>>,
inner: Arc<tokio::sync::Mutex<MaybeDone<V>>>,
}
impl<V> Default for SharedRetryable<V> {
fn default() -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(None)),
inner: Arc::new(tokio::sync::Mutex::new(MaybeDone::default())),
}
}
}
@@ -173,18 +173,16 @@ where
{
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(Ok(t))) => return Ok(Ok(t.to_owned())),
Some(MaybeDone::Done(Err(e))) if e.is_permanent() => {
return Ok(Err(E2::from(e.to_owned())))
}
Some(MaybeDone::Pending(weak)) => {
let maybe_rx = match &*g {
MaybeDone::Done(Ok(t)) => return Ok(Ok(t.to_owned())),
MaybeDone::Done(Err(e)) if e.is_permanent() => return Ok(Err(E2::from(e.to_owned()))),
MaybeDone::Pending(weak) => {
// failure to upgrade can mean only one thing: there was an unexpected
// panic which we consider as a transient retryable error.
weak.upgrade()
}
Some(MaybeDone::Done(Err(_retryable))) => None,
None => None,
MaybeDone::Done(Err(_retryable)) => None,
MaybeDone::NotStarted => None,
};
let (strong, maybe_fut) = match maybe_rx {
@@ -214,29 +212,29 @@ where
fn make_run_and_complete(
&self,
fut: impl Future<Output = Result<T, E1>> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<Result<T, E1>>>>,
g: &mut tokio::sync::MutexGuard<'_, MaybeDone<Result<T, E1>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<Result<T, E1>>>,
impl Future<Output = ()> + Send + 'static,
) {
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
MaybeDone::Pending(weak) => {
assert!(
weak.upgrade().is_none(),
"when starting a restart, should no longer have an upgradeable channel"
);
}
Some(MaybeDone::Done(Err(err))) => {
MaybeDone::Done(Err(err)) => {
assert!(
!err.is_permanent(),
"when restarting, the err must be transient"
);
}
Some(MaybeDone::Done(Ok(_))) => {
MaybeDone::Done(Ok(_)) => {
panic!("unexpected restart after a completion on MaybeDone");
}
None => {}
MaybeDone::NotStarted => {}
}
self.make_run_and_complete_any(fut, g)
@@ -282,13 +280,13 @@ where
let (rx, maybe_fut) = {
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(v)) => return Ok(v.to_owned()),
Some(MaybeDone::Pending(weak)) => {
let maybe_rx = match &*g {
MaybeDone::Done(v) => return Ok(v.to_owned()),
MaybeDone::Pending(weak) => {
// see comment in try_restart
weak.upgrade()
}
None => None,
MaybeDone::NotStarted => None,
};
let (strong, maybe_fut) = match maybe_rx {
@@ -328,7 +326,7 @@ where
fn make_run_and_complete_any(
&self,
fut: impl Future<Output = V> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<V>>>,
g: &mut tokio::sync::MutexGuard<'_, MaybeDone<V>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<V>>,
impl Future<Output = ()> + Send + 'static,
@@ -336,7 +334,7 @@ where
let (tx, rx) = tokio::sync::broadcast::channel(1);
let strong = Arc::new(rx);
**g = Some(MaybeDone::Pending(Arc::downgrade(&strong)));
**g = MaybeDone::Pending(Arc::downgrade(&strong));
let retry = {
let strong = strong.clone();
@@ -345,7 +343,7 @@ where
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
MaybeDone::Pending(weak) => {
let rx = weak.upgrade().expect("holding the weak and strong locally");
assert!(Arc::ptr_eq(&strong, &rx));
}
@@ -368,7 +366,7 @@ where
{
let mut g = self.inner.lock().await;
MaybeDone::complete(&mut *g, &strong, res.clone());
g.complete(&strong, res.clone());
// make the weak un-upgradeable by dropping the final alive
// reference to it. it is final Arc because the Arc never escapes
@@ -408,21 +406,19 @@ where
///
/// On successful execution the one executing task will set this to `Done` variant, with the actual
/// resulting value.
#[derive(Debug)]
#[derive(Debug, Default)]
pub enum MaybeDone<V> {
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
Done(V),
#[default]
NotStarted,
}
impl<V: std::fmt::Debug> MaybeDone<V> {
fn complete(
this: &mut Option<MaybeDone<V>>,
_strong: &Arc<tokio::sync::broadcast::Receiver<V>>,
outcome: V,
) {
fn complete(&mut self, _strong: &Arc<tokio::sync::broadcast::Receiver<V>>, outcome: V) {
#[cfg(debug_assertions)]
match this {
Some(MaybeDone::Pending(weak)) => {
match self {
MaybeDone::Pending(weak) => {
let same = weak
.upgrade()
// we don't yet have Receiver::same_channel
@@ -433,7 +429,7 @@ impl<V: std::fmt::Debug> MaybeDone<V> {
other => panic!("unexpected MaybeDone: {other:?}"),
}
*this = Some(MaybeDone::Done(outcome));
*self = MaybeDone::Done(outcome);
}
}