watch-based approach

This commit is contained in:
Christian Schwarz
2024-11-21 22:55:10 +01:00
parent db9093f938
commit 88fd8aed52

View File

@@ -752,21 +752,17 @@ impl PageServerHandler {
/// Post-condition: `maybe_carry` is Some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
async fn pagestream_do_batch(
fn pagestream_do_batch(
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
this_msg: Box<BatchedFeMessage>,
) -> Option<Box<BatchedFeMessage>> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
match (maybe_carry.as_deref_mut(), *this_msg) {
// new batch
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
None
}
// nothing batched yet and this message is unbatchable
// nothing batched yet
(None, this_msg) => {
Some(Box::new(this_msg)) // TODO: avoid this re-boxing
*maybe_carry = Some(Box::new(this_msg));
None
}
// something batched already, let's see if we can add this message to the batch
(
@@ -783,7 +779,7 @@ impl PageServerHandler {
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
) if (|| {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
@@ -805,8 +801,7 @@ impl PageServerHandler {
return false;
}
true
}
.await =>
})() =>
{
// ok to batch
accum_pages.extend(this_pages);
@@ -815,9 +810,7 @@ impl PageServerHandler {
// something batched already but this message is unbatchable
(Some(_), this_msg) => {
// by default, don't continue batching
let this_msg = Box::new(this_msg); // TODO: avoid this box
let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()");
Some(carry)
Some(Box::new(this_msg)) // TODO: avoid re-box
}
}
}
@@ -1011,7 +1004,7 @@ impl PageServerHandler {
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let request_span = info_span!("request", shard_id = tracing::field::Empty);
let read_message_task = tokio::spawn(
let read_message_task: JoinHandle<Result<_, QueryError>> = tokio::spawn(
{
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
@@ -1046,48 +1039,69 @@ impl PageServerHandler {
}
}
}
anyhow::Ok((pgb_reader, timeline_handles))
Ok((pgb_reader, timeline_handles))
}
}
.instrument(tracing::info_span!("read_protocol")),
);
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
enum BatchState {
Building(Option<Box<BatchedFeMessage>>),
UpstreamDead(Option<Box<BatchedFeMessage>>),
}
impl BatchState {
fn must_building_mut(&mut self) -> &mut Option<Box<BatchedFeMessage>> {
match self {
Self::Building(maybe_batch) => maybe_batch,
Self::UpstreamDead(_) => panic!("upstream dead"),
}
}
}
let (batch_tx, mut batch_rx) = tokio::sync::watch::channel(Arc::new(
std::sync::Mutex::new(BatchState::Building(None)),
));
let notify_batcher = Arc::new(tokio::sync::Notify::new());
tokio::spawn(
{
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
let notify_batcher = notify_batcher.clone();
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut batch: Option<Box<BatchedFeMessage>> = None;
loop {
let maybe_flush_msg = tokio::select! {
req = requests_rx.recv() => {
if let Some(req) = req {
Self::pagestream_do_batch(&mut batch, req).await
} else {
debug!("request processing pipeline upstream dead");
std::mem::take(&mut batch)
let maybe_req = requests_rx.recv().await;
let Some(req) = maybe_req else {
batch_tx.send_modify(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
match &mut *guard {
BatchState::Building(carry) => {
*guard = BatchState::UpstreamDead(carry.take());
}
BatchState::UpstreamDead(_) => panic!("twice"),
}
},
() = ready_for_next_batch.notified() => {
debug!("downstream ready, flushing batch early if any available");
std::mem::take(&mut batch)
}
});
break;
};
let flush_msg = match maybe_flush_msg {
None => {
continue;
}
Some(flush_msg) => flush_msg,
};
match batched_tx.send(flush_msg).await {
Ok(()) => {}
Err(_) => {
debug!("downstream is gone");
// don't read new requests before this one has been processed
let mut req = Some(req);
loop {
let mut wait_notified = None;
let batched = batch_tx.send_if_modified(|pending_batch| {
let mut guard = pending_batch.lock().unwrap();
let building = guard.must_building_mut();
match Self::pagestream_do_batch(building, req.take().unwrap()) {
Some(req_was_not_batched) => {
req.replace(req_was_not_batched);
wait_notified = Some(notify_batcher.notified());
false
}
None => true,
}
});
if batched {
break;
} else {
wait_notified.unwrap().await;
}
}
}
@@ -1096,26 +1110,30 @@ impl PageServerHandler {
.instrument(tracing::info_span!("batching")),
);
loop {
let batch = match batched_rx.try_recv() {
Ok(batch) => batch,
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
ready_for_next_batch.notify_one();
match batched_rx.recv().await {
Some(batch) => batch,
None => {
debug!(
"batched_rx observed as disconnected while waiting for next batch"
);
break;
}
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
debug!("batched_rx observed as disconnected as part of try_recv()");
break;
let mut stop = false;
while !stop {
match batch_rx.changed().await {
Ok(()) => {}
Err(_) => {
debug!("batch_rx observed disconnection of batcher");
}
};
let maybe_batch = {
let borrow = batch_rx.borrow();
let mut guard = borrow.lock().unwrap();
match &mut *guard {
BatchState::Building(maybe_batch) => maybe_batch.take(),
BatchState::UpstreamDead(maybe_batch) => {
debug!("upstream dead");
stop = true;
maybe_batch.take()
}
}
};
let Some(batch) = maybe_batch else {
break;
};
notify_batcher.notify_one();
debug!("processing batch");
self.pagesteam_handle_batched_message(pgb, *batch, &ctx)
.await?;