From aa1032aeff60f20e8b0eade624d9da8cb177d8ed Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 21 Nov 2024 18:39:53 +0100 Subject: [PATCH] no need for cancel & ctx in pagestream_do_batch --- pageserver/src/page_service.rs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 928472fc4c..058d8b3269 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -737,8 +737,6 @@ impl PageServerHandler { async fn pagestream_do_batch( maybe_carry: &mut Option>, arg: Option>, - cancel: &CancellationToken, - ctx: &RequestContext, ) -> Option> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); @@ -1029,9 +1027,7 @@ impl PageServerHandler { let ready_for_next_batch = Arc::new(tokio::sync::Notify::new()); let (batched_tx, mut batched_rx) = tokio::sync::mpsc::channel(1); tokio::spawn({ - let cancel = self.cancel.child_token(); let ready_for_next_batch = Arc::clone(&ready_for_next_batch); - let ctx = ctx.attached_child(); async move { let mut batch: Option> = None; let mut stop = false; @@ -1052,12 +1048,12 @@ impl PageServerHandler { if arg.is_none() { stop = true; } - Self::pagestream_do_batch(&mut batch, arg, &cancel, &ctx).await + Self::pagestream_do_batch(&mut batch, arg).await }, () = ready_for_next_batch.notified() => { debug!("downstream ready, flushing batch early"); // pass None so the batch gets flushed - Self::pagestream_do_batch(&mut batch, None, &cancel, &ctx).await + Self::pagestream_do_batch(&mut batch, None).await } }; let flush_msg = match maybe_flush_msg { @@ -1081,14 +1077,8 @@ impl PageServerHandler { while let Some(batch) = batched_rx.recv().await { ready_for_next_batch.notify_one(); - self.pagesteam_handle_batched_message( - tenant_id, - timeline_id, - pgb, - *batch, - &ctx, - ) - .await?; + self.pagesteam_handle_batched_message(tenant_id, timeline_id, pgb, *batch, &ctx) + .await?; } let (pgb_reader, timeline_handles) = read_message_task