fix: batcher wouldn't shut down after executor exits

This commit is contained in:
Christian Schwarz
2024-11-25 11:28:30 +01:00
parent 0bb037240d
commit b9477aa945

View File

@@ -1145,15 +1145,8 @@ impl PageServerHandler {
enum BatchState {
Building(Option<Box<BatchedFeMessage>>),
UpstreamDead(Option<Box<BatchedFeMessage>>),
}
impl BatchState {
fn must_building_mut(&mut self) -> &mut Option<Box<BatchedFeMessage>> {
match self {
Self::Building(maybe_batch) => maybe_batch,
Self::UpstreamDead(_) => panic!("upstream dead"),
}
}
ReadMessagesEnded(Option<Box<BatchedFeMessage>>),
ExecutorEnded,
}
let (batch_tx, mut batch_rx) = tokio::sync::watch::channel(Arc::new(
std::sync::Mutex::new(BatchState::Building(None)),
@@ -1165,16 +1158,21 @@ impl PageServerHandler {
scopeguard::defer! {
debug!("exiting");
}
loop {
'outer: loop {
let maybe_req = requests_rx.recv().await;
let Some(req) = maybe_req else {
batch_tx.send_modify(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
match &mut *guard {
BatchState::Building(batch) => {
*guard = BatchState::UpstreamDead(batch.take());
*guard = BatchState::ReadMessagesEnded(batch.take());
}
BatchState::ReadMessagesEnded(_) => {
unreachable!("we exit the first time")
}
BatchState::ExecutorEnded => {
debug!("observing executor ended when reading upstream");
}
BatchState::UpstreamDead(_) => panic!("twice"),
}
});
break;
@@ -1183,26 +1181,56 @@ impl PageServerHandler {
let mut req = Some(req);
loop {
let mut wait_notified = None;
let batched = batch_tx.send_if_modified(|pending_batch| {
enum Outcome {
Batched,
CannotBatchNeedWaitForExecutor,
ExecutorEndObserved,
Undefined,
}
let mut outcome = Outcome::Undefined;
batch_tx.send_if_modified(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
let building = guard.must_building_mut();
let building = match &mut *guard {
BatchState::Building(building) => building,
BatchState::ReadMessagesEnded(_) => {
unreachable!("we would have bailed earlier")
}
BatchState::ExecutorEnded => {
debug!("observing executor ended when trying to batch");
outcome = Outcome::ExecutorEndObserved;
return false;
}
};
match Self::pagestream_do_batch(
max_batch_size,
building,
req.take().unwrap(),
) {
Some(req_was_not_batched) => {
outcome = Outcome::CannotBatchNeedWaitForExecutor;
req.replace(req_was_not_batched);
wait_notified = Some(notify_batcher.notified());
false
}
None => true,
None => {
outcome = Outcome::Batched;
true
}
}
});
if batched {
break;
} else {
wait_notified.unwrap().await;
match outcome {
Outcome::Batched => {
break;
}
Outcome::CannotBatchNeedWaitForExecutor => {
wait_notified.unwrap().await;
}
Outcome::ExecutorEndObserved => {
break 'outer;
}
Outcome::Undefined => {
unreachable!("send_if_modified should always be called")
}
}
}
}
@@ -1211,6 +1239,16 @@ impl PageServerHandler {
.instrument(tracing::info_span!("batcher"));
let executor = async {
let _guard = scopeguard::guard(batch_rx.clone(), |batch_rx| {
debug!("exiting");
let borrow = batch_rx.borrow();
let mut guard = borrow.lock().unwrap();
match &*guard {
BatchState::Building(_) | BatchState::ReadMessagesEnded(_) => {}
BatchState::ExecutorEnded => unreachable!("we only set this here"),
}
*guard = BatchState::ExecutorEnded;
});
let mut stop = false;
while !stop {
match batch_rx.changed().await {
@@ -1224,11 +1262,14 @@ impl PageServerHandler {
let mut guard = borrow.lock().unwrap();
match &mut *guard {
BatchState::Building(maybe_batch) => maybe_batch.take(),
BatchState::UpstreamDead(maybe_batch) => {
BatchState::ReadMessagesEnded(maybe_batch) => {
debug!("upstream dead");
stop = true;
maybe_batch.take()
}
BatchState::ExecutorEnded => {
unreachable!("we break out of this loop after we set this state");
}
}
};
let Some(batch) = maybe_batch else {