explain the pipeline cancellation story

Christian Schwarz
2024-11-27 10:13:51 +01:00
parent 18ffaba975
commit e0123c8a80


@@ -1105,98 +1105,118 @@ impl PageServerHandler {
 // - Batching: fill the current batch
 // - Execution: take the current batch, execute it using get_vectored, and send the response.
 //
 // The stages synchronize through channels.
 //
+// CODING RULES FOR CANCELLATION
+//
+// The channels propagate cancellation of the pipeline if any one stage exits.
+// If a given stage exits, then ...
+// - ... its downstream eventually exits because downstream's recv() fails and
+// - ... its upstream eventually exits because upstream's send() fails.
+//
+// A stage will not observe propagated cancellation through channels while
+// 1. there's still data in the channel (channel recv succeeds), or
+// 2. it is `await`ing a future that is not its upstream/downstream channel.
+// That is intentional: we always want to run the pipeline empty.
+//
+// The coding discipline from the parent function still stands, though:
+// any interaction with the client connection (pgb) must be sensitive to
+// `self.cancel`, so that we can shut down page_service quickly.
+//
+// Let's walk through the common cases of pipeline shutdown:
+//
+// Client-initiated shutdown: the client ends the CopyBoth session, making
+// the Reading stage exit with Ok(()). This in turn makes the Batching stage
+// exit with Ok(()), and the Executor stage processes the remaining batch from
+// the spsc_fold. Then the Executor stage exits with Ok(()).
+//
+// Server-initiated shutdown through self.cancel:
+// - Case 1: If the Reading stage is waiting on its upstream (pgb) for a new client message,
+//   it will exit with Err(QueryError::Shutdown).
+// - Case 2: If the Reading stage is waiting on its downstream (send to Batching),
+//   it follows that Batching is waiting for Executor.
+//   Executor will observe self.cancel when it sends the response, and exit with Err(QueryError::Shutdown).
+//
+// In either case, a task exits, which makes the other tasks in the pipeline exit.
+//
+// Server-initiated shutdown through Timeline::cancel:
+// - Case 1: If the Reading stage observes Timeline::cancel via timeline_handles
+//   when it builds the BatchedFeMessage, it will exit with Err(QueryError::Shutdown).
+// - Case 2: If the Executor stage observes Timeline::cancel when it uses the
+//   handle that's stored in the BatchedFeMessage to execute the request,
+//   the `handle_*` function will fail with an error that bubbles up and results in
+//   the Executor stage exiting with Err(QueryError::Shutdown).
+//
+// Panic in a stage: the stage drops its channel end.
 let PageServicePipeliningConfig {
     max_batch_size,
     protocol_pipelining_mode,
 } = pipelining_config;
-// Create a CancellationToken for the pipeline.
-// And make any return/panic from this function signal that cancellation.
-let (cancel, _drop_guard) = {
-    let cancel = self.cancel.child_token();
-    (cancel.clone(), cancel.drop_guard())
-};
 // Macro to _define_ a pipeline stage.
 //
 // The stage is a future.
 // It need not be cancellation-safe.
-// It receives a child token of `cancel` and a child RequestContext as an argument.
-//
-// When a stage exits all other stages will be signalled to cancel.
 macro_rules! pipeline_stage {
     ($name:literal, $make_fut:expr) => {{
-        let cancel = cancel.clone();
-        let ctx = ctx.attached_child();
-        let stage_fut = $make_fut(cancel.child_token(), ctx);
+        let stage_fut = $make_fut;
         async move {
             scopeguard::defer! {
                 debug!("exiting");
             }
-            let _cancel_pipeline_on_stage_exit = cancel.clone().drop_guard();
-            timed_after_cancellation(
-                stage_fut,
-                std::stringify!($name),
-                Duration::from_millis(100),
-                &cancel,
-            )
-            .await
+            stage_fut.await
         }
         .instrument(tracing::info_span!($name))
     }};
 }
-// Grab gate guards before `self` gets borrowed by the executor stage.
-let gate_guard_1 = self.gate.enter().map_err(|_| QueryError::Shutdown)?;
-let gate_guard_2 = self.gate.enter().map_err(|_| QueryError::Shutdown)?;
 //
 // Create Reading future.
 //
 let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
-let read_messages = pipeline_stage!("read_messages", move |cancel, ctx| async move {
-    let mut pgb_reader = pgb_reader;
-    loop {
-        let msg = Self::pagestream_read_message(
-            &mut pgb_reader,
-            tenant_id,
-            timeline_id,
-            &mut timeline_handles,
-            &cancel,
-            &ctx,
-            request_span.clone(),
-        )
-        .await?;
-        let msg = match msg {
-            Some(msg) => msg,
-            None => {
-                debug!("pagestream subprotocol end observed");
-                break;
-            }
-        };
-        // No need to be sensitive to `cancel` here because downstream is.
-        match requests_tx.send(msg).await {
-            Ok(()) => {}
-            Err(tokio::sync::mpsc::error::SendError(_)) => {
-                debug!("downstream is gone");
-                break;
-            }
-        }
-    }
-    Ok((pgb_reader, timeline_handles))
+let read_messages = pipeline_stage!("read_messages", {
+    let cancel = self.cancel.clone();
+    let ctx = ctx.attached_child();
+    async move {
+        let mut pgb_reader = pgb_reader;
+        loop {
+            let msg = Self::pagestream_read_message(
+                &mut pgb_reader,
+                tenant_id,
+                timeline_id,
+                &mut timeline_handles,
+                &cancel,
+                &ctx,
+                request_span.clone(),
+            )
+            .await?;
+            let msg = match msg {
+                Some(msg) => msg,
+                None => {
+                    debug!("pagestream subprotocol end observed");
+                    break;
+                }
+            };
+            match requests_tx.send(msg).await {
+                Ok(()) => {}
+                Err(tokio::sync::mpsc::error::SendError(_)) => {
+                    debug!("downstream is gone");
+                    break;
+                }
+            }
+        }
+        // Make downstream exit after we exit.
+        // Explicit drop here is for robustness in future refactors.
+        drop(requests_tx);
+        Ok((pgb_reader, timeline_handles))
+    }
 });
 //
 // Create Batching future.
 //
 // Explicit sensitivity to `cancel` is not needed because the only
 // two await points are channel recv & send, both of which will complete
 // as soon as the upstream sender / downstream receivers are dropped.
 //
 let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
-let batcher = pipeline_stage!("batcher", move |_cancel, _ctx| async move {
+let batcher = pipeline_stage!("batcher", async move {
     loop {
         let maybe_req = requests_rx.recv().await;
         let Some(req) = maybe_req else {
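The rules above lean entirely on channel endpoints for shutdown propagation: a stage learns that a neighbor exited because recv() yields nothing or send() fails. A minimal standalone sketch of that mechanism using plain tokio mpsc channels (toy producer/consumer stages, not the pageserver code):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    // Reading-like stage: exits when it runs out of input, or when the
    // downstream receiver is dropped and send() starts failing.
    let producer = tokio::spawn(async move {
        for i in 0..3u32 {
            if tx.send(i).await.is_err() {
                println!("producer: downstream is gone");
                break;
            }
        }
        // `tx` is dropped here; downstream's recv() will now return None.
        // That is the only shutdown signal this pipeline needs.
    });

    // Executor-like stage: exits when the upstream sender is dropped,
    // but only after draining everything still in the channel
    // ("we always want to run the pipeline empty").
    let consumer = tokio::spawn(async move {
        while let Some(v) = rx.recv().await {
            println!("consumer: processing {v}");
        }
        println!("consumer: upstream is gone, exiting");
    });

    let (p, c) = tokio::join!(producer, consumer);
    p.unwrap();
    c.unwrap();
}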
@@ -1215,29 +1235,36 @@ impl PageServerHandler {
             }
         }
     }
+    // Make downstream exit after we exit.
+    // Explicit drop here is for robustness in future refactors.
+    drop(batch_tx);
 });
 //
 // Create Executor future.
 //
-let executor = pipeline_stage!("executor", move |cancel, ctx| async move {
-    loop {
-        let maybe_batch = batch_rx
-            .recv()
-            // no need to be sensitive to `cancel` because upstream stages are
-            .await;
-        let batch = match maybe_batch {
-            Ok(batch) => batch,
-            Err(spsc_fold::RecvError::SenderGone) => {
-                debug!("upstream gone");
-                break;
-            }
-        };
-        self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
-            .await?;
-    }
-    Ok(())
+let executor = pipeline_stage!("executor", {
+    let cancel = self.cancel.clone();
+    let ctx = ctx.attached_child();
+    async move {
+        loop {
+            let maybe_batch = batch_rx.recv().await;
+            let batch = match maybe_batch {
+                Ok(batch) => batch,
+                Err(spsc_fold::RecvError::SenderGone) => {
+                    debug!("upstream gone");
+                    break;
+                }
+            };
+            self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
+                .await?;
+        }
+        // Make upstreams exit after we exit.
+        // Explicit drop here is for robustness in future refactors.
+        drop(batch_rx);
+        Ok(())
+    }
 });
 //
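The batch_tx/batch_rx pair above comes from the pageserver-internal spsc_fold channel: a send folds the new request into the batch still waiting in the channel, so a slow executor receives one accumulated batch rather than a backlog of single requests. A toy stand-in illustrating that assumed folding behavior (BatchCell is hypothetical and omits spsc_fold's real API, SenderGone handling, and max_batch_size):

use std::sync::{Arc, Mutex};
use tokio::sync::Notify;

// Hypothetical single-producer/single-consumer "fold" cell, not the real
// spsc_fold API: send() folds into the pending batch instead of queueing.
#[derive(Default)]
struct BatchCell {
    slot: Mutex<Option<Vec<u32>>>,
    notify: Notify,
}

impl BatchCell {
    // Fold `req` into the pending batch, creating the batch if none is pending.
    fn send(&self, req: u32) {
        self.slot.lock().unwrap().get_or_insert_with(Vec::new).push(req);
        self.notify.notify_one();
    }

    // Take the whole pending batch; wait if none is pending yet.
    async fn recv(&self) -> Vec<u32> {
        loop {
            if let Some(batch) = self.slot.lock().unwrap().take() {
                return batch;
            }
            self.notify.notified().await;
        }
    }
}

#[tokio::main]
async fn main() {
    let cell = Arc::new(BatchCell::default());
    let tx = Arc::clone(&cell);
    tokio::spawn(async move {
        for req in 0..5u32 {
            tx.send(req); // consecutive sends coalesce into one batch
        }
    });
    println!("executor got batch: {:?}", cell.recv().await);
}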
@@ -1247,11 +1274,9 @@ impl PageServerHandler {
 // run it in separate tokio tasks.
 //
 // In any case, we wait for all stages to exit.
-// The pipeline_stage! machinery ensures cancellation signalling,
-// stages are responsible for being responsive to it.
-//
-// The behavior of the case where the client requests clean shutdown
-// is not well defined right now.
+// See the top of this function for why all stages exit quickly
+// if one of them does.
 let read_messages_res;
 let executor_res;
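The removed pipeline_stage! machinery signalled cancellation through a token guard: any stage returning or panicking dropped the guard, cancelling the shared token and every child token derived from it. A small sketch of that pattern with tokio_util's CancellationToken (the stage body here is a placeholder):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let root = CancellationToken::new(); // stand-in for self.cancel

    // Pipeline token: dropping `guard` cancels it and all its child tokens.
    let pipeline = root.child_token();
    let guard = pipeline.clone().drop_guard();

    let stage = tokio::spawn({
        let cancel = pipeline.child_token();
        async move {
            // A stage future that is sensitive to cancellation.
            cancel.cancelled().await;
            println!("stage observed cancellation");
        }
    });

    // Simulate "some stage returned or panicked": dropping the guard
    // cancels `pipeline` and therefore every stage's child token.
    drop(guard);
    stage.await.unwrap();
}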
@@ -1261,17 +1286,11 @@ impl PageServerHandler {
         tokio::join!(read_messages, batcher, executor)
     }
     PageServiceProtocolPipeliningMode::Tasks => {
-        macro_rules! spawn_with_gate_guard {
-            ($guard:expr, $fut:expr) => {{
-                tokio::task::spawn(async move {
-                    let res = $fut.await;
-                    drop($guard);
-                    res
-                })
-            }};
-        }
-        let read_messages_task = spawn_with_gate_guard!(gate_guard_1, read_messages);
-        let batcher_task = spawn_with_gate_guard!(gate_guard_2, batcher);
+        // NB: the assumption is that this function is polled to completion.
+        // So, no need to keep track of these task handles in a JoinSet / via GateGuard.
+        // This does not actually hold if we're panicking, but that reduces to the AsyncDrop problem.
+        let read_messages_task = tokio::task::spawn(read_messages);
+        let batcher_task = tokio::task::spawn(batcher);
         let read_messages_task_res;
         let batcher_task_res;
         (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
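The two match arms correspond to the two pipelining modes: polling the stages as futures on the current task versus spawning them as tasks. A hedged sketch of that difference (stage_a/stage_b are placeholders, not the pageserver stage futures):

async fn stage_a() -> Result<(), ()> { Ok(()) }
async fn stage_b() -> Result<(), ()> { Ok(()) }

// ConcurrentFutures-like mode: stages are polled concurrently on the current
// task; dropping this future drops (cancels) both stages with it.
async fn run_concurrent() -> (Result<(), ()>, Result<(), ()>) {
    tokio::join!(stage_a(), stage_b())
}

// Tasks-like mode: each stage runs on its own tokio task, so it keeps running
// even if the caller is dropped mid-await; hence the NB comment's assumption
// that the surrounding function is polled to completion.
async fn run_spawned() -> (Result<(), ()>, Result<(), ()>) {
    let a = tokio::task::spawn(stage_a());
    let b = tokio::task::spawn(stage_b());
    let (a_res, b_res) = tokio::join!(a, b);
    (a_res.expect("join error"), b_res.expect("join error"))
}

#[tokio::main]
async fn main() {
    let _ = run_concurrent().await;
    let _ = run_spawned().await;
}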