expand fix to tasks mode; add some comments

This commit is contained in:
Christian Schwarz
2024-11-25 11:51:58 +01:00
parent b9477aa945
commit 99b664c9ed

View File

@@ -1081,6 +1081,9 @@ impl PageServerHandler {
}
}
/// # Cancel-Safety
///
/// May leak tokio tasks if not polled to completion.
#[allow(clippy::too_many_arguments)]
async fn handle_pagerequests_pipelined<IO>(
&mut self,
@@ -1096,6 +1099,13 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
//
// We construct a pipeline of
// - Reading: read messages from pgb
// - Batching: fill the current batch
// - Execution: take the current batch, execute it using get_vectored, and send the response.
//
let PageServicePipeliningConfig {
max_batch_size,
protocol_pipelining_mode,
@@ -1103,6 +1113,10 @@ impl PageServerHandler {
let cancel = self.cancel.clone();
//
// Create Reading future.
//
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let read_messages = {
let cancel = self.cancel.child_token();
@@ -1143,6 +1157,10 @@ impl PageServerHandler {
}
.instrument(tracing::info_span!("read_messages"));
//
// Create Batching future.
//
enum BatchState {
Building(Option<Box<BatchedFeMessage>>),
ReadMessagesEnded(Option<Box<BatchedFeMessage>>),
@@ -1238,6 +1256,10 @@ impl PageServerHandler {
}
.instrument(tracing::info_span!("batcher"));
//
// Create Executor future.
//
let executor = async {
let _guard = scopeguard::guard(batch_rx.clone(), |batch_rx| {
debug!("exiting");
@@ -1283,25 +1305,44 @@ impl PageServerHandler {
Ok(())
};
//
// Execute the stages until they exit.
//
// We can either run the pipeline as concurrent futures or we can
// run it in separate tokio tasks.
//
// In any case, we need to be responsive to cancellation (self.cancel).
// The style chosen here is that cancellation must propagate through the
// pipeline: if any stage dies, the whole pipeline dies.
//
// If the client communicates intent to end the pagestream sub-protocol,
// the Reader stage shuts down the pipeline cleanly by sending a `None`
// through the pipeline, resulting in all stages exiting cleanly after
// the last response has been produced.
//
// Unclean pipeline shutdown is initiated by Reader or Executor returning
// a QueryError. This bubbles up to the caller, which will shut down the connection.
macro_rules! with_noise_on_slow_cancel {
($fut:ident) => {
timed_after_cancellation(
$fut,
std::stringify!($fut),
Duration::from_millis(100),
&cancel,
)
};
}
let read_messages_res;
let executor_res;
match protocol_pipelining_mode {
PageServiceProtocolPipeliningMode::ConcurrentFutures => {
(read_messages_res, _, executor_res) = {
macro_rules! timed {
($fut:expr, $what:literal) => {
timed_after_cancellation(
$fut,
$what,
Duration::from_millis(100),
&cancel,
)
};
}
tokio::join!(
timed!(read_messages, "read-messages"),
timed!(batcher, "batcher"),
timed!(executor, "executor"),
with_noise_on_slow_cancel!(read_messages),
with_noise_on_slow_cancel!(batcher),
with_noise_on_slow_cancel!(executor),
)
}
}
@@ -1310,13 +1351,17 @@ impl PageServerHandler {
let read_messages_task = tokio::task::spawn(read_messages);
// cancelled when it observes read_messages_task disconnect the channel
let batcher_task = tokio::task::spawn(batcher);
executor_res = executor.await;
read_messages_res = read_messages_task
.await
let read_messages_task_res;
let batcher_task_res;
(read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
with_noise_on_slow_cancel!(read_messages_task),
with_noise_on_slow_cancel!(batcher_task),
with_noise_on_slow_cancel!(executor), // not in a separate task
);
read_messages_res = read_messages_task_res
.context("read_messages task panicked, check logs for details")?;
let _: () = batcher_task
.await
.context("batcher task panicked, check logs for details")?;
let _: () =
batcher_task_res.context("batcher task panicked, check logs for details")?;
}
}