adopt spsc_fold

Christian Schwarz
2024-11-26 13:30:40 +01:00
parent 9bf2618c45
commit a23abb2cc0
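
For context: utils::sync::spsc_fold is a single-producer single-consumer channel with a one-element buffer whose send() folds the new value into the value already buffered, and parks the sender when folding is declined until the receiver drains the buffer. Below is a minimal synchronous sketch of that contract (an illustrative stand-in, not the actual implementation):

    /// Toy stand-in for the spsc_fold buffer. The real channel is async:
    /// send() awaits instead of handing the value back to the caller.
    struct Slot<T>(Option<T>);

    impl<T> Slot<T> {
        /// Place `value` into an empty slot, or let `fold` merge it into
        /// the buffered value. `fold` returns Ok(()) if it merged, or
        /// Err(value) to hand the value back unmerged.
        fn send(
            &mut self,
            value: T,
            fold: impl FnOnce(&mut T, T) -> Result<(), T>,
        ) -> Result<(), T> {
            if let Some(buffered) = self.0.as_mut() {
                fold(buffered, value)
            } else {
                self.0 = Some(value);
                Ok(())
            }
        }

        /// Take whatever has been folded so far, if anything.
        fn recv(&mut self) -> Option<T> {
            self.0.take()
        }
    }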


@@ -36,6 +36,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
+use utils::sync::spsc_fold;
 use utils::{
     auth::{Claims, Scope, SwappableJwtAuth},
     id::{TenantId, TimelineId},
@@ -755,25 +756,20 @@ impl PageServerHandler {
     #[allow(clippy::boxed_local)]
     fn pagestream_do_batch(
         max_batch_size: NonZeroUsize,
-        batch: &mut Option<Box<BatchedFeMessage>>,
+        batch: &mut Box<BatchedFeMessage>,
         this_msg: Box<BatchedFeMessage>,
-    ) -> Option<Box<BatchedFeMessage>> {
+    ) -> Result<(), Box<BatchedFeMessage>> {
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
-        match (batch.as_deref_mut(), *this_msg) {
-            // nothing batched yet
-            (None, this_msg) => {
-                *batch = Some(Box::new(this_msg));
-                None
-            }
+        match (&mut **batch, *this_msg) {
             // something batched already, let's see if we can add this message to the batch
             (
-                Some(BatchedFeMessage::GetPage {
+                BatchedFeMessage::GetPage {
                     span: _,
                     shard: accum_shard,
                     pages: ref mut accum_pages,
                     effective_request_lsn: accum_lsn,
-                }),
+                },
                 // would be nice to have box pattern here
                 BatchedFeMessage::GetPage {
                     span: _,
@@ -807,12 +803,12 @@ impl PageServerHandler {
             {
                 // ok to batch
                 accum_pages.extend(this_pages);
-                None
+                Ok(())
             }
             // something batched already but this message is unbatchable
-            (Some(_), this_msg) => {
+            (_, this_msg) => {
                 // by default, don't continue batching
-                Some(Box::new(this_msg)) // TODO: avoid re-box
+                Err(Box::new(this_msg)) // TODO: avoid re-box
             }
         }
     }
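
The new signature gives pagestream_do_batch exactly the shape of an spsc_fold fold function: Ok(()) means this_msg was absorbed into the pending batch, Err(this_msg) hands the message back so the full batch can be delivered first. A toy fold with the same shape (illustrative only; it stands in for Self::pagestream_do_batch and assumes nothing about BatchedFeMessage):

    // Same Ok(absorbed) / Err(handed back) contract, over plain numbers.
    fn fold_pages(
        batch: &mut Vec<u32>,
        next: Vec<u32>,
        max_batch_size: usize,
    ) -> Result<(), Vec<u32>> {
        if batch.len() + next.len() <= max_batch_size {
            batch.extend(next); // ok to batch
            Ok(())
        } else {
            Err(next) // unbatchable right now: deliver `batch` first
        }
    }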
@@ -1160,96 +1156,26 @@ impl PageServerHandler {
         //
         // Create Batching future.
         //
-        enum BatchState {
-            Building(Option<Box<BatchedFeMessage>>),
-            ReadMessagesEnded(Option<Box<BatchedFeMessage>>),
-            ExecutorEnded,
-        }
-        let (batch_tx, mut batch_rx) = tokio::sync::watch::channel(Arc::new(
-            std::sync::Mutex::new(BatchState::Building(None)),
-        ));
-        let notify_batcher = Arc::new(tokio::sync::Notify::new());
-        let batcher = {
-            let notify_batcher = notify_batcher.clone();
-            async move {
-                scopeguard::defer! {
-                    debug!("exiting");
-                }
-                'outer: loop {
-                    let maybe_req = requests_rx.recv().await;
-                    let Some(req) = maybe_req else {
-                        batch_tx.send_modify(|pending_batch| {
-                            let mut guard = pending_batch.lock().unwrap();
-                            match &mut *guard {
-                                BatchState::Building(batch) => {
-                                    *guard = BatchState::ReadMessagesEnded(batch.take());
-                                }
-                                BatchState::ReadMessagesEnded(_) => {
-                                    unreachable!("we exit the first time")
-                                }
-                                BatchState::ExecutorEnded => {
-                                    debug!("observing executor ended when reading upstream");
-                                }
-                            }
-                        });
-                        break;
-                    };
-                    // don't read new requests before this one has been processed
-                    let mut req = Some(req);
-                    loop {
-                        let mut wait_notified = None;
-                        enum Outcome {
-                            Batched,
-                            CannotBatchNeedWaitForExecutor,
-                            ExecutorEndObserved,
-                            Undefined,
-                        }
-                        let mut outcome = Outcome::Undefined;
-                        batch_tx.send_if_modified(|pending_batch| {
-                            let mut guard = pending_batch.lock().unwrap();
-                            let building = match &mut *guard {
-                                BatchState::Building(building) => building,
-                                BatchState::ReadMessagesEnded(_) => {
-                                    unreachable!("we would have bailed earlier")
-                                }
-                                BatchState::ExecutorEnded => {
-                                    debug!("observing executor ended when trying to batch");
-                                    outcome = Outcome::ExecutorEndObserved;
-                                    return false;
-                                }
-                            };
-                            match Self::pagestream_do_batch(
-                                max_batch_size,
-                                building,
-                                req.take().unwrap(),
-                            ) {
-                                Some(req_was_not_batched) => {
-                                    outcome = Outcome::CannotBatchNeedWaitForExecutor;
-                                    req.replace(req_was_not_batched);
-                                    wait_notified = Some(notify_batcher.notified());
-                                    false
-                                }
-                                None => {
-                                    outcome = Outcome::Batched;
-                                    true
-                                }
-                            }
-                        });
-                        match outcome {
-                            Outcome::Batched => {
-                                break;
-                            }
-                            Outcome::CannotBatchNeedWaitForExecutor => {
-                                wait_notified.unwrap().await;
-                            }
-                            Outcome::ExecutorEndObserved => {
-                                break 'outer;
-                            }
-                            Outcome::Undefined => {
-                                unreachable!("send_if_modified should always be called")
-                            }
-                        }
-                    }
-                }
-            }
-        }
+        let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
+        let batcher = async move {
+            scopeguard::defer! {
+                debug!("exiting");
+            }
+            loop {
+                let maybe_req = requests_rx.recv().await;
+                let Some(req) = maybe_req else {
+                    break;
+                };
+                let send_res = batch_tx
+                    .send(req, |batch, req| {
+                        Self::pagestream_do_batch(max_batch_size, batch, req)
+                    })
+                    .await;
+                match send_res {
+                    Ok(()) => {}
+                    Err(spsc_fold::SendError::ReceiverGone) => {
+                        debug!("downstream is gone");
+                        break;
+                    }
+                }
+            }
+        }
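
The batcher thus reduces to read-then-fold: when the fold declines a message, send() itself waits for the executor to drain the pending batch, which is what the watch-channel state machine and Notify handshake used to do by hand. A synchronous walk-through of that handshake, combining the toy Slot and fold_pages sketched above (the real code awaits instead of draining inline):

    fn demo() {
        let mut slot = Slot(None);
        for req in [vec![1, 2], vec![3], vec![4, 5, 6, 7]] {
            let mut pending = Some(req);
            while let Some(req) = pending.take() {
                match slot.send(req, |batch, next| fold_pages(batch, next, 4)) {
                    // absorbed into the pending batch
                    Ok(()) => {}
                    // fold declined: the batch must be drained first
                    Err(declined) => {
                        let batch = slot.recv().expect("fold only declines a full slot");
                        println!("executing batch {batch:?}");
                        pending = Some(declined); // retry the declined message
                    }
                }
            }
        }
        if let Some(batch) = slot.recv() {
            println!("executing final batch {batch:?}");
        }
    }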
@@ -1261,49 +1187,23 @@ impl PageServerHandler {
         //
         let executor = async {
-            let _guard = scopeguard::guard(batch_rx.clone(), |batch_rx| {
-                debug!("exiting");
-                let borrow = batch_rx.borrow();
-                let mut guard = borrow.lock().unwrap();
-                match &*guard {
-                    BatchState::Building(_) | BatchState::ReadMessagesEnded(_) => {}
-                    BatchState::ExecutorEnded => unreachable!("we only set this here"),
-                }
-                *guard = BatchState::ExecutorEnded;
-            });
-            let mut stop = false;
-            while !stop {
-                match batch_rx.changed().await {
-                    Ok(()) => {}
-                    Err(_) => {
-                        debug!("batch_rx observed disconnection of batcher");
-                    }
-                };
-                let maybe_batch = {
-                    let borrow = batch_rx.borrow();
-                    let mut guard = borrow.lock().unwrap();
-                    match &mut *guard {
-                        BatchState::Building(maybe_batch) => maybe_batch.take(),
-                        BatchState::ReadMessagesEnded(maybe_batch) => {
-                            debug!("upstream dead");
-                            stop = true;
-                            maybe_batch.take()
-                        }
-                        BatchState::ExecutorEnded => {
-                            unreachable!("we break out of this loop after we set this state");
-                        }
-                    }
-                };
-                let Some(batch) = maybe_batch else {
-                    break;
-                };
-                notify_batcher.notify_one();
-                self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx)
-                    .await?;
-            }
-            Ok(())
-        };
+            scopeguard::defer! {
+                debug!("exiting");
+            }
+            loop {
+                let batch = match batch_rx.recv().await {
+                    Ok(batch) => batch,
+                    Err(spsc_fold::RecvError::SenderGone) => {
+                        debug!("upstream gone");
+                        break;
+                    }
+                };
+                debug!("processing batch");
+                self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx)
+                    .await?;
+            }
+            Ok(())
+        }
+        .instrument(tracing::info_span!("executor"));
         //
         // Execute the stages until they exit.
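
Shutdown now rides on the channel's lifecycle instead of the explicit BatchState enum: the batcher dropping its sender surfaces to the executor as RecvError::SenderGone (the old ReadMessagesEnded), and the executor dropping its receiver surfaces to the batcher as SendError::ReceiverGone (the old ExecutorEnded). The error shapes implied by the call sites above (inferred from this diff, not copied from utils::sync::spsc_fold):

    // Inferred from the match arms in this diff.
    pub enum SendError {
        ReceiverGone, // receiver dropped: nobody will drain the batch
    }
    pub enum RecvError {
        SenderGone, // sender dropped: no more requests are coming
    }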