we don't need the CancellationToken, the ios_rx.recv() will fail at the same time

This commit is contained in:
Christian Schwarz
2025-01-20 19:13:40 +01:00
parent 82d20b52b3
commit cdad6b2de5

View File

@@ -27,7 +27,6 @@ use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;
use tracing::{trace, Instrument};
use utils::sync::gate::GateGuard;
@@ -208,7 +207,6 @@ pub(crate) enum IoConcurrency {
Serial,
FuturesUnordered {
ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
cancel_task_on_drop: Arc<tokio_util::sync::DropGuard>,
},
}
@@ -265,10 +263,6 @@ impl IoConcurrency {
SelectedIoConcurrency::Serial => IoConcurrency::Serial,
SelectedIoConcurrency::FuturesUnordered(gate_guard) => {
let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
let (cancel, _cancel_task_on_drop) = {
let t = CancellationToken::new();
(t.clone(), Arc::new(t.drop_guard()))
};
static TASK_ID: AtomicUsize = AtomicUsize::new(0);
let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let span =
@@ -304,9 +298,6 @@ impl IoConcurrency {
} => {
assert!(empty_futures.is_empty());
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures: empty_futures };
}
fut = ios_rx.recv() => {
if let Some(fut) = fut {
empty_futures.push(fut);
@@ -322,9 +313,6 @@ impl IoConcurrency {
mut ios_rx,
} => {
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures };
}
res = futures.next() => {
assert!(res.is_some());
if futures.is_empty() {
@@ -374,10 +362,7 @@ impl IoConcurrency {
}
drop(gate_guard); // drop it right before we exitlast
}.instrument(span));
IoConcurrency::FuturesUnordered {
ios_tx,
cancel_task_on_drop: _cancel_task_on_drop,
}
IoConcurrency::FuturesUnordered { ios_tx }
}
}
}
@@ -385,12 +370,8 @@ impl IoConcurrency {
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Serial => IoConcurrency::Serial,
IoConcurrency::FuturesUnordered {
ios_tx,
cancel_task_on_drop,
} => IoConcurrency::FuturesUnordered {
IoConcurrency::FuturesUnordered { ios_tx } => IoConcurrency::FuturesUnordered {
ios_tx: ios_tx.clone(),
cancel_task_on_drop: cancel_task_on_drop.clone(),
},
}
}