fix pipeline cancellation

Author: Christian Schwarz
Date: 2024-11-26 20:25:10 +01:00
parent 41ddc6772c
commit 18ffaba975


@@ -316,6 +316,7 @@ struct PageServerHandler {
connection_ctx: RequestContext,
cancel: CancellationToken,
gate: utils::sync::gate::Gate,
/// None only while pagestream protocol is being processed.
timeline_handles: Option<TimelineHandles>,
@@ -581,6 +582,7 @@ impl PageServerHandler {
connection_ctx,
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
gate: Default::default(),
pipelining_config,
}
}
@@ -818,6 +820,7 @@ impl PageServerHandler {
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(), QueryError>
where
@@ -944,7 +947,7 @@ impl PageServerHandler {
}
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
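The hunk above swaps the shutdown check inside the `tokio::select!` from `self.cancel` to the `cancel` token now passed in by the caller. A minimal standalone sketch of that pattern (hypothetical names, not the handler code), showing how `biased;` makes the cancellation branch win over work that is also ready:

```rust
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

// Sketch only: with `biased;`, select! polls the cancellation branch first,
// so a pending shutdown wins even if the work future is also ready.
async fn handle_one(cancel: &CancellationToken) -> Result<(), &'static str> {
    tokio::select! {
        biased;
        _ = cancel.cancelled() => {
            // We were requested to shut down.
            return Err("shutdown");
        }
        _ = sleep(Duration::from_millis(10)) => {
            // stand-in for producing/flushing a response
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    cancel.cancel();
    assert!(handle_one(&cancel).await.is_err());
}
```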
@@ -1054,13 +1057,14 @@ impl PageServerHandler {
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let cancel = self.cancel.clone();
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&self.cancel,
&cancel,
ctx,
request_span.clone(),
)
@@ -1072,7 +1076,7 @@ impl PageServerHandler {
return Ok((pgb_reader, timeline_handles));
}
};
self.pagesteam_handle_batched_message(pgb_writer, *msg, ctx)
self.pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
.await?;
}
}
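In this hunk the token is cloned once before the loop and `&cancel` is passed into `pagesteam_handle_batched_message`, which takes `&mut self`; passing `&self.cancel` directly would conflict with that mutable borrow. A hypothetical reduction of the borrow situation (invented names, not the pageserver types):

```rust
use tokio_util::sync::CancellationToken;

// Invented types for illustration: `handle` plays the role of
// `pagesteam_handle_batched_message` (it takes `&mut self` plus the token).
struct Handler {
    cancel: CancellationToken,
}

impl Handler {
    fn handle(&mut self, cancel: &CancellationToken) {
        // ... real code would check `cancel` while doing the work ...
        let _ = cancel;
    }

    fn run(&mut self) {
        // Clone once up front; `self.handle(&self.cancel)` would not compile
        // because `self.cancel` cannot stay borrowed across the `&mut self` call.
        let cancel = self.cancel.clone();
        for _ in 0..3 {
            self.handle(&cancel);
        }
    }
}

fn main() {
    let mut h = Handler {
        cancel: CancellationToken::new(),
    };
    h.run();
}
```

`CancellationToken` clones are cheap handles onto shared state, so the clone changes nothing about cancellation behaviour.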
@@ -1107,60 +1111,92 @@ impl PageServerHandler {
protocol_pipelining_mode,
} = pipelining_config;
let cancel = self.cancel.clone();
// Create a CancellationToken for the pipeline.
// And make any return/panic from this function signal that cancellation.
let (cancel, _drop_guard) = {
let cancel = self.cancel.child_token();
(cancel.clone(), cancel.drop_guard())
};
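A minimal sketch of the child-token plus drop-guard pattern introduced above (hypothetical helper function, not the handler code): shutdown signalled on the parent still reaches the pipeline token, and any return or unwind from the scope holding the guard cancels the pipeline token, and with it every stage's child token.

```rust
use tokio_util::sync::CancellationToken;

// Hypothetical helper, not the handler: any return or unwind from the scope
// holding `_drop_guard` cancels the pipeline token, and with it every child
// token handed out to a stage; cancelling `parent` has the same effect while
// the pipeline is still running.
fn make_stage_token(parent: &CancellationToken) -> CancellationToken {
    let (pipeline_cancel, _drop_guard) = {
        let cancel = parent.child_token();
        (cancel.clone(), cancel.drop_guard())
    };
    // Hand a child of the pipeline token to a stage.
    pipeline_cancel.child_token()
    // `_drop_guard` drops here, cancelling `pipeline_cancel` and its children.
}

fn main() {
    let parent = CancellationToken::new();
    let stage_token = make_stage_token(&parent);
    // The guard already fired on return, so the stage token is cancelled.
    assert!(stage_token.is_cancelled());
}
```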
// Macro to _define_ a pipeline stage.
//
// The stage is a future.
// It need not be cancellation-safe.
// It receives a child token of `cancel` and a child RequestContext as an argument.
//
            // When a stage exits, all other stages will be signalled to cancel.
macro_rules! pipeline_stage {
($name:literal, $make_fut:expr) => {{
let cancel = cancel.clone();
let ctx = ctx.attached_child();
let stage_fut = $make_fut(cancel.child_token(), ctx);
async move {
scopeguard::defer! {
debug!("exiting");
}
let _cancel_pipeline_on_stage_exit = cancel.clone().drop_guard();
timed_after_cancellation(
stage_fut,
std::stringify!($name),
Duration::from_millis(100),
&cancel,
)
.await
}
.instrument(tracing::info_span!($name))
}};
}
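A simplified, free-standing equivalent of what `pipeline_stage!` wires together (assumed reduction: the real macro also hands the stage a child `RequestContext` and wraps it in the internal `timed_after_cancellation` helper, both omitted here):

```rust
use std::future::Future;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info_span, Instrument};

// Simplified sketch: the stage body gets a child token, and a drop guard on
// the shared pipeline token guarantees that when this stage exits, by return
// or by panic, all sibling stages are signalled to cancel.
fn pipeline_stage<F, Fut>(
    name: &'static str,
    pipeline_cancel: &CancellationToken,
    make_fut: F,
) -> impl Future<Output = Fut::Output>
where
    F: FnOnce(CancellationToken) -> Fut,
    Fut: Future,
{
    let cancel = pipeline_cancel.clone();
    let stage_fut = make_fut(cancel.child_token());
    async move {
        scopeguard::defer! {
            debug!("exiting");
        }
        // Cancels the whole pipeline once this stage's future completes
        // (or is dropped mid-poll).
        let _cancel_pipeline_on_stage_exit = cancel.drop_guard();
        stage_fut.await
    }
    .instrument(info_span!("pipeline_stage", stage = name))
}

#[tokio::main]
async fn main() {
    let pipeline_cancel = CancellationToken::new();
    // Stage `a` finishes immediately; stage `b` only waits for cancellation,
    // so it completes as soon as `a`'s drop guard fires.
    let a = pipeline_stage("a", &pipeline_cancel, |_cancel| async { "a done" });
    let b = pipeline_stage("b", &pipeline_cancel, |cancel| async move {
        cancel.cancelled().await;
        "b observed cancellation"
    });
    let (ra, rb) = tokio::join!(a, b);
    println!("{ra} / {rb}");
}
```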
// Grab gate guards before `self` gets borrowed by the executor stage.
let gate_guard_1 = self.gate.enter().map_err(|_| QueryError::Shutdown)?;
let gate_guard_2 = self.gate.enter().map_err(|_| QueryError::Shutdown)?;
//
// Create Reading future.
//
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
let read_messages = {
let cancel = self.cancel.child_token();
let ctx = ctx.attached_child();
async move {
scopeguard::defer! {
debug!("exiting");
}
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&cancel,
&ctx,
request_span.clone(),
)
.await?;
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
break;
}
};
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("downstream is gone");
break;
}
let read_messages = pipeline_stage!("read_messages", move |cancel, ctx| async move {
let mut pgb_reader = pgb_reader;
loop {
let msg = Self::pagestream_read_message(
&mut pgb_reader,
tenant_id,
timeline_id,
&mut timeline_handles,
&cancel,
&ctx,
request_span.clone(),
)
.await?;
let msg = match msg {
Some(msg) => msg,
None => {
debug!("pagestream subprotocol end observed");
break;
}
};
// No need to be sensitive to `cancel` here because downstream is.
match requests_tx.send(msg).await {
Ok(()) => {}
Err(tokio::sync::mpsc::error::SendError(_)) => {
debug!("downstream is gone");
break;
}
}
Ok((pgb_reader, timeline_handles))
}
}
.instrument(tracing::info_span!("read_messages"));
Ok((pgb_reader, timeline_handles))
});
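The comment "No need to be sensitive to `cancel` here because downstream is" relies on bounded-channel semantics: a `send` blocked on a full channel completes with an error as soon as the receiver is dropped. A small stand-alone demonstration (plain tokio mpsc, not the pageserver code):

```rust
use tokio::sync::mpsc;

// A bounded `mpsc::Sender::send` that is waiting on a full channel returns
// `SendError` as soon as the receiver is dropped, so a dying downstream stage
// unblocks the upstream one without any extra cancellation check.
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(1);
    tx.send(1).await.unwrap(); // fills the single-slot channel

    let producer = tokio::spawn(async move {
        // This send has to wait: the channel is full.
        tx.send(2).await
    });

    // Simulate the downstream stage exiting.
    drop(rx);

    // The blocked send completes with an error instead of hanging.
    assert!(producer.await.unwrap().is_err());
}
```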
//
// Create Batching future.
//
// Explicit sensitivity to `cancel` is not needed because the only
// two await points are channel recv & send, both of which will complete
// as soon as the upstream sender / downstream receivers are dropped.
//
let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
let batcher = async move {
scopeguard::defer! {
debug!("exiting");
}
let batcher = pipeline_stage!("batcher", move |_cancel, _ctx| async move {
loop {
let maybe_req = requests_rx.recv().await;
let Some(req) = maybe_req else {
@@ -1179,84 +1215,69 @@ impl PageServerHandler {
}
}
}
}
.instrument(tracing::info_span!("batcher"));
});
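The recv-side half of the same argument, sketched with a plain tokio mpsc channel standing in for the internal `spsc_fold` channel (assumption: only the "sender gone ends the loop" behaviour is illustrated, not the fold semantics):

```rust
use tokio::sync::mpsc;

// Once the upstream sender is dropped, a pending `recv` resolves to `None`
// immediately, so the batcher exits without ever consulting the
// cancellation token.
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(1);

    let batcher = tokio::spawn(async move {
        let mut received = Vec::new();
        while let Some(v) = rx.recv().await {
            received.push(v);
        }
        received // loop ends as soon as the sender is gone
    });

    tx.send(42).await.unwrap();
    drop(tx); // upstream stage exits

    assert_eq!(batcher.await.unwrap(), vec![42]);
}
```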
//
// Create Executor future.
//
let executor = async {
scopeguard::defer! {
debug!("exiting");
};
let executor = pipeline_stage!("executor", move |cancel, ctx| async move {
loop {
let batch = match batch_rx.recv().await {
let maybe_batch = batch_rx
.recv()
                // no need to be sensitive to `cancel` because upstream stages are
.await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(spsc_fold::RecvError::SenderGone) => {
debug!("upstream gone");
break;
}
};
self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx)
self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
.await?;
}
Ok(())
}
.instrument(tracing::info_span!("executor"));
});
//
// Execute the stages until they exit.
// Execute the stages.
//
// We can either run the pipeline as concurrent futures or we can
// run it in separate tokio tasks.
//
// In any case, we need to be responsive to cancellation (self.cancel).
// The style chosen here is that cancellation must propagate through the
// pipeline: if any stage dies, the whole pipeline dies.
        // In either case, we wait for all stages to exit.
// The pipeline_stage! machinery ensures cancellation signalling,
// stages are responsible for being responsive to it.
//
// If the client communicates intent to end the pagestream sub-protocol,
// the Reader stage shuts down the pipeline cleanly by sending a `None`
// through the pipeline, resulting in all stages exiting cleanly after
// the last response has been produced.
//
// Unclean pipeline shutdown is initiated by Reader or Executor returning
// a QueryError. This bubbles up to the caller, which will shut down the connection.
macro_rules! with_noise_on_slow_cancel {
($fut:ident) => {
timed_after_cancellation(
$fut,
std::stringify!($fut),
Duration::from_millis(100),
&cancel,
)
};
}
// The behavior of the case where the client requests clean shutdown
// is not well defined right now.
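A hedged sketch of the two execution modes discussed in the comments above (stage bodies reduced to trivial futures; the real `Tasks` branch also moves gate guards into the spawned tasks, omitted here):

```rust
use tokio_util::sync::CancellationToken;

// `Mode` mirrors the `PageServiceProtocolPipeliningMode` variants; the stage
// bodies are trivial stand-ins, not the real reader/batcher/executor.
enum Mode {
    ConcurrentFutures,
    Tasks,
}

async fn run(mode: Mode) {
    let cancel = CancellationToken::new();

    let read_messages = {
        let cancel = cancel.clone();
        async move {
            // stand-in for "reader observed end of the pagestream protocol"
            cancel.cancel();
            "reader done"
        }
    };
    let batcher = {
        let cancel = cancel.clone();
        async move {
            cancel.cancelled().await;
            "batcher done"
        }
    };
    let executor = {
        let cancel = cancel.clone();
        async move {
            cancel.cancelled().await;
            "executor done"
        }
    };

    match mode {
        Mode::ConcurrentFutures => {
            // All stages polled concurrently within the current task.
            let (r, _, e) = tokio::join!(read_messages, batcher, executor);
            println!("{r} / {e}");
        }
        Mode::Tasks => {
            // Reader and batcher run as separate tasks; the executor stays
            // inline, and the JoinHandles surface panics as errors.
            let read_messages_task = tokio::task::spawn(read_messages);
            let batcher_task = tokio::task::spawn(batcher);
            let (r, b, e) = tokio::join!(read_messages_task, batcher_task, executor);
            println!("{} / {} / {e}", r.unwrap(), b.unwrap());
        }
    }
}

#[tokio::main]
async fn main() {
    run(Mode::ConcurrentFutures).await;
    run(Mode::Tasks).await;
}
```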
let read_messages_res;
let executor_res;
match protocol_pipelining_mode {
PageServiceProtocolPipeliningMode::ConcurrentFutures => {
(read_messages_res, _, executor_res) = {
tokio::join!(
with_noise_on_slow_cancel!(read_messages),
with_noise_on_slow_cancel!(batcher),
with_noise_on_slow_cancel!(executor),
)
}
(read_messages_res, _, executor_res) =
tokio::join!(read_messages, batcher, executor)
}
PageServiceProtocolPipeliningMode::Tasks => {
// cancelled via sensitivity to self.cancel
let read_messages_task = tokio::task::spawn(read_messages);
// cancelled when it observes read_messages_task disconnect the channel
let batcher_task = tokio::task::spawn(batcher);
macro_rules! spawn_with_gate_guard {
($guard:expr, $fut:expr) => {{
tokio::task::spawn(async move {
let res = $fut.await;
drop($guard);
res
})
}};
}
let read_messages_task = spawn_with_gate_guard!(gate_guard_1, read_messages);
let batcher_task = spawn_with_gate_guard!(gate_guard_2, batcher);
let read_messages_task_res;
let batcher_task_res;
(read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
with_noise_on_slow_cancel!(read_messages_task),
with_noise_on_slow_cancel!(batcher_task),
with_noise_on_slow_cancel!(executor), // not in a separate task
read_messages_task,
batcher_task,
executor, // not in a separate task
);
read_messages_res = read_messages_task_res
.context("read_messages task panicked, check logs for details")?;