From 99b664c9ed7e95b3013a81cd271c7c835fc468a7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 25 Nov 2024 11:51:58 +0100 Subject: [PATCH] expand fix to tasks mode; add some comments --- pageserver/src/page_service.rs | 83 ++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 19 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 7316a839e6..bbe90893cc 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1081,6 +1081,9 @@ impl PageServerHandler { } } + /// # Cancel-Safety + /// + /// May leak tokio tasks if not polled to completion. #[allow(clippy::too_many_arguments)] async fn handle_pagerequests_pipelined( &mut self, @@ -1096,6 +1099,13 @@ impl PageServerHandler { where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { + // + // We construct a pipeline of + // - Reading: read messages from pgb + // - Batching: fill the current batch + // - Execution: take the current batch, execute it using get_vectored, and send the response. + // + let PageServicePipeliningConfig { max_batch_size, protocol_pipelining_mode, @@ -1103,6 +1113,10 @@ impl PageServerHandler { let cancel = self.cancel.clone(); + // + // Create Reading future. + // + let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); let read_messages = { let cancel = self.cancel.child_token(); @@ -1143,6 +1157,10 @@ impl PageServerHandler { } .instrument(tracing::info_span!("read_messages")); + // + // Create Batching future. + // + enum BatchState { Building(Option>), ReadMessagesEnded(Option>), @@ -1238,6 +1256,10 @@ impl PageServerHandler { } .instrument(tracing::info_span!("batcher")); + // + // Create Executor future. + // + let executor = async { let _guard = scopeguard::guard(batch_rx.clone(), |batch_rx| { debug!("exiting"); @@ -1283,25 +1305,44 @@ impl PageServerHandler { Ok(()) }; + // + // Execute the stages until they exit. + // + // We can either run the pipeline as concurrent futures or we can + // run it in separate tokio tasks. + // + // In any case, we need to be responsive to cancellation (self.cancel). + // The style chosen here is that cancellation must propagate through the + // pipeline: if any stage dies, the whole pipeline dies. + // + // If the client communicates intent to end the pagestream sub-protocol, + // the Reader stage shuts down the pipeline cleanly by sending a `None` + // through the pipeline, resulting in all stages exiting cleanly after + // the last response has been produced. + // + // Unclean pipeline shutdown is initiated by Reader or Executor returning + // a QueryError. This bubbles up to the caller, which will shut down the connection. + + macro_rules! with_noise_on_slow_cancel { + ($fut:ident) => { + timed_after_cancellation( + $fut, + std::stringify!($fut), + Duration::from_millis(100), + &cancel, + ) + }; + } + let read_messages_res; let executor_res; match protocol_pipelining_mode { PageServiceProtocolPipeliningMode::ConcurrentFutures => { (read_messages_res, _, executor_res) = { - macro_rules! timed { - ($fut:expr, $what:literal) => { - timed_after_cancellation( - $fut, - $what, - Duration::from_millis(100), - &cancel, - ) - }; - } tokio::join!( - timed!(read_messages, "read-messages"), - timed!(batcher, "batcher"), - timed!(executor, "executor"), + with_noise_on_slow_cancel!(read_messages), + with_noise_on_slow_cancel!(batcher), + with_noise_on_slow_cancel!(executor), ) } } @@ -1310,13 +1351,17 @@ impl PageServerHandler { let read_messages_task = tokio::task::spawn(read_messages); // cancelled when it observes read_messages_task disconnect the channel let batcher_task = tokio::task::spawn(batcher); - executor_res = executor.await; - read_messages_res = read_messages_task - .await + let read_messages_task_res; + let batcher_task_res; + (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!( + with_noise_on_slow_cancel!(read_messages_task), + with_noise_on_slow_cancel!(batcher_task), + with_noise_on_slow_cancel!(executor), // not in a separate task + ); + read_messages_res = read_messages_task_res .context("read_messages task panicked, check logs for details")?; - let _: () = batcher_task - .await - .context("batcher task panicked, check logs for details")?; + let _: () = + batcher_task_res.context("batcher task panicked, check logs for details")?; } }