remove the whole barriers business

This commit is contained in:
Christian Schwarz
2025-01-15 19:00:00 +01:00
parent e81fa7137e
commit 8fafff37c5
2 changed files with 8 additions and 103 deletions

View File

@@ -176,7 +176,6 @@ pub(crate) enum IoConcurrency {
Serial,
FuturesUnordered {
ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
barriers_tx: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>,
cancel_task_on_drop: Arc<tokio_util::sync::DropGuard>,
},
}
@@ -232,7 +231,6 @@ impl IoConcurrency {
match io_concurrency {
SelectedIoConcurrency::Serial => IoConcurrency::Serial,
SelectedIoConcurrency::FuturesUnordered(gate_guard) => {
let (barriers_tx, barrier_rx) = tokio::sync::mpsc::unbounded_channel();
let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
let (cancel, _cancel_task_on_drop) = {
let t = CancellationToken::new();
@@ -247,62 +245,41 @@ impl IoConcurrency {
trace!("start");
scopeguard::defer!{ trace!("end") };
type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
type BarrierReqRx =
tokio::sync::mpsc::UnboundedReceiver<tokio::sync::oneshot::Sender<()>>;
type BarrierDoneTx = tokio::sync::oneshot::Sender<()>;
enum State {
Waiting {
// invariant: is_empty(), but we recycle the allocation
empty_futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
barrier_rx: BarrierReqRx,
},
Executing {
futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
barrier_rx: BarrierReqRx,
},
Barriering {
futures: FuturesUnordered<IoFuture>,
ios_rx: IosRx,
barrier_rx: BarrierReqRx,
barrier_done: BarrierDoneTx,
},
ShuttingDown {
futures: FuturesUnordered<IoFuture>,
barrier_done: Option<BarrierDoneTx>,
},
}
let mut state = State::Waiting {
empty_futures: FuturesUnordered::new(),
ios_rx,
barrier_rx,
};
loop {
match state {
State::Waiting {
empty_futures,
mut ios_rx,
mut barrier_rx,
} => {
assert!(empty_futures.is_empty());
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures: empty_futures, barrier_done: None };
state = State::ShuttingDown { futures: empty_futures };
}
fut = ios_rx.recv() => {
if let Some(fut) = fut {
empty_futures.push(fut);
state = State::Executing { futures: empty_futures, ios_rx, barrier_rx };
state = State::Executing { futures: empty_futures, ios_rx };
} else {
state = State::ShuttingDown { futures: empty_futures, barrier_done: None }
}
}
barrier_done = barrier_rx.recv() => {
if let Some(barrier_done) = barrier_done {
state = State::Barriering { futures: empty_futures, ios_rx, barrier_rx, barrier_done };
} else {
state = State::ShuttingDown { futures: empty_futures, barrier_done: None };
state = State::ShuttingDown { futures: empty_futures }
}
}
}
@@ -310,79 +287,36 @@ impl IoConcurrency {
State::Executing {
mut futures,
mut ios_rx,
mut barrier_rx,
} => {
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures, barrier_done: None };
state = State::ShuttingDown { futures };
}
res = futures.next() => {
assert!(res.is_some());
if futures.is_empty() {
state = State::Waiting { empty_futures: futures, ios_rx, barrier_rx };
state = State::Waiting { empty_futures: futures, ios_rx};
} else {
state = State::Executing { futures, ios_rx, barrier_rx };
state = State::Executing { futures, ios_rx };
}
}
fut = ios_rx.recv() => {
if let Some(fut) = fut {
futures.push(fut);
state = State::Executing { futures, ios_rx, barrier_rx };
state = State::Executing { futures, ios_rx};
} else {
state = State::ShuttingDown { futures, barrier_done: None };
state = State::ShuttingDown { futures };
}
}
barrier_done = barrier_rx.recv() => {
if let Some(barrier_done) = barrier_done {
state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done };
} else {
state = State::ShuttingDown { futures, barrier_done: None };
}
}
}
}
State::Barriering {
mut futures,
ios_rx,
barrier_rx,
barrier_done,
} => {
if futures.is_empty() {
barrier_done.send(()).unwrap();
state = State::Waiting {
empty_futures: futures,
ios_rx,
barrier_rx,
};
} else {
tokio::select! {
() = cancel.cancelled() => {
state = State::ShuttingDown { futures, barrier_done: Some(barrier_done) };
}
res = futures.next() => {
assert!(res.is_some());
if futures.is_empty() {
barrier_done.send(()).unwrap();
state = State::Waiting { empty_futures: futures , ios_rx, barrier_rx };
} else {
state = State::Barriering { futures, ios_rx, barrier_rx, barrier_done };
}
}
// in barriering mode, we don't accept new IOs or new barrier requests
}
}
}
State::ShuttingDown {
mut futures,
barrier_done,
} => {
trace!("shutting down");
while let Some(()) = futures.next().await {
// drain
}
if let Some(barrier_done) = barrier_done {
barrier_done.send(()).unwrap();
}
break;
}
}
@@ -391,7 +325,6 @@ impl IoConcurrency {
}.instrument(span));
IoConcurrency::FuturesUnordered {
ios_tx,
barriers_tx,
cancel_task_on_drop: _cancel_task_on_drop,
}
}
@@ -403,11 +336,9 @@ impl IoConcurrency {
IoConcurrency::Serial => IoConcurrency::Serial,
IoConcurrency::FuturesUnordered {
ios_tx,
barriers_tx,
cancel_task_on_drop,
} => IoConcurrency::FuturesUnordered {
ios_tx: ios_tx.clone(),
barriers_tx: barriers_tx.clone(),
cancel_task_on_drop: cancel_task_on_drop.clone(),
},
}

View File

@@ -3428,32 +3428,6 @@ impl Timeline {
}
}
// TODO: move this to a function
trace!("waiting for futures to complete");
match &reconstruct_state.io_concurrency {
super::storage_layer::IoConcurrency::Serial => (),
super::storage_layer::IoConcurrency::FuturesUnordered { barriers_tx, .. } => {
let (tx, rx) = tokio::sync::oneshot::channel();
match barriers_tx.send(tx) {
Ok(()) => {}
Err(_) => {
return Err(GetVectoredError::Other(anyhow::anyhow!(
"concurrent io task dropped its barriers_rx, likely it panicked"
)));
}
}
match rx.await {
Ok(()) => {}
Err(_) => {
return Err(GetVectoredError::Other(anyhow::anyhow!(
"concurrent io task dropped the barrier_done, likely it panicked"
)));
}
}
}
}
trace!("futures completed");
Ok(TimelineVisitOutcome {
completed_keyspace,
image_covered_keyspace: image_covered_keyspace.consume_keyspace(),