diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1645dfc9d0..1917e7f5b7 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1196,41 +1196,37 @@ impl PageServerHandler { let cancel_batcher = self.cancel.child_token(); let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); - let read_messages = pipeline_stage!( - "read_messages", - cancel_batcher.clone(), - move |cancel_batcher| { - let ctx = ctx.attached_child(); - async move { - let mut pgb_reader = pgb_reader; - let mut exit = false; - while !exit { - let read_res = Self::pagestream_read_message( - &mut pgb_reader, - tenant_id, - timeline_id, - &mut timeline_handles, - &cancel_batcher, - &ctx, - request_span.clone(), - ) + let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| { + let ctx = ctx.attached_child(); + async move { + let mut pgb_reader = pgb_reader; + let mut exit = false; + while !exit { + let read_res = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel_batcher, + &ctx, + request_span.clone(), + ) + .await; + let Some(read_res) = read_res.transpose() else { + debug!("client-initiated shutdown"); + break; + }; + exit |= read_res.is_err(); + let could_send = batch_tx + .send(read_res, |batch, res| { + Self::pagestream_do_batch(max_batch_size, batch, res) + }) .await; - let Some(read_res) = read_res.transpose() else { - debug!("client-initiated shutdown"); - break; - }; - exit |= read_res.is_err(); - let could_send = batch_tx - .send(read_res, |batch, res| { - Self::pagestream_do_batch(max_batch_size, batch, res) - }) - .await; - exit |= could_send.is_err(); - } - (pgb_reader, timeline_handles) + exit |= could_send.is_err(); } + (pgb_reader, timeline_handles) } - ); + }); // // Executor @@ -1267,11 +1263,11 @@ impl PageServerHandler { match execution { PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => { - tokio::join!(read_messages, executor) + tokio::join!(batcher, executor) } PageServiceProtocolPipelinedExecutionStrategy::Tasks => { // These tasks are not tracked anywhere. - let read_messages_task = tokio::spawn(read_messages); + let read_messages_task = tokio::spawn(batcher); let (read_messages_task_res, executor_res_) = tokio::join!(read_messages_task, executor,); (