diff --git a/libs/utils/src/shared_retryable.rs b/libs/utils/src/shared_retryable.rs index 06430d83a9..d99e821b9a 100644 --- a/libs/utils/src/shared_retryable.rs +++ b/libs/utils/src/shared_retryable.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::future::Future; +use std::sync::Arc; /// Container using which many request handlers can come together and join a single task to /// completion instead of racing each other and their own cancellation. @@ -244,23 +244,21 @@ where /// /// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple /// times. - fn make_oneshot_alike_receiver( + async fn make_oneshot_alike_receiver( mut rx: tokio::sync::broadcast::Receiver>, - ) -> impl Future> + Send + 'static + ) -> Result where E2: From + From, { use tokio::sync::broadcast::error::RecvError; - async move { - match rx.recv().await { - Ok(Ok(t)) => Ok(t), - Ok(Err(e)) => Err(E2::from(e)), - Err(RecvError::Closed | RecvError::Lagged(_)) => { - // lagged doesn't mean anything with 1 send, but whatever, handle it the same - // this case should only ever happen if a panick happened in the `fut`. - Err(E2::from(RetriedTaskPanicked)) - } + match rx.recv().await { + Ok(Ok(t)) => Ok(t), + Ok(Err(e)) => Err(E2::from(e)), + Err(RecvError::Closed | RecvError::Lagged(_)) => { + // lagged doesn't mean anything with 1 send, but whatever, handle it the same + // this case should only ever happen if a panick happened in the `fut`. + Err(E2::from(RetriedTaskPanicked)) } } } @@ -384,19 +382,17 @@ where } #[cfg(test)] - fn make_oneshot_alike_receiver_any( + async fn make_oneshot_alike_receiver_any( mut rx: tokio::sync::broadcast::Receiver, - ) -> impl Future> + Send + 'static { + ) -> Result { use tokio::sync::broadcast::error::RecvError; - async move { - match rx.recv().await { - Ok(t) => Ok(t), - Err(RecvError::Closed | RecvError::Lagged(_)) => { - // lagged doesn't mean anything with 1 send, but whatever, handle it the same - // this case should only ever happen if a panick happened in the `fut`. - Err(RetriedTaskPanicked) - } + match rx.recv().await { + Ok(t) => Ok(t), + Err(RecvError::Closed | RecvError::Lagged(_)) => { + // lagged doesn't mean anything with 1 send, but whatever, handle it the same + // this case should only ever happen if a panick happened in the `fut`. + Err(RetriedTaskPanicked) } } }