This commit is contained in:
Christian Schwarz
2024-11-21 19:42:43 +01:00
parent 345f8b6c3b
commit 408bc8fc71

View File

@@ -733,18 +733,25 @@ impl PageServerHandler {
Ok(Some(Box::new(batched_msg)))
}
/// Post-condition: `maybe_carry` is Some()
#[instrument(skip_all, level = tracing::Level::TRACE)]
async fn pagestream_do_batch(
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
arg: Option<Box<BatchedFeMessage>>,
this_msg: Box<BatchedFeMessage>,
) -> Option<Box<BatchedFeMessage>> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
match (maybe_carry.as_deref_mut(), arg.map(|x| *x)) {
(None, Some(arg)) => {
*maybe_carry = Some(Box::new(arg)); // TODO: avoid this boxing
match (maybe_carry.as_deref_mut(), *this_msg) {
// new batch
(None, this_msg @ BatchedFeMessage::GetPage { .. }) => {
*maybe_carry = Some(Box::new(this_msg)); // TODO: avoid this re-boxing
None
}
// nothing batched yet and this message is unbatchable
(None, this_msg) => {
Some(Box::new(this_msg)) // TODO: avoid this re-boxing
}
// something batched already, let's see if we can add this message to the batch
(
Some(BatchedFeMessage::GetPage {
span: _,
@@ -753,12 +760,12 @@ impl PageServerHandler {
effective_request_lsn: accum_lsn,
}),
// would be nice to have box pattern here
Some(BatchedFeMessage::GetPage {
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
}),
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
@@ -788,21 +795,17 @@ impl PageServerHandler {
accum_pages.extend(this_pages);
None
}
(Some(_), Some(this_msg)) => {
// something batched already but this message is unbatchable
(Some(_), this_msg) => {
// by default, don't continue batching
let this_msg = Box::new(this_msg); // TODO: avoid this box
let carry = maybe_carry.replace(this_msg).expect("this match arm checks it's Some()");
Some(carry)
}
(Some(_), None) => {
Some(maybe_carry.take().expect("this match arm checks it's Some()"))
}
(None, None) => {
None // TODO: can we prevent this branch trhough the typesystem
}
}
}
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
&mut self,
tenant_id: TenantId,
@@ -1013,10 +1016,17 @@ impl PageServerHandler {
&ctx,
)
.await?;
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
break;
}
};
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("request processing pipeline downstream dead");
debug!("downstream is gone");
break;
}
}
@@ -1034,45 +1044,32 @@ impl PageServerHandler {
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
async move {
let mut batch: Option<Box<BatchedFeMessage>> = None;
let mut stop = false;
while !stop {
loop {
let maybe_flush_msg = tokio::select! {
req = requests_rx.recv() => {
let arg = match req {
Some(Some(req)) => Some(req),
Some(None) => {
debug!("upstream task observed end of pagestream protocol");
None
}
None => {
debug!("upstream task observed protocol error");
None
}
};
if arg.is_none() {
stop = true;
if let Some(req) = req {
Self::pagestream_do_batch(&mut batch, req).await
} else {
debug!("request processing pipeline upstream dead");
std::mem::take(&mut batch)
}
Self::pagestream_do_batch(&mut batch, arg).await
},
() = ready_for_next_batch.notified() => {
debug!("downstream ready, flushing batch early");
// pass None so the batch gets flushed
Self::pagestream_do_batch(&mut batch, None).await
debug!("downstream ready, flushing batch early if any available");
std::mem::take(&mut batch)
}
};
let flush_msg = match maybe_flush_msg {
None => {
debug!("not batching");
continue;
}
Some(flush_msg) => flush_msg,
};
debug!("flushing batch");
match batched_tx.send(flush_msg).await {
Ok(()) => {}
Err(_) => {
debug!("batched messages consumer is gone");
stop = true;
debug!("downstream is gone");
break;
}
}
}
@@ -1101,6 +1098,7 @@ impl PageServerHandler {
break;
}
};
debug!("processing batch");
self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx)
.await?;
}