From 9a5611a5ef58801d46f5ed1ffa860976f673c125 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 29 Nov 2024 11:39:16 +0100 Subject: [PATCH] merge reader&batcher stages, update docs --- pageserver/src/page_service.rs | 241 ++++++++++++++------------------- 1 file changed, 101 insertions(+), 140 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e33d2c22d4..002ad9ddbd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1114,72 +1114,64 @@ impl PageServerHandler { IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { // - // We construct a pipeline of - // - Reading: read messages from pgb - // - Batching: batch the messages if possible - // - Execution: take the current batch, execute it using get_vectored, and send the response. + // Pipelined pagestream handling consists of + // - a Batcher that reads requests off the wire and + // batches them if possible, + // - an Executor that processes the batched requests. // - // The stages synchronized through channels. + // The batch is built up inside an `spsc_fold` channel, + // shared between Batcher (Sender) and Executor (Receiver). // - // CODING RULES FOR CANCELLATION + // The Batcher continuously folds client requests into the batch, + // while the Executor can at any time take out what's in the batch + // in order to process it. + // This means the next batch builds up while the Executor + // executes the last batch. // - // The overall pipeline has a CancellationToken that is a child of `self.cancel`. - // Each pipeline stage receives a child token of the pipeline's CancellationToken. - // Every pipeline stage is sensitive to it on all `.await`s except - // when the stage is waiting on its upstream or downstream channel, where cancellation - // is signalled through channel disconnection from/to the upstream/downstream. 
+ // CANCELLATION // - // When any pipeline stage exits with Err(), the pipeline CancellationToken gets - // cancelled via drop guard. This causes all other stages to exit soon after. + // We run both Batcher and Executor futures to completion before + // returning from this function. // - // When a pipeline stage exits with Ok(), the stage's drop guard is disarmed. - // This allows other stages to wrap up cleanly. + // If Executor exits first, it signals cancellation to the Batcher + // via a CancellationToken that is a child of `self.cancel`. + // If Batcher exits first, it signals cancellation to the Executor + // by dropping the spsc_fold channel Sender. // - // Let's walk through the common cases of pipeline shutdown to test this model: + // CLEAN SHUTDOWN // - // Client-initiated shutdown: the client ends the CopyBoth session, making - // the Reading stage exit with Ok(()). This in turn makes the Batching stage - // exit with Ok(()), and the Executor stage processes the remaining batch from - // the spsc_fold. Then the Executor stage exits with Ok(()). At no point was - // the pipeline CancellationToken cancelled. + // Clean shutdown means that the client ends the COPYBOTH session. + // In response to such a client message, the Batcher exits. + // The Executor continues to run, draining the spsc_fold channel. + // Once drained, the spsc_fold recv will fail with a distinct error + // indicating that the sender disconnected. + // The Executor exits with Ok(()) in response to that error. // - // Server-initiated shutdown through self.cancel: the pipeline CancellationToken - // is a child token of self.cancel. All stages will exit promptly - + // Server-initiated shutdown is not a clean shutdown, but instead + // is an error Err(QueryError::Shutdown) that is propagated through + // error propagation. // - // - Case 1: If the Reading stage is waiting on its upstream (pgb) for a new client message, - // it will exit with Err(QueryError::Shutdown). 
- // - Case 2: If the Reading stage is waiting on its downstream (send to Batching), - // it follows that Batching is waiting for Executor. - // Executor will observe self.cancel when it sends the response, and exit with Err(QueryError::Shutdown). - // - Case 3: the Executor stage observes self.cancel and exits with Err() while the Reading - // stage is waiting for a message from the client. If no message from the client arrives, - // the Reading stage will never exit. + // ERROR PROPAGATION // - // In either case, a task exits, which makes the other tasks in the pipeline exit. + // When the Batcher encounters an error, it sends it as a value + // through the spsc_fold channel and exits afterwards. + // When the Executor observes such an error in the channel, + // it exits returning that error value. // - // Server-initiated shutdown through Timeline::cancel: - // - Case 1: If the Reading stage observes Timeline::cancel via timeline_handles - // when it builds the BatchedFeMessage, it will exit with Err(QueryError::Shutdown). - // - Case 2: If the Executor stage observes Timeline::cancel when it uses the - // handle that's stored in the BatchedFeMessage to execute the request, - // the `handle_*` function will fail with an error that bubbles up and results in - // the Executor stage exiting with Err(QueryError::Shutdown). + // This design ensures that the Executor stage will still process + // the batch that was in flight when the Batcher encountered an error, + // thereby behaving identically to a serial implementation. let PageServicePipeliningConfig { max_batch_size, protocol_pipelining_mode, } = pipelining_config; - // Cancellation root for the pipeline. - // If any one stage exits, this gets cancelled. - let cancel = self.cancel.child_token(); - // Macro to _define_ a pipeline stage. macro_rules! 
pipeline_stage { - ($name:literal, $make_fut:expr) => {{ - let stage_fut = $make_fut; - let cancel = cancel.clone(); + ($name:literal, $cancel:expr, $make_fut:expr) => {{ + let cancel: CancellationToken = $cancel; + let stage_fut = $make_fut(cancel.clone()); async move { scopeguard::defer! { debug!("exiting"); @@ -1192,102 +1184,79 @@ impl PageServerHandler { } // - // Create Reading future. - // - - let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); - let read_messages = pipeline_stage!("read_messages", { - let cancel = cancel.clone(); - let ctx = ctx.attached_child(); - async move { - let mut pgb_reader = pgb_reader; - let mut exit = false; - while !exit { - let res = Self::pagestream_read_message( - &mut pgb_reader, - tenant_id, - timeline_id, - &mut timeline_handles, - &cancel, - &ctx, - request_span.clone(), - ) - .await; - exit |= res.is_err(); - match requests_tx.send(res).await { - Ok(()) => {} - Err(tokio::sync::mpsc::error::SendError(_)) => { - debug!("downstream is gone"); - break; - } - } - } - (pgb_reader, timeline_handles) - } - }); - - // - // Create Batching future. 
+ // Batcher // + let cancel_batcher = self.cancel.child_token(); enum Batch { Request(Box), ReadError(QueryError), } let (mut batch_tx, mut batch_rx) = spsc_fold::channel::(); - let batcher = pipeline_stage!("batcher", async move { - let mut exit = false; - while !exit { - let maybe_req = requests_rx.recv().await; - let Some(read_res) = maybe_req else { - debug!("upstream is gone"); - break; - }; - let send_res = match read_res { - Ok(None) => { - debug!("upstream end of sub-protocol"); - break; - } - Ok(Some(req)) => { - batch_tx - .send(Batch::Request(req), |batch, req| match (batch, req) { - (Batch::Request(ref mut batch), Batch::Request(req)) => { - Self::pagestream_do_batch(max_batch_size, batch, req) - .map_err(Batch::Request) - } - (Batch::Request(_), x @ Batch::ReadError(_)) => Err(x), - (Batch::ReadError(_), Batch::Request(_) | Batch::ReadError(_)) => { - unreachable!("we exit from batcher after storing a read error"); - } - }) - .await - } - Err(e) => { - exit = true; - batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await - } - }; - match send_res { - Ok(()) => {} - Err(spsc_fold::SendError::ReceiverGone) => { - debug!("downstream is gone"); - break; + let read_messages = pipeline_stage!( + "read_messages", + cancel_batcher.clone(), + move |cancel_batcher| { + let ctx = ctx.attached_child(); + async move { + let mut pgb_reader = pgb_reader; + let mut exit = false; + while !exit { + let res = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel_batcher, + &ctx, + request_span.clone(), + ) + .await; + exit |= res.is_err(); + let send_res = match res { + Ok(None) => { + debug!("sub-protocol client-initiated shutdown"); + break; + } + Ok(Some(req)) => { + batch_tx + .send(Batch::Request(req), |batch, req| match (batch, req) { + (Batch::Request(ref mut batch), Batch::Request(req)) => { + Self::pagestream_do_batch(max_batch_size, batch, req) + .map_err(Batch::Request) + } + (Batch::Request(_), 
x @ Batch::ReadError(_)) => Err(x), + ( + Batch::ReadError(_), + Batch::Request(_) | Batch::ReadError(_), + ) => { + unreachable!( + "we exit from batcher after storing a read error" + ); + } + }) + .await + } + Err(e) => { + exit = true; + batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await + } + }; + exit |= send_res.is_err(); } + (pgb_reader, timeline_handles) } } - }); + ); // - // Create Executor future. + // Executor // - let executor = pipeline_stage!("executor", { - let cancel = cancel.clone(); + let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| { let ctx = ctx.attached_child(); async move { - scopeguard::defer! { - cancel.cancel(); - }; + let _cancel_batcher = cancel_batcher.drop_guard(); loop { let maybe_batch = batch_rx.recv().await; let batch = match maybe_batch { @@ -1316,29 +1285,21 @@ impl PageServerHandler { // Execute the stages. // - let read_messages_res; - let _batcher_res: (); - let executor_res: Result<(), QueryError>; match protocol_pipelining_mode { PageServiceProtocolPipeliningMode::ConcurrentFutures => { - (read_messages_res, _batcher_res, executor_res) = - tokio::join!(read_messages, batcher, executor); + tokio::join!(read_messages, executor) } PageServiceProtocolPipeliningMode::Tasks => { // These tasks are not tracked anywhere. let read_messages_task = tokio::spawn(read_messages); - let batcher_task = tokio::spawn(batcher); - let (read_messages_task_res, batcher_task_res, executor_res_) = - tokio::join!(read_messages_task, batcher_task, executor,); - (read_messages_res, _batcher_res, executor_res) = ( + let (read_messages_task_res, executor_res_) = + tokio::join!(read_messages_task, executor,); + ( read_messages_task_res.expect("propagated panic from read_messages"), - batcher_task_res.expect("propagated panic from batcher"), executor_res_, - ); + ) } } - - (read_messages_res, executor_res) } /// Helper function to handle the LSN from client request.