From 8fafff37c54f29a608a4decc092a0cec74e7e864 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 15 Jan 2025 19:00:00 +0100 Subject: [PATCH] remove the whole barriers business --- pageserver/src/tenant/storage_layer.rs | 85 +++----------------------- pageserver/src/tenant/timeline.rs | 26 -------- 2 files changed, 8 insertions(+), 103 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index fdfb44d6bf..ddab5b70c2 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -176,7 +176,6 @@ pub(crate) enum IoConcurrency { Serial, FuturesUnordered { ios_tx: tokio::sync::mpsc::UnboundedSender, - barriers_tx: tokio::sync::mpsc::UnboundedSender>, cancel_task_on_drop: Arc, }, } @@ -232,7 +231,6 @@ impl IoConcurrency { match io_concurrency { SelectedIoConcurrency::Serial => IoConcurrency::Serial, SelectedIoConcurrency::FuturesUnordered(gate_guard) => { - let (barriers_tx, barrier_rx) = tokio::sync::mpsc::unbounded_channel(); let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel(); let (cancel, _cancel_task_on_drop) = { let t = CancellationToken::new(); @@ -247,62 +245,41 @@ impl IoConcurrency { trace!("start"); scopeguard::defer!{ trace!("end") }; type IosRx = tokio::sync::mpsc::UnboundedReceiver; - type BarrierReqRx = - tokio::sync::mpsc::UnboundedReceiver>; - type BarrierDoneTx = tokio::sync::oneshot::Sender<()>; enum State { Waiting { // invariant: is_empty(), but we recycle the allocation empty_futures: FuturesUnordered, ios_rx: IosRx, - barrier_rx: BarrierReqRx, }, Executing { futures: FuturesUnordered, ios_rx: IosRx, - barrier_rx: BarrierReqRx, - }, - Barriering { - futures: FuturesUnordered, - ios_rx: IosRx, - barrier_rx: BarrierReqRx, - barrier_done: BarrierDoneTx, }, ShuttingDown { futures: FuturesUnordered, - barrier_done: Option, }, } let mut state = State::Waiting { empty_futures: FuturesUnordered::new(), ios_rx, - barrier_rx, }; loop { match state { State::Waiting { empty_futures, mut ios_rx, - mut barrier_rx, } => { assert!(empty_futures.is_empty()); tokio::select! { () = cancel.cancelled() => { - state = State::ShuttingDown { futures: empty_futures, barrier_done: None }; + state = State::ShuttingDown { futures: empty_futures }; } fut = ios_rx.recv() => { if let Some(fut) = fut { empty_futures.push(fut); - state = State::Executing { futures: empty_futures, ios_rx, barrier_rx }; + state = State::Executing { futures: empty_futures, ios_rx }; } else { - state = State::ShuttingDown { futures: empty_futures, barrier_done: None } - } - } - barrier_done = barrier_rx.recv() => { - if let Some(barrier_done) = barrier_done { - state = State::Barriering { futures: empty_futures, ios_rx, barrier_rx, barrier_done }; - } else { - state = State::ShuttingDown { futures: empty_futures, barrier_done: None }; + state = State::ShuttingDown { futures: empty_futures } } } } @@ -310,79 +287,36 @@ impl IoConcurrency { State::Executing { mut futures, mut ios_rx, - mut barrier_rx, } => { tokio::select! { () = cancel.cancelled() => { - state = State::ShuttingDown { futures, barrier_done: None }; + state = State::ShuttingDown { futures }; } res = futures.next() => { assert!(res.is_some()); if futures.is_empty() { - state = State::Waiting { empty_futures: futures, ios_rx, barrier_rx }; + state = State::Waiting { empty_futures: futures, ios_rx}; } else { - state = State::Executing { futures, ios_rx, barrier_rx }; + state = State::Executing { futures, ios_rx }; } } fut = ios_rx.recv() => { if let Some(fut) = fut { futures.push(fut); - state = State::Executing { futures, ios_rx, barrier_rx }; + state = State::Executing { futures, ios_rx}; } else { - state = State::ShuttingDown { futures, barrier_done: None }; + state = State::ShuttingDown { futures }; } } - barrier_done = barrier_rx.recv() => { - if let Some(barrier_done) = barrier_done { - state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done }; - } else { - state = State::ShuttingDown { futures, barrier_done: None }; - } - } - } - } - State::Barriering { - mut futures, - ios_rx, - barrier_rx, - barrier_done, - } => { - if futures.is_empty() { - barrier_done.send(()).unwrap(); - state = State::Waiting { - empty_futures: futures, - ios_rx, - barrier_rx, - }; - } else { - tokio::select! { - () = cancel.cancelled() => { - state = State::ShuttingDown { futures, barrier_done: Some(barrier_done) }; - } - res = futures.next() => { - assert!(res.is_some()); - if futures.is_empty() { - barrier_done.send(()).unwrap(); - state = State::Waiting { empty_futures: futures , ios_rx, barrier_rx }; - } else { - state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done }; - } - } - // in barriering mode, we don't accept new IOs or new barrier requests - } } } State::ShuttingDown { mut futures, - barrier_done, } => { trace!("shutting down"); while let Some(()) = futures.next().await { // drain } - if let Some(barrier_done) = barrier_done { - barrier_done.send(()).unwrap(); - } break; } } @@ -391,7 +325,6 @@ impl IoConcurrency { }.instrument(span)); IoConcurrency::FuturesUnordered { ios_tx, - barriers_tx, cancel_task_on_drop: _cancel_task_on_drop, } } @@ -403,11 +336,9 @@ impl IoConcurrency { IoConcurrency::Serial => IoConcurrency::Serial, IoConcurrency::FuturesUnordered { ios_tx, - barriers_tx, cancel_task_on_drop, } => IoConcurrency::FuturesUnordered { ios_tx: ios_tx.clone(), - barriers_tx: barriers_tx.clone(), cancel_task_on_drop: cancel_task_on_drop.clone(), }, } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 57a1d522e3..20fefe923b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3428,32 +3428,6 @@ impl Timeline { } } - // TODO: move this to a function - trace!("waiting for futures to complete"); - match &reconstruct_state.io_concurrency { - super::storage_layer::IoConcurrency::Serial => (), - super::storage_layer::IoConcurrency::FuturesUnordered { barriers_tx, .. } => { - let (tx, rx) = tokio::sync::oneshot::channel(); - match barriers_tx.send(tx) { - Ok(()) => {} - Err(_) => { - return Err(GetVectoredError::Other(anyhow::anyhow!( - "concurrent io task dropped its barriers_rx, likely it panicked" - ))); - } - } - match rx.await { - Ok(()) => {} - Err(_) => { - return Err(GetVectoredError::Other(anyhow::anyhow!( - "concurrent io task dropped the barrier_done, likely it panicked" - ))); - } - } - } - } - trace!("futures completed"); - Ok(TimelineVisitOutcome { completed_keyspace, image_covered_keyspace: image_covered_keyspace.consume_keyspace(),