diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 74dd7eddaf..7316a839e6 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1145,15 +1145,8 @@ impl PageServerHandler { enum BatchState { Building(Option>), - UpstreamDead(Option>), - } - impl BatchState { - fn must_building_mut(&mut self) -> &mut Option> { - match self { - Self::Building(maybe_batch) => maybe_batch, - Self::UpstreamDead(_) => panic!("upstream dead"), - } - } + ReadMessagesEnded(Option>), + ExecutorEnded, } let (batch_tx, mut batch_rx) = tokio::sync::watch::channel(Arc::new( std::sync::Mutex::new(BatchState::Building(None)), @@ -1165,16 +1158,21 @@ impl PageServerHandler { scopeguard::defer! { debug!("exiting"); } - loop { + 'outer: loop { let maybe_req = requests_rx.recv().await; let Some(req) = maybe_req else { batch_tx.send_modify(|pending_batch| { let mut guard = pending_batch.lock().unwrap(); match &mut *guard { BatchState::Building(batch) => { - *guard = BatchState::UpstreamDead(batch.take()); + *guard = BatchState::ReadMessagesEnded(batch.take()); + } + BatchState::ReadMessagesEnded(_) => { + unreachable!("we exit the first time") + } + BatchState::ExecutorEnded => { + debug!("observing executor ended when reading upstream"); } - BatchState::UpstreamDead(_) => panic!("twice"), } }); break; @@ -1183,26 +1181,56 @@ impl PageServerHandler { let mut req = Some(req); loop { let mut wait_notified = None; - let batched = batch_tx.send_if_modified(|pending_batch| { + enum Outcome { + Batched, + CannotBatchNeedWaitForExecutor, + ExecutorEndObserved, + Undefined, + } + let mut outcome = Outcome::Undefined; + batch_tx.send_if_modified(|pending_batch| { let mut guard = pending_batch.lock().unwrap(); - let building = guard.must_building_mut(); + let building = match &mut *guard { + BatchState::Building(building) => building, + BatchState::ReadMessagesEnded(_) => { + unreachable!("we would have bailed earlier") + } + BatchState::ExecutorEnded => { + debug!("observing executor ended when trying to batch"); + outcome = Outcome::ExecutorEndObserved; + return false; + } + }; match Self::pagestream_do_batch( max_batch_size, building, req.take().unwrap(), ) { Some(req_was_not_batched) => { + outcome = Outcome::CannotBatchNeedWaitForExecutor; req.replace(req_was_not_batched); wait_notified = Some(notify_batcher.notified()); false } - None => true, + None => { + outcome = Outcome::Batched; + true + } } }); - if batched { - break; - } else { - wait_notified.unwrap().await; + match outcome { + Outcome::Batched => { + break; + } + Outcome::CannotBatchNeedWaitForExecutor => { + wait_notified.unwrap().await; + } + Outcome::ExecutorEndObserved => { + break 'outer; + } + Outcome::Undefined => { + unreachable!("send_if_modified should always be called") + } } } } @@ -1211,6 +1239,16 @@ impl PageServerHandler { .instrument(tracing::info_span!("batcher")); let executor = async { + let _guard = scopeguard::guard(batch_rx.clone(), |batch_rx| { + debug!("exiting"); + let borrow = batch_rx.borrow(); + let mut guard = borrow.lock().unwrap(); + match &*guard { + BatchState::Building(_) | BatchState::ReadMessagesEnded(_) => {} + BatchState::ExecutorEnded => unreachable!("we only set this here"), + } + *guard = BatchState::ExecutorEnded; + }); let mut stop = false; while !stop { match batch_rx.changed().await { @@ -1224,11 +1262,14 @@ impl PageServerHandler { let mut guard = borrow.lock().unwrap(); match &mut *guard { BatchState::Building(maybe_batch) => maybe_batch.take(), - BatchState::UpstreamDead(maybe_batch) => { + BatchState::ReadMessagesEnded(maybe_batch) => { debug!("upstream dead"); stop = true; maybe_batch.take() } + BatchState::ExecutorEnded => { + unreachable!("we break out of this loop after we set this state"); + } } }; let Some(batch) = maybe_batch else {