no need for cancel & ctx in pagestream_do_batch

This commit is contained in:
Christian Schwarz
2024-11-21 18:39:53 +01:00
parent a1bb2e7bb0
commit aa1032aeff

View File

@@ -737,8 +737,6 @@ impl PageServerHandler {
async fn pagestream_do_batch(
maybe_carry: &mut Option<Box<BatchedFeMessage>>,
arg: Option<Box<BatchedFeMessage>>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Option<Box<BatchedFeMessage>> {
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
@@ -1029,9 +1027,7 @@ impl PageServerHandler {
let ready_for_next_batch = Arc::new(tokio::sync::Notify::new());
let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn({
let cancel = self.cancel.child_token();
let ready_for_next_batch = Arc::clone(&ready_for_next_batch);
let ctx = ctx.attached_child();
async move {
let mut batch: Option<Box<BatchedFeMessage>> = None;
let mut stop = false;
@@ -1052,12 +1048,12 @@ impl PageServerHandler {
if arg.is_none() {
stop = true;
}
Self::pagestream_do_batch(&mut batch, arg, &cancel, &ctx).await
Self::pagestream_do_batch(&mut batch, arg).await
},
() = ready_for_next_batch.notified() => {
debug!("downstream ready, flushing batch early");
// pass None so the batch gets flushed
Self::pagestream_do_batch(&mut batch, None, &cancel, &ctx).await
Self::pagestream_do_batch(&mut batch, None).await
}
};
let flush_msg = match maybe_flush_msg {
@@ -1081,14 +1077,8 @@ impl PageServerHandler {
while let Some(batch) = batched_rx.recv().await {
ready_for_next_batch.notify_one();
self.pagesteam_handle_batched_message(
tenant_id,
timeline_id,
pgb,
*batch,
&ctx,
)
.await?;
self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx)
.await?;
}
let (pgb_reader, timeline_handles) = read_message_task