From 18ffaba97574a3e0065f72f81c2fda0a7db8b957 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 26 Nov 2024 20:25:10 +0100
Subject: [PATCH] fix pipeline cancellation

---
 pageserver/src/page_service.rs | 199 ++++++++++++++++++---------------
 1 file changed, 110 insertions(+), 89 deletions(-)

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index c8ea3a8ca7..3292a7b510 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -316,6 +316,7 @@ struct PageServerHandler {
     connection_ctx: RequestContext,
 
     cancel: CancellationToken,
+    gate: utils::sync::gate::Gate,
 
     /// None only while pagestream protocol is being processed.
     timeline_handles: Option<TimelineHandles>,
@@ -581,6 +582,7 @@ impl PageServerHandler {
             connection_ctx,
             timeline_handles: Some(TimelineHandles::new(tenant_manager)),
             cancel,
+            gate: Default::default(),
             pipelining_config,
         }
     }
@@ -818,6 +820,7 @@ impl PageServerHandler {
         &mut self,
         pgb_writer: &mut PostgresBackend<IO>,
         batch: BatchedFeMessage,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> Result<(), QueryError>
     where
@@ -944,7 +947,7 @@ impl PageServerHandler {
         }
         tokio::select! {
             biased;
-            _ = self.cancel.cancelled() => {
+            _ = cancel.cancelled() => {
                 // We were requested to shut down.
                 info!("shutdown request received in page handler");
                 return Err(QueryError::Shutdown)
@@ -1054,13 +1057,14 @@ impl PageServerHandler {
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
    {
+        let cancel = self.cancel.clone();
         loop {
             let msg = Self::pagestream_read_message(
                 &mut pgb_reader,
                 tenant_id,
                 timeline_id,
                 &mut timeline_handles,
-                &self.cancel,
+                &cancel,
                 ctx,
                 request_span.clone(),
             )
             .await?;
@@ -1072,7 +1076,7 @@ impl PageServerHandler {
                     return Ok((pgb_reader, timeline_handles));
                 }
             };
-            self.pagesteam_handle_batched_message(pgb_writer, *msg, ctx)
+            self.pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
                 .await?;
        }
    }
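
Note (not part of the patch): the pipeline rework in the next hunk relies on the
parent/child and drop-guard semantics of tokio_util's CancellationToken. A minimal
standalone sketch of just that mechanism, with illustrative names:

    use tokio_util::sync::CancellationToken;

    fn main() {
        let parent = CancellationToken::new();
        // The pipeline gets a child of the per-connection token ...
        let pipeline = parent.child_token();
        // ... and every stage gets a child of the pipeline token.
        let stage_a = pipeline.child_token();
        let stage_b = pipeline.child_token();

        // A DropGuard cancels its token when dropped, and cancellation flows
        // down to child tokens. This is how one stage exiting (by return or
        // panic) cancels its sibling stages.
        let guard = pipeline.clone().drop_guard();
        drop(guard);
        assert!(stage_a.is_cancelled());
        assert!(stage_b.is_cancelled());

        // Cancellation never flows upward: the connection token is unaffected.
        assert!(!parent.is_cancelled());
    }
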
@@ -1107,60 +1111,92 @@ impl PageServerHandler {
             protocol_pipelining_mode,
         } = pipelining_config;
 
-        let cancel = self.cancel.clone();
+        // Create a CancellationToken for the pipeline, and make any
+        // return/panic from this function signal that cancellation.
+        let (cancel, _drop_guard) = {
+            let cancel = self.cancel.child_token();
+            (cancel.clone(), cancel.drop_guard())
+        };
+
+        // Macro to _define_ a pipeline stage.
+        //
+        // The stage is a future.
+        // It need not be cancellation-safe.
+        // It receives a child token of `cancel` and a child RequestContext as arguments.
+        //
+        // When a stage exits, all other stages will be signalled to cancel.
+        macro_rules! pipeline_stage {
+            ($name:literal, $make_fut:expr) => {{
+                let cancel = cancel.clone();
+                let ctx = ctx.attached_child();
+                let stage_fut = $make_fut(cancel.child_token(), ctx);
+                async move {
+                    scopeguard::defer! {
+                        debug!("exiting");
+                    }
+                    let _cancel_pipeline_on_stage_exit = cancel.clone().drop_guard();
+                    timed_after_cancellation(
+                        stage_fut,
+                        std::stringify!($name),
+                        Duration::from_millis(100),
+                        &cancel,
+                    )
+                    .await
+                }
+                .instrument(tracing::info_span!($name))
+            }};
+        }
+
+        // Grab gate guards before `self` gets borrowed by the executor stage.
+        let gate_guard_1 = self.gate.enter().map_err(|_| QueryError::Shutdown)?;
+        let gate_guard_2 = self.gate.enter().map_err(|_| QueryError::Shutdown)?;
 
         //
         // Create Reading future.
         //
         let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
-        let read_messages = {
-            let cancel = self.cancel.child_token();
-            let ctx = ctx.attached_child();
-            async move {
-                scopeguard::defer! {
-                    debug!("exiting");
-                }
-                let mut pgb_reader = pgb_reader;
-                loop {
-                    let msg = Self::pagestream_read_message(
-                        &mut pgb_reader,
-                        tenant_id,
-                        timeline_id,
-                        &mut timeline_handles,
-                        &cancel,
-                        &ctx,
-                        request_span.clone(),
-                    )
-                    .await?;
-                    let msg = match msg {
-                        Some(msg) => msg,
-                        None => {
-                            debug!("pagestream subprotocol end observed");
-                            break;
-                        }
-                    };
-                    match requests_tx.send(msg).await {
-                        Ok(()) => {}
-                        Err(tokio::sync::mpsc::error::SendError(_)) => {
-                            debug!("downstream is gone");
-                            break;
-                        }
-                    }
-                }
-                Ok((pgb_reader, timeline_handles))
-            }
-        }
-        .instrument(tracing::info_span!("read_messages"));
+        let read_messages = pipeline_stage!("read_messages", move |cancel, ctx| async move {
+            let mut pgb_reader = pgb_reader;
+            loop {
+                let msg = Self::pagestream_read_message(
+                    &mut pgb_reader,
+                    tenant_id,
+                    timeline_id,
+                    &mut timeline_handles,
+                    &cancel,
+                    &ctx,
+                    request_span.clone(),
+                )
+                .await?;
+                let msg = match msg {
+                    Some(msg) => msg,
+                    None => {
+                        debug!("pagestream subprotocol end observed");
+                        break;
+                    }
+                };
+                // No need to be sensitive to `cancel` here because downstream is.
+                match requests_tx.send(msg).await {
+                    Ok(()) => {}
+                    Err(tokio::sync::mpsc::error::SendError(_)) => {
+                        debug!("downstream is gone");
+                        break;
+                    }
+                }
+            }
+            Ok((pgb_reader, timeline_handles))
+        });
 
         //
         // Create Batching future.
         //
+        // Explicit sensitivity to `cancel` is not needed because the only
+        // two await points are channel recv & send, each of which completes
+        // as soon as its peer (upstream sender / downstream receiver) is dropped.
+        //
         let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
-        let batcher = async move {
-            scopeguard::defer! {
-                debug!("exiting");
-            }
+        let batcher = pipeline_stage!("batcher", move |_cancel, _ctx| async move {
             loop {
                 let maybe_req = requests_rx.recv().await;
                 let Some(req) = maybe_req else {
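
Note (not part of the patch): the batching stage's reasoning above is the
"cancellation via channel closure" pattern. A standalone tokio sketch of that
pattern, with illustrative names; spsc_fold behaves analogously, reporting a
dropped peer via RecvError::SenderGone and a send error:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (in_tx, mut in_rx) = mpsc::channel::<u32>(1);
        let (out_tx, out_rx) = mpsc::channel::<u32>(1);

        // A middle stage with no explicit cancellation check: its only await
        // points are recv/send, which resolve as soon as a peer endpoint is dropped.
        let middle = tokio::spawn(async move {
            // recv() returns None once all senders (in_tx) are dropped.
            while let Some(v) = in_rx.recv().await {
                // send() returns Err once the receiver (out_rx) is dropped.
                if out_tx.send(v).await.is_err() {
                    break;
                }
            }
        });

        drop(in_tx);  // upstream exits ...
        drop(out_rx); // ... and/or downstream exits; either unblocks the stage
        middle.await.unwrap();
    }
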
@@ -1179,84 +1215,69 @@ impl PageServerHandler {
                     }
                 }
             }
-        }
-        .instrument(tracing::info_span!("batcher"));
+        });
 
         //
         // Create Executor future.
         //
-        let executor = async {
-            scopeguard::defer! {
-                debug!("exiting");
-            };
+        let executor = pipeline_stage!("executor", move |cancel, ctx| async move {
             loop {
-                let batch = match batch_rx.recv().await {
+                let maybe_batch = batch_rx
+                    .recv()
+                    // No need to be sensitive to `cancel` because the upstream stages are.
+                    .await;
+                let batch = match maybe_batch {
                     Ok(batch) => batch,
                     Err(spsc_fold::RecvError::SenderGone) => {
                         debug!("upstream gone");
                         break;
                     }
                 };
-                self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx)
+                self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
                     .await?;
             }
             Ok(())
-        }
-        .instrument(tracing::info_span!("executor"));
+        });
 
         //
-        // Execute the stages until they exit.
+        // Execute the stages.
         //
         // We can either run the pipeline as concurrent futures or we can
         // run it in separate tokio tasks.
         //
-        // In any case, we need to be responsive to cancellation (self.cancel).
-        // The style chosen here is that cancellation must propagate through the
-        // pipeline: if any stage dies, the whole pipeline dies.
+        // Either way, we wait for all stages to exit.
+        // The pipeline_stage! machinery ensures cancellation signalling;
+        // the stages are responsible for responding to it.
         //
-        // If the client communicates intent to end the pagestream sub-protocol,
-        // the Reader stage shuts down the pipeline cleanly by sending a `None`
-        // through the pipeline, resulting in all stages exiting cleanly after
-        // the last response has been produced.
-        //
-        // Unclean pipeline shutdown is initiated by Reader or Executor returning
-        // a QueryError. This bubbles up to the caller, which will shut down the connection.
-
-        macro_rules! with_noise_on_slow_cancel {
-            ($fut:ident) => {
-                timed_after_cancellation(
-                    $fut,
-                    std::stringify!($fut),
-                    Duration::from_millis(100),
-                    &cancel,
-                )
-            };
-        }
+        // The behavior when the client requests a clean shutdown
+        // is not well defined right now.
 
         let read_messages_res;
         let executor_res;
         match protocol_pipelining_mode {
             PageServiceProtocolPipeliningMode::ConcurrentFutures => {
-                (read_messages_res, _, executor_res) = {
-                    tokio::join!(
-                        with_noise_on_slow_cancel!(read_messages),
-                        with_noise_on_slow_cancel!(batcher),
-                        with_noise_on_slow_cancel!(executor),
-                    )
-                }
+                (read_messages_res, _, executor_res) =
+                    tokio::join!(read_messages, batcher, executor)
             }
             PageServiceProtocolPipeliningMode::Tasks => {
-                // cancelled via sensitivity to self.cancel
-                let read_messages_task = tokio::task::spawn(read_messages);
-                // cancelled when it observes read_messages_task disconnect the channel
-                let batcher_task = tokio::task::spawn(batcher);
+                macro_rules! spawn_with_gate_guard {
+                    ($guard:expr, $fut:expr) => {{
+                        tokio::task::spawn(async move {
+                            let res = $fut.await;
+                            drop($guard);
+                            res
+                        })
+                    }};
+                }
+                let read_messages_task = spawn_with_gate_guard!(gate_guard_1, read_messages);
+                let batcher_task = spawn_with_gate_guard!(gate_guard_2, batcher);
                 let read_messages_task_res;
                 let batcher_task_res;
                 (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
-                    with_noise_on_slow_cancel!(read_messages_task),
-                    with_noise_on_slow_cancel!(batcher_task),
-                    with_noise_on_slow_cancel!(executor), // not in a separate task
+                    read_messages_task,
+                    batcher_task,
+                    executor, // not in a separate task
                 );
                 read_messages_res = read_messages_task_res
                     .context("read_messages task panicked, check logs for details")?;
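
Note (not part of the patch): utils::sync::gate::Gate is a pageserver-internal
primitive. The sketch below uses a tokio mpsc channel as a stand-in to illustrate
why spawn_with_gate_guard! moves a guard into each spawned task: a close()-style
shutdown must not complete while a task still holds a guard. Illustrative only:

    use tokio::sync::mpsc;

    // Stand-in gate: enter() hands out a guard (a Sender clone); close()
    // resolves only once every outstanding guard has been dropped.
    struct Gate {
        tx: mpsc::Sender<()>,
        rx: mpsc::Receiver<()>,
    }

    impl Gate {
        fn new() -> Self {
            let (tx, rx) = mpsc::channel(1);
            Gate { tx, rx }
        }
        fn enter(&self) -> mpsc::Sender<()> {
            self.tx.clone()
        }
        async fn close(mut self) {
            drop(self.tx); // stop handing out new guards
            // recv() yields None once all guard clones are dropped.
            let _ = self.rx.recv().await;
        }
    }

    #[tokio::main]
    async fn main() {
        let gate = Gate::new();

        // Like spawn_with_gate_guard!: the guard moves into the task, so
        // the gate stays open exactly as long as the task is running.
        let guard = gate.enter();
        let task = tokio::spawn(async move {
            // ... stage runs to completion ...
            drop(guard);
        });

        gate.close().await; // completes only after the task drops its guard
        task.await.unwrap();
    }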