Simplify low-hanging fruit

This commit is contained in:
Dmitry Ivanov
2023-05-16 18:00:11 +03:00
committed by Joonas Koivunen
parent 0251c7c15c
commit d0b2befe02

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::future::Future;
use std::sync::Arc;
/// Container using which many request handlers can come together and join a single task to
/// completion instead of racing each other and their own cancellation.
@@ -244,23 +244,21 @@ where
///
/// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple
/// times.
fn make_oneshot_alike_receiver<E2>(
async fn make_oneshot_alike_receiver<E2>(
mut rx: tokio::sync::broadcast::Receiver<Result<T, E1>>,
) -> impl Future<Output = Result<T, E2>> + Send + 'static
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
use tokio::sync::broadcast::error::RecvError;
async move {
match rx.recv().await {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(E2::from(e)),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(E2::from(RetriedTaskPanicked))
}
match rx.recv().await {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(E2::from(e)),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(E2::from(RetriedTaskPanicked))
}
}
}
@@ -384,19 +382,17 @@ where
}
#[cfg(test)]
fn make_oneshot_alike_receiver_any(
async fn make_oneshot_alike_receiver_any(
mut rx: tokio::sync::broadcast::Receiver<V>,
) -> impl Future<Output = Result<V, RetriedTaskPanicked>> + Send + 'static {
) -> Result<V, RetriedTaskPanicked> {
use tokio::sync::broadcast::error::RecvError;
async move {
match rx.recv().await {
Ok(t) => Ok(t),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(RetriedTaskPanicked)
}
match rx.recv().await {
Ok(t) => Ok(t),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with 1 send, but whatever, handle it the same
// this case should only ever happen if a panick happened in the `fut`.
Err(RetriedTaskPanicked)
}
}
}