From 82e1fa3f83f3a2607870d2c12578df4c2b410abc Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 27 Nov 2024 12:31:56 +0100
Subject: [PATCH] WIP

---
 pageserver/src/page_service.rs | 80 +++++++++++++++++++---------------
 1 file changed, 46 insertions(+), 34 deletions(-)

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 7d7679a68c..bfe84802a2 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1102,35 +1102,37 @@ impl PageServerHandler {
         //
         // We construct a pipeline of
         // - Reading: read messages from pgb
-        // - Batching: fill the current batch
+        // - Batching: batch the messages if possible
         // - Execution: take the current batch, execute it using get_vectored, and send the response.
         //
-        // The stages synchronize through channels.
+        // The stages are synchronized through channels.
         //
         // CODING RULES FOR CANCELLATION
         //
-        // The channels propagate cancellation of the pipeline if any one stage exits.
-        // If a given stage exists, then ...
-        // - ... its downstream eventually exits because downstream's recv() fails and
-        // - ... its upstream eventually exists because upstream's send() fails.
+        // The overall pipeline has a CancellationToken that is a child of `self.cancel`.
+        // Each pipeline stage receives a child token of the pipeline's CancellationToken.
+        // Every pipeline stage is sensitive to it on all `.await`s except
+        // when the stage is waiting on its upstream or downstream channel, where cancellation
+        // is signalled through channel disconnection from/to the upstream/downstream.
        //
-        // A stage will not observe propagated cancellation through channels while
-        // 1. there's still data in the channel (channel recv succeeds), or
-        // 2. while it is `await`ing a future that is not its upstream/downstream channel.
-        // That is intentional: we always want to run the pipeline empty.
+        // When any pipeline stage exits with Err(), the pipeline CancellationToken gets
+        // cancelled via drop guard. This causes all other stages to exit soon after.
         //
-        // The coding discipline from the parent function still stands, though:
-        // any interaction with the client connection (pgb) must be sensitive to
-        // `self.cancel`, so that we can shut down page_service quickly.
+        // When a pipeline stage exits with Ok(), the stage's drop guard is disarmed.
+        // This allows other stages to wrap up cleanly.
         //
-        // Let's walk through the common cases of pipeline shutdown:
+        // Let's walk through the common cases of pipeline shutdown to test this model:
         //
         // Client-initiated shutdown: the client ends the CopyBoth session, making
         // the Reading stage exit with Ok(()). This in turn makes the Batching stage
         // exit with Ok(()), and the Executor stage processes the remaining batch from
-        // the spsc_fold. Then the Executor stage exits with Ok(()).
+        // the spsc_fold. Then the Executor stage exits with Ok(()). At no point was
+        // the pipeline CancellationToken cancelled.
+        //
+        // Server-initiated shutdown through self.cancel: the pipeline CancellationToken
+        // is a child token of self.cancel. All stages will exit promptly.
+        //
-        // Server-initiated shutdown through self.cancel:
         // - Case 1: If the Reading stage is waiting on its upstream (pgb) for a new client message,
         //   it will exit with Err(QueryError::Shutdown).
         // - Case 2: If the Reading stage is waiting on its downstream (send to Batching),
@@ -1149,18 +1151,24 @@ impl PageServerHandler {
         // handle that's stored in the BatchedFeMessage to execute the request,
         // the `handle_*` function will fail with an error that bubbles up and results in
         // the Executor stage exiting with Err(QueryError::Shutdown).
-        //
-        // Panic in a stage: the stage drops its channel end.

         let PageServicePipeliningConfig {
             max_batch_size,
             protocol_pipelining_mode,
         } = pipelining_config;

+        // Cancellation root for the pipeline.
+        // If any one stage exits with an error, this gets cancelled.
+        let cancel = self.cancel.child_token();
+
         // Macro to _define_ a pipeline stage.
         macro_rules! pipeline_stage {
             ($name:literal, $make_fut:expr) => {{
-                let stage_fut = $make_fut;
+                // Give each stage a child token to avoid lock contention in `tasks` mode.
+                let stage_fut = $make_fut(cancel.child_token());
+                // Cancel the pipeline if the stage exits with an error.
+                // If it exits cleanly, shutdown should just bubble through the pipeline via the channels.
+                let cancel_pipeline = cancel.clone().drop_guard();
                 async move {
                     scopeguard::defer! {
                         debug!("exiting");
@@ -1176,8 +1184,7 @@ impl PageServerHandler {
         //
         let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);

-        let read_messages = pipeline_stage!("read_messages", {
-            let cancel = self.cancel.clone();
+        let read_messages = pipeline_stage!("read_messages", move |cancel| {
             let ctx = ctx.attached_child();
             async move {
                 let mut pgb_reader = pgb_reader;
@@ -1219,7 +1226,7 @@ impl PageServerHandler {
         //
         let (mut batch_tx, mut batch_rx) = spsc_fold::channel();

-        let batcher = pipeline_stage!("batcher", async move {
+        let batcher = pipeline_stage!("batcher", move |_cancel| async move {
             loop {
                 let maybe_req = requests_rx.recv().await;
                 let Some(req) = maybe_req else {
@@ -1247,8 +1254,7 @@ impl PageServerHandler {

         // Create Executor future.
         //
-        let executor = pipeline_stage!("executor", {
-            let cancel = self.cancel.clone();
+        let executor = pipeline_stage!("executor", |cancel| {
             let ctx = ctx.attached_child();
             async move {
                 loop {
@@ -1282,25 +1288,23 @@ impl PageServerHandler {

         // if one of them does.
         let read_messages_res;
+        let batcher_res;
         let executor_res;
         match protocol_pipelining_mode {
             PageServiceProtocolPipeliningMode::ConcurrentFutures => {
-                (read_messages_res, _, executor_res) =
+                (read_messages_res, batcher_res, executor_res) =
                     tokio::join!(read_messages, batcher, executor)
             }
             PageServiceProtocolPipeliningMode::Tasks => {
-                // NB: the assumption is that this function is polled to completion.
-                // So, no need to keep track of these task handles in a JoinSet / via GateGuard.
-                // This does not actually hold if we're panicking, but that reduces to the AsyncDrop problem.
+                // We must run all tasks to completion and not panic; otherwise we leak the tasks.
                 let read_messages_task = tokio::task::spawn(read_messages);
                 let batcher_task = tokio::task::spawn(batcher);
+                let executor_task = tokio::task::spawn(executor);
                 let read_messages_task_res;
                 let batcher_task_res;
-                (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
-                    read_messages_task,
-                    batcher_task,
-                    executor, // not in a separate task
-                );
+                let executor_task_res;
+                (read_messages_task_res, batcher_task_res, executor_task_res) =
+                    tokio::join!(read_messages_task, batcher_task, executor_task);
                 read_messages_res = read_messages_task_res
                     .context("read_messages task panicked, check logs for details")?;
                 let _: () =
@@ -1308,8 +1312,16 @@ impl PageServerHandler {
             }
         }

+        if let Err(batcher_err) = batcher_res {
+            warn!(error=?batcher_err, "batcher exited with error, this is unexpected");
+        }
+
         match (read_messages_res, executor_res) {
-            (Err(e), _) | (_, Err(e)) => {
+            (Err(e), _) => {
+                let e: QueryError = e;
+                Err(e) //
+            }
+            (_, Err(e)) => {
                 let e: QueryError = e;
                 Err(e)
             }
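The new comments describe a drop-guard cancellation model: each stage is built from a child of the pipeline's CancellationToken, an Err() exit cancels the pipeline through a drop guard, and an Ok() exit disarms the guard. Below is a minimal, self-contained sketch of that pattern using only the public tokio_util::sync::CancellationToken API. It is not pageserver code: `run_stage` and the String error type are illustrative stand-ins, and the disarm-on-Ok step is assumed from the comments since it is not visible in these hunks.

    use tokio_util::sync::CancellationToken;

    /// Run one stage, loosely mirroring `pipeline_stage!`: build the stage from a
    /// child token, and let a drop guard cancel the pipeline unless the stage exits Ok.
    async fn run_stage<F, Fut>(pipeline_cancel: &CancellationToken, make_fut: F) -> Result<(), String>
    where
        F: FnOnce(CancellationToken) -> Fut,
        Fut: std::future::Future<Output = Result<(), String>>,
    {
        // Like `$make_fut(cancel.child_token())`: each stage gets its own child token.
        let stage_fut = make_fut(pipeline_cancel.child_token());
        // Like `cancel.clone().drop_guard()`: dropping this guard cancels the pipeline,
        // which also covers the case where the stage future is dropped mid-way (panic).
        let guard = pipeline_cancel.clone().drop_guard();
        let res = stage_fut.await;
        if res.is_ok() {
            // Clean exit: disarm the guard so shutdown propagates via the channels instead.
            let _ = guard.disarm();
        }
        res
    }

    #[tokio::main]
    async fn main() {
        // A failing stage cancels the pipeline token for all other stages.
        let pipeline_cancel = CancellationToken::new();
        let res = run_stage(&pipeline_cancel, |_stage_cancel: CancellationToken| async {
            Err::<(), String>("stage failed".to_string())
        })
        .await;
        assert!(res.is_err() && pipeline_cancel.is_cancelled());

        // A stage that exits cleanly leaves the pipeline token untouched.
        let pipeline_cancel = CancellationToken::new();
        let res = run_stage(&pipeline_cancel, |_stage_cancel: CancellationToken| async {
            Ok::<(), String>(())
        })
        .await;
        assert!(res.is_ok() && !pipeline_cancel.is_cancelled());
    }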
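The "client-initiated shutdown" walkthrough relies on clean shutdown bubbling through the channels rather than through cancellation: when the Reading stage exits Ok and drops its sender, the Batching stage's recv() returns None, it flushes and exits Ok, and the Executor drains whatever is left. A rough sketch of that behaviour, with plain tokio::sync::mpsc channels standing in for the request channel and the spsc_fold (whose API is not shown in this patch):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (req_tx, mut req_rx) = mpsc::channel::<u32>(1);
        let (batch_tx, mut batch_rx) = mpsc::channel::<Vec<u32>>(1);

        // Reading stage: reads a fixed number of "client messages", then exits Ok.
        // Dropping req_tx at the end is what signals shutdown downstream.
        let reader = async move {
            for msg in 0..5u32 {
                req_tx.send(msg).await.expect("batcher gone");
            }
            // req_tx dropped here == "client ended the CopyBoth session"
        };

        // Batching stage: folds requests into batches of 2; exits Ok once recv() returns None.
        let batcher = async move {
            let mut batch = Vec::new();
            while let Some(req) = req_rx.recv().await {
                batch.push(req);
                if batch.len() == 2 {
                    batch_tx.send(std::mem::take(&mut batch)).await.expect("executor gone");
                }
            }
            if !batch.is_empty() {
                batch_tx.send(batch).await.expect("executor gone");
            }
            // batch_tx dropped here
        };

        // Executor stage: drains the remaining batches, then exits Ok.
        let executor = async move {
            while let Some(batch) = batch_rx.recv().await {
                println!("executing batch {batch:?}");
            }
        };

        // ConcurrentFutures-style: all three stages are polled within this one task.
        tokio::join!(reader, batcher, executor);
    }

Despite the channel capacity of 1, the stages make progress because join! polls all three futures in the same task; the run prints batches [0, 1], [2, 3], and the final flush [4], and every stage ends without any cancellation being involved.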
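In Tasks mode the patch spawns every stage and joins all three handles, so a panicked task surfaces as an error instead of being leaked. A hedged sketch of that join-and-context pattern using public tokio and anyhow APIs; the stage bodies and messages are placeholders, and the warn-only treatment of the batcher result mirrors the patch:

    use anyhow::Context;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Placeholder stage futures; in the patch these come from pipeline_stage!.
        let read_messages = async { Ok::<(), anyhow::Error>(()) };
        let batcher = async { Ok::<(), anyhow::Error>(()) };
        let executor = async { Ok::<(), anyhow::Error>(()) };

        // Every stage runs in its own task, so every stage needs a JoinHandle we await.
        let read_messages_task = tokio::task::spawn(read_messages);
        let batcher_task = tokio::task::spawn(batcher);
        let executor_task = tokio::task::spawn(executor);

        // Join all handles; a JoinError here means the task panicked or was aborted.
        let (read_messages_res, batcher_res, executor_res) =
            tokio::join!(read_messages_task, batcher_task, executor_task);

        // Surface panics as errors rather than dropping them on the floor.
        let read_messages_res = read_messages_res.context("read_messages task panicked")?;
        let batcher_res = batcher_res.context("batcher task panicked")?;
        let executor_res = executor_res.context("executor task panicked")?;

        // Report the batcher's error separately; fail on reader/executor errors.
        if let Err(e) = batcher_res {
            eprintln!("batcher exited with error: {e:#}");
        }
        read_messages_res?;
        executor_res?;
        Ok(())
    }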