mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
read_messages => batcher
This commit is contained in:
@@ -1196,41 +1196,37 @@ impl PageServerHandler {
|
||||
|
||||
let cancel_batcher = self.cancel.child_token();
|
||||
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
|
||||
let read_messages = pipeline_stage!(
|
||||
"read_messages",
|
||||
cancel_batcher.clone(),
|
||||
move |cancel_batcher| {
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
let mut pgb_reader = pgb_reader;
|
||||
let mut exit = false;
|
||||
while !exit {
|
||||
let read_res = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
request_span.clone(),
|
||||
)
|
||||
let batcher = pipeline_stage!("batcher", cancel_batcher.clone(), move |cancel_batcher| {
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
let mut pgb_reader = pgb_reader;
|
||||
let mut exit = false;
|
||||
while !exit {
|
||||
let read_res = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
request_span.clone(),
|
||||
)
|
||||
.await;
|
||||
let Some(read_res) = read_res.transpose() else {
|
||||
debug!("client-initiated shutdown");
|
||||
break;
|
||||
};
|
||||
exit |= read_res.is_err();
|
||||
let could_send = batch_tx
|
||||
.send(read_res, |batch, res| {
|
||||
Self::pagestream_do_batch(max_batch_size, batch, res)
|
||||
})
|
||||
.await;
|
||||
let Some(read_res) = read_res.transpose() else {
|
||||
debug!("client-initiated shutdown");
|
||||
break;
|
||||
};
|
||||
exit |= read_res.is_err();
|
||||
let could_send = batch_tx
|
||||
.send(read_res, |batch, res| {
|
||||
Self::pagestream_do_batch(max_batch_size, batch, res)
|
||||
})
|
||||
.await;
|
||||
exit |= could_send.is_err();
|
||||
}
|
||||
(pgb_reader, timeline_handles)
|
||||
exit |= could_send.is_err();
|
||||
}
|
||||
(pgb_reader, timeline_handles)
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
//
|
||||
// Executor
|
||||
@@ -1267,11 +1263,11 @@ impl PageServerHandler {
|
||||
|
||||
match execution {
|
||||
PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
|
||||
tokio::join!(read_messages, executor)
|
||||
tokio::join!(batcher, executor)
|
||||
}
|
||||
PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
|
||||
// These tasks are not tracked anywhere.
|
||||
let read_messages_task = tokio::spawn(read_messages);
|
||||
let read_messages_task = tokio::spawn(batcher);
|
||||
let (read_messages_task_res, executor_res_) =
|
||||
tokio::join!(read_messages_task, executor,);
|
||||
(
|
||||
|
||||
Reference in New Issue
Block a user