diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 3292a7b510..2a7ba8d97b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1105,98 +1105,118 @@ impl PageServerHandler {
         // - Batching: fill the current batch
         // - Execution: take the current batch, execute it using get_vectored, and send the response.
         //
+        // The stages synchronize through channels.
+        //
+        // CODING RULES FOR CANCELLATION
+        //
+        // The channels propagate cancellation of the pipeline if any one stage exits.
+        // If a given stage exits, then ...
+        // - ... its downstream eventually exits because downstream's recv() fails and
+        // - ... its upstream eventually exits because upstream's send() fails.
+        //
+        // A stage will not observe propagated cancellation through channels while
+        // 1. there's still data in the channel (channel recv succeeds), or
+        // 2. it is `await`ing a future that is not its upstream/downstream channel.
+        // That is intentional: we always want to run the pipeline empty.
+        //
+        // The coding discipline from the parent function still stands, though:
+        // any interaction with the client connection (pgb) must be sensitive to
+        // `self.cancel`, so that we can shut down page_service quickly.
+        //
+        // Let's walk through the common cases of pipeline shutdown:
+        //
+        // Client-initiated shutdown: the client ends the CopyBoth session, making
+        // the Reading stage exit with Ok(()). This in turn makes the Batching stage
+        // exit with Ok(()), and the Executor stage processes the remaining batch from
+        // the spsc_fold. Then the Executor stage exits with Ok(()).
+        //
+        // Server-initiated shutdown through self.cancel:
+        // - Case 1: If the Reading stage is waiting on its upstream (pgb) for a new client message,
+        //   it will exit with Err(QueryError::Shutdown).
+        // - Case 2: If the Reading stage is waiting on its downstream (send to Batching),
+        //   it follows that Batching is waiting for Executor.
+        //   Executor will observe self.cancel when it sends the response, and exit with Err(QueryError::Shutdown).
+        //
+        // In either case, a task exits, which makes the other tasks in the pipeline exit.
+        //
+        // Server-initiated shutdown through Timeline::cancel:
+        // - Case 1: If the Reading stage observes Timeline::cancel via timeline_handles
+        //   when it builds the BatchedFeMessage, it will exit with Err(QueryError::Shutdown).
+        // - Case 2: If the Executor stage observes Timeline::cancel when it uses the
+        //   handle that's stored in the BatchedFeMessage to execute the request,
+        //   the `handle_*` function will fail with an error that bubbles up and results in
+        //   the Executor stage exiting with Err(QueryError::Shutdown).
+        //
+        // Panic in a stage: the stage drops its channel end.

         let PageServicePipeliningConfig {
             max_batch_size,
             protocol_pipelining_mode,
         } = pipelining_config;

-        // Create a CancellationToken for the pipeline.
-        // And make any return/panic from this function signal that cancellation.
-        let (cancel, _drop_guard) = {
-            let cancel = self.cancel.child_token();
-            (cancel.clone(), cancel.drop_guard())
-        };
-
         // Macro to _define_ a pipeline stage.
-        //
-        // The stage is a future.
-        // It need not be cancellation-safe.
-        // It receives a child token of `cancel` and a child RequestContext as an argument.
-        //
-        // When a stage exits all other stages will be signalled to cancel.
         macro_rules! pipeline_stage {
pipeline_stage { ($name:literal, $make_fut:expr) => {{ - let cancel = cancel.clone(); - let ctx = ctx.attached_child(); - let stage_fut = $make_fut(cancel.child_token(), ctx); + let stage_fut = $make_fut; async move { scopeguard::defer! { debug!("exiting"); } - let _cancel_pipeline_on_stage_exit = cancel.clone().drop_guard(); - timed_after_cancellation( - stage_fut, - std::stringify!($name), - Duration::from_millis(100), - &cancel, - ) - .await + stage_fut.await } .instrument(tracing::info_span!($name)) }}; } - // Grab gate guards before `self` gets borrowed by the executor stage. - let gate_guard_1 = self.gate.enter().map_err(|_| QueryError::Shutdown)?; - let gate_guard_2 = self.gate.enter().map_err(|_| QueryError::Shutdown)?; - // // Create Reading future. // let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); - let read_messages = pipeline_stage!("read_messages", move |cancel, ctx| async move { - let mut pgb_reader = pgb_reader; - loop { - let msg = Self::pagestream_read_message( - &mut pgb_reader, - tenant_id, - timeline_id, - &mut timeline_handles, - &cancel, - &ctx, - request_span.clone(), - ) - .await?; - let msg = match msg { - Some(msg) => msg, - None => { - debug!("pagestream subprotocol end observed"); - break; - } - }; - // No need to be sensitive to `cancel` here because downstream is. - match requests_tx.send(msg).await { - Ok(()) => {} - Err(tokio::sync::mpsc::error::SendError(_)) => { - debug!("downstream is gone"); - break; + let read_messages = pipeline_stage!("read_messages", { + let cancel = self.cancel.clone(); + let ctx = ctx.attached_child(); + async move { + let mut pgb_reader = pgb_reader; + loop { + let msg = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel, + &ctx, + request_span.clone(), + ) + .await?; + let msg = match msg { + Some(msg) => msg, + None => { + debug!("pagestream subprotocol end observed"); + break; + } + }; + match requests_tx.send(msg).await { + Ok(()) => {} + Err(tokio::sync::mpsc::error::SendError(_)) => { + debug!("downstream is gone"); + break; + } } } + // Make downstream exit after we exit. + // Explicit drop here is for robustness in future refactors. + drop(requests_tx); + Ok((pgb_reader, timeline_handles)) } - Ok((pgb_reader, timeline_handles)) }); // // Create Batching future. // - // Explicit sensitivity to `cancel` is not needed because the only - // two await points are channel recv & send, both of which will complete - // as soon as the upstream sender / downstream receivers are dropped. - // + let (mut batch_tx, mut batch_rx) = spsc_fold::channel(); - let batcher = pipeline_stage!("batcher", move |_cancel, _ctx| async move { + let batcher = pipeline_stage!("batcher", async move { loop { let maybe_req = requests_rx.recv().await; let Some(req) = maybe_req else { @@ -1215,29 +1235,36 @@ impl PageServerHandler { } } } + // Make downstream exit after we exit. + // Explicit drop here is for robustness in future refactors. + drop(batch_tx); }); // // Create Executor future. 
         //

-        let executor = pipeline_stage!("executor", move |cancel, ctx| async move {
-            loop {
-                let maybe_batch = batch_rx
-                    .recv()
-                    // no need to be sensitive to `cancel` because upstrema stages are
-                    .await;
-                let batch = match maybe_batch {
-                    Ok(batch) => batch,
-                    Err(spsc_fold::RecvError::SenderGone) => {
-                        debug!("upstream gone");
-                        break;
-                    }
-                };
-                self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
-                    .await?;
+        let executor = pipeline_stage!("executor", {
+            let cancel = self.cancel.clone();
+            let ctx = ctx.attached_child();
+            async move {
+                loop {
+                    let maybe_batch = batch_rx.recv().await;
+                    let batch = match maybe_batch {
+                        Ok(batch) => batch,
+                        Err(spsc_fold::RecvError::SenderGone) => {
+                            debug!("upstream gone");
+                            break;
+                        }
+                    };
+                    self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
+                        .await?;
+                }
+                // Make upstreams exit after we exit.
+                // Explicit drop here is for robustness in future refactors.
+                drop(batch_rx);
+                Ok(())
             }
-            Ok(())
         });

         //
@@ -1247,11 +1274,9 @@ impl PageServerHandler {
         // run it in separate tokio tasks.
         //
         // In any way, we wait for all stages to exit.
-        // The pipeline_stage! machinery ensures cancellation signalling,
-        // stages are responsible for being responsive to it.
         //
-        // The behavior of the case where the client requests clean shutdown
-        // is not well defined right now.
+        // See the top of this function for why all stages exit quickly
+        // if one of them does.

         let read_messages_res;
         let executor_res;
@@ -1261,17 +1286,11 @@ impl PageServerHandler {
                     tokio::join!(read_messages, batcher, executor)
             }
             PageServiceProtocolPipeliningMode::Tasks => {
-                macro_rules! spawn_with_gate_guard {
-                    ($guard:expr, $fut:expr) => {{
-                        tokio::task::spawn(async move {
-                            let res = $fut.await;
-                            drop($guard);
-                            res
-                        })
-                    }};
-                }
-                let read_messages_task = spawn_with_gate_guard!(gate_guard_1, read_messages);
-                let batcher_task = spawn_with_gate_guard!(gate_guard_2, batcher);
+                // NB: the assumption is that this function is polled to completion.
+                // So, no need to keep track of these task handles in a JoinSet / via GateGuard.
+                // This does not actually hold if we're panicking, but that reduces to the AsyncDrop problem.
+                let read_messages_task = tokio::task::spawn(read_messages);
+                let batcher_task = tokio::task::spawn(batcher);
                 let read_messages_task_res;
                 let batcher_task_res;
                 (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
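The comments added at the top of the first hunk describe the new shutdown model: stages are tied together only by their channels, a stage that exits takes the rest of the pipeline down via failed send()/recv(), and a downstream stage drains buffered work before it observes the closure. Below is a minimal standalone sketch of that pattern, not part of the diff: it uses a plain `tokio::sync::mpsc` channel instead of the `spsc_fold` channel from the patch, and the stage names and `main` harness are illustrative only (a `tokio` dependency with the `full` feature set is assumed).

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel standing in for the pipeline's inter-stage channels.
    let (tx, mut rx) = mpsc::channel::<u32>(4);

    // "Reading" stage: produces a few messages, then returns, dropping `tx`.
    let reader = tokio::spawn(async move {
        for i in 0..3u32 {
            if tx.send(i).await.is_err() {
                // Downstream exited first; propagate shutdown upstream by returning.
                println!("reader: downstream gone");
                return;
            }
        }
        // Returning drops `tx`; that drop is what signals the downstream stage.
        println!("reader: done");
    });

    // "Executor" stage: keeps draining until the channel is both empty and
    // closed, i.e. the pipeline is run empty before shutdown is observed.
    let executor = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("executor: processing {msg}");
        }
        // recv() returned None: all senders are dropped *and* the buffer is drained.
        println!("executor: upstream gone, exiting");
    });

    let _ = tokio::join!(reader, executor);
}
```

The property mirrored here is the one the comments rely on: the producer learns about a dead consumer only through a failed `send()`, and the consumer sees `None` from `recv()` only after every buffered message has been processed.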