From cdad6b2de5bb9739acf08a0d7e71950020b5aeca Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 20 Jan 2025 19:13:40 +0100 Subject: [PATCH] we don't need the CancellationToken, the ios_rx.recv() will fail at the same time --- pageserver/src/tenant/storage_layer.rs | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 810dfddbb5..0c9ce3d2cf 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -27,7 +27,6 @@ use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio_util::sync::CancellationToken; use tracing::{trace, Instrument}; use utils::sync::gate::GateGuard; @@ -208,7 +207,6 @@ pub(crate) enum IoConcurrency { Serial, FuturesUnordered { ios_tx: tokio::sync::mpsc::UnboundedSender, - cancel_task_on_drop: Arc, }, } @@ -265,10 +263,6 @@ impl IoConcurrency { SelectedIoConcurrency::Serial => IoConcurrency::Serial, SelectedIoConcurrency::FuturesUnordered(gate_guard) => { let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel(); - let (cancel, _cancel_task_on_drop) = { - let t = CancellationToken::new(); - (t.clone(), Arc::new(t.drop_guard())) - }; static TASK_ID: AtomicUsize = AtomicUsize::new(0); let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let span = @@ -304,9 +298,6 @@ impl IoConcurrency { } => { assert!(empty_futures.is_empty()); tokio::select! { - () = cancel.cancelled() => { - state = State::ShuttingDown { futures: empty_futures }; - } fut = ios_rx.recv() => { if let Some(fut) = fut { empty_futures.push(fut); @@ -322,9 +313,6 @@ impl IoConcurrency { mut ios_rx, } => { tokio::select! { - () = cancel.cancelled() => { - state = State::ShuttingDown { futures }; - } res = futures.next() => { assert!(res.is_some()); if futures.is_empty() { @@ -374,10 +362,7 @@ impl IoConcurrency { } drop(gate_guard); // drop it right before we exitlast }.instrument(span)); - IoConcurrency::FuturesUnordered { - ios_tx, - cancel_task_on_drop: _cancel_task_on_drop, - } + IoConcurrency::FuturesUnordered { ios_tx } } } } @@ -385,12 +370,8 @@ impl IoConcurrency { pub(crate) fn clone(&self) -> Self { match self { IoConcurrency::Serial => IoConcurrency::Serial, - IoConcurrency::FuturesUnordered { - ios_tx, - cancel_task_on_drop, - } => IoConcurrency::FuturesUnordered { + IoConcurrency::FuturesUnordered { ios_tx } => IoConcurrency::FuturesUnordered { ios_tx: ios_tx.clone(), - cancel_task_on_drop: cancel_task_on_drop.clone(), }, } }