WIP
@@ -1102,35 +1102,37 @@ impl PageServerHandler {
 //
 // We construct a pipeline of
 // - Reading: read messages from pgb
-// - Batching: fill the current batch
+// - Batching: batch the messages if possible
 // - Execution: take the current batch, execute it using get_vectored, and send the response.
 //
-// The stages synchronize through channels.
+// The stages are synchronized through channels.
 //
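For orientation, here is a minimal self-contained sketch of that three-stage shape, using plain tokio mpsc channels in place of the real spsc_fold and toy request types in place of pgb messages; names and the batch size are illustrative, not the pageserver's:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Reading -> Batching -> Execution, connected by bounded channels.
    let (req_tx, mut req_rx) = mpsc::channel::<u32>(1);
    let (batch_tx, mut batch_rx) = mpsc::channel::<Vec<u32>>(1);

    let reading = tokio::spawn(async move {
        for msg in 0..8 {
            // send() fails once the batching stage is gone => reading exits too.
            if req_tx.send(msg).await.is_err() {
                break;
            }
        }
        // req_tx is dropped here: batching's recv() starts returning None.
    });

    let batching = tokio::spawn(async move {
        let mut batch = Vec::new();
        // recv() returns None once reading dropped its sender.
        while let Some(req) = req_rx.recv().await {
            batch.push(req);
            if batch.len() == 4 && batch_tx.send(std::mem::take(&mut batch)).await.is_err() {
                return;
            }
        }
        // Flush the final partial batch, then drop batch_tx.
        let _ = batch_tx.send(batch).await;
    });

    let execution = tokio::spawn(async move {
        while let Some(batch) = batch_rx.recv().await {
            println!("executing batch of {} requests", batch.len());
        }
    });

    let _ = tokio::join!(reading, batching, execution);
}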
 // CODING RULES FOR CANCELLATION
 //
 // The channels propagate cancellation of the pipeline if any one stage exits.
 // If a given stage exits, then ...
 // - ... its downstream eventually exits because the downstream's recv() fails, and
 // - ... its upstream eventually exits because the upstream's send() fails.
 // The overall pipeline has a CancellationToken that is a child of `self.cancel`.
 // Each pipeline stage receives a child token of the pipeline's CancellationToken.
 // Every pipeline stage is sensitive to it on all `.await`s except
 // when the stage is waiting on its upstream or downstream channel, where cancellation
 // is signalled through channel disconnection from/to the upstream/downstream.
 //
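A sketch of that sensitivity rule in isolation, assuming tokio_util's CancellationToken; the stage body and item type are made up for illustration:

use tokio_util::sync::CancellationToken;

async fn stage_sketch(
    cancel: CancellationToken,
    mut upstream: tokio::sync::mpsc::Receiver<u32>,
) -> Result<(), &'static str> {
    loop {
        // Waiting on the upstream channel: deliberately NOT raced against
        // `cancel`; shutdown is observed as disconnection (recv() == None).
        let Some(item) = upstream.recv().await else {
            return Ok(());
        };
        // Every other await point is raced against the cancellation token.
        tokio::select! {
            _ = cancel.cancelled() => return Err("shutdown"),
            _ = do_work(item) => {}
        }
    }
}

async fn do_work(_item: u32) { /* stand-in for the stage's real work */ }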
 // A stage will not observe propagated cancellation through channels while
 // 1. there's still data in the channel (channel recv succeeds), or
 // 2. it is `await`ing a future that is not its upstream/downstream channel.
 // That is intentional: we always want to run the pipeline empty.
 // When any pipeline stage exits with Err(), the pipeline CancellationToken gets
 // cancelled via a drop guard. This causes all other stages to exit soon after.
 //
 // The coding discipline from the parent function still stands, though:
 // any interaction with the client connection (pgb) must be sensitive to
 // `self.cancel`, so that we can shut down page_service quickly.
 // When a pipeline stage exits with Ok(), the stage's drop guard is disarmed.
 // This allows the other stages to wrap up cleanly.
 //
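The Err()/Ok() rule can be expressed as a small wrapper; a sketch assuming tokio_util's CancellationToken::drop_guard (the wrapper itself is illustrative; the real code inlines this in the pipeline_stage! macro below):

use std::future::Future;
use tokio_util::sync::CancellationToken;

async fn run_stage<F>(pipeline_cancel: CancellationToken, stage: F) -> Result<(), ()>
where
    F: Future<Output = Result<(), ()>>,
{
    // Armed guard: if we return (or are dropped) without disarming,
    // the pipeline token is cancelled and the sibling stages exit soon after.
    let guard = pipeline_cancel.drop_guard();
    let res = stage.await;
    if res.is_ok() {
        // Clean exit: do NOT cancel; let shutdown bubble through the channels.
        let _token = guard.disarm();
    }
    res
}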
-// Let's walk through the common cases of pipeline shutdown:
+// Let's walk through the common cases of pipeline shutdown to test this model:
 //
 // Client-initiated shutdown: the client ends the CopyBoth session, making
 // the Reading stage exit with Ok(()). This in turn makes the Batching stage
 // exit with Ok(()), and the Executor stage processes the remaining batch from
-// the spsc_fold. Then the Executor stage exits with Ok(()).
+// the spsc_fold. Then the Executor stage exits with Ok(()). At no point was
+// the pipeline CancellationToken cancelled.
 //
-// Server-initiated shutdown through self.cancel: the pipeline CancellationToken
-// is a child token of self.cancel. All stages will exit promptly.
 //
+// Server-initiated shutdown through self.cancel:
+// - Case 1: If the Reading stage is waiting on its upstream (pgb) for a new client message,
+//   it will exit with Err(QueryError::Shutdown).
+// - Case 2: If the Reading stage is waiting on its downstream (send to Batching),
@@ -1149,18 +1151,24 @@ impl PageServerHandler {
+// handle that's stored in the BatchedFeMessage to execute the request,
+// the `handle_*` function will fail with an error that bubbles up and results in
+// the Executor stage exiting with Err(QueryError::Shutdown).
+//
+// Panic in a stage: the stage drops its channel end.

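That panic case can be demonstrated standalone; this sketch also shows the drain-before-disconnect property from above (illustrative, not pageserver code):

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(1);

    let producer = tokio::spawn(async move {
        tx.send(1).await.unwrap();
        panic!("stage panicked"); // unwinding drops `tx`
    });

    assert_eq!(rx.recv().await, Some(1)); // buffered data is still delivered
    assert_eq!(rx.recv().await, None);    // then recv() reports disconnection
    assert!(producer.await.is_err());     // the JoinError records the panic
}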
 let PageServicePipeliningConfig {
     max_batch_size,
     protocol_pipelining_mode,
 } = pipelining_config;

 // Cancellation root for the pipeline.
 // If any one stage exits, this gets cancelled.
 let cancel = self.cancel.child_token();

 // Macro to _define_ a pipeline stage.
 macro_rules! pipeline_stage {
     ($name:literal, $make_fut:expr) => {{
-        let stage_fut = $make_fut;
+        // Give each stage a child token to avoid lock contention in `tasks` mode.
+        let stage_fut = $make_fut(cancel.child_token());
         // Cancel the pipeline if the stage exits with an error.
         // If it exits cleanly, the cancellation should just bubble through the pipeline.
         let cancel_pipeline = cancel.clone().drop_guard();
         async move {
             scopeguard::defer! {
                 debug!("exiting");
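Hand-expanded, a stage definition amounts to roughly the following. This is a sketch under the assumption that the truncated macro body awaits the stage and disarms the guard on Ok(()), consistent with the cancellation rules documented above; the real macro also emits the debug! logging shown:

use std::future::Future;
use tokio_util::sync::CancellationToken;

fn define_stage<M, Fut>(
    pipeline_cancel: &CancellationToken,
    make_fut: M,
) -> impl Future<Output = Result<(), ()>>
where
    M: FnOnce(CancellationToken) -> Fut,
    Fut: Future<Output = Result<(), ()>>,
{
    // Each stage gets its own child token; sharing one token across several
    // tasks would contend on its internals in `tasks` mode.
    let stage_fut = make_fut(pipeline_cancel.child_token());
    // Cancel the whole pipeline if this stage exits with an error.
    let cancel_pipeline = pipeline_cancel.clone().drop_guard();
    async move {
        let res = stage_fut.await;
        if res.is_ok() {
            // Clean exit: cancellation bubbles through the channels instead.
            let _ = cancel_pipeline.disarm();
        }
        res
    }
}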
@@ -1176,8 +1184,7 @@ impl PageServerHandler {
 //

 let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
-let read_messages = pipeline_stage!("read_messages", {
-    let cancel = self.cancel.clone();
+let read_messages = pipeline_stage!("read_messages", move |cancel| {
     let ctx = ctx.attached_child();
     async move {
         let mut pgb_reader = pgb_reader;
@@ -1219,7 +1226,7 @@ impl PageServerHandler {
 //

 let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
-let batcher = pipeline_stage!("batcher", async move {
+let batcher = pipeline_stage!("batcher", move |_cancel| async move {
     loop {
         let maybe_req = requests_rx.recv().await;
         let Some(req) = maybe_req else {
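The batching step itself reduces to a fold; a sketch with toy types (the real pending value is a BatchedFeMessage inside neon's spsc_fold channel, whose exact API is not shown in this diff):

// Toy stand-in for BatchedFeMessage.
struct Batch {
    reqs: Vec<u32>,
}

// Fold `req` into the batch the executor has not picked up yet, or report
// that the caller must start a new batch (e.g. the current one is full).
fn try_fold(pending: &mut Batch, req: u32, max_batch_size: usize) -> Result<(), u32> {
    if pending.reqs.len() >= max_batch_size {
        return Err(req); // does not fit: send a fresh batch instead
    }
    pending.reqs.push(req);
    Ok(())
}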
@@ -1247,8 +1254,7 @@ impl PageServerHandler {
 // Create Executor future.
 //

-let executor = pipeline_stage!("executor", {
-    let cancel = self.cancel.clone();
+let executor = pipeline_stage!("executor", |cancel| {
     let ctx = ctx.attached_child();
     async move {
         loop {
@@ -1282,25 +1288,23 @@ impl PageServerHandler {
 // if one of them does.

 let read_messages_res;
+let batcher_res;
 let executor_res;
 match protocol_pipelining_mode {
     PageServiceProtocolPipeliningMode::ConcurrentFutures => {
-        (read_messages_res, _, executor_res) =
+        (read_messages_res, batcher_res, executor_res) =
             tokio::join!(read_messages, batcher, executor)
     }
     PageServiceProtocolPipeliningMode::Tasks => {
         // NB: the assumption is that this function is polled to completion.
         // So, no need to keep track of these task handles in a JoinSet / via GateGuard.
-        // This does not actually hold if we're panicking, but that reduces to the AsyncDrop problem.
+        // We must run all tasks to completion and not panic; otherwise we leak the tasks.
         let read_messages_task = tokio::task::spawn(read_messages);
         let batcher_task = tokio::task::spawn(batcher);
+        let executor_task = tokio::task::spawn(executor);
         let read_messages_task_res;
         let batcher_task_res;
-        (read_messages_task_res, batcher_task_res, executor_res) = tokio::join!(
-            read_messages_task,
-            batcher_task,
-            executor, // not in a separate task
-        );
+        let executor_task_res;
+        (read_messages_task_res, batcher_task_res, executor_task_res) =
+            tokio::join!(read_messages_task, batcher_task, executor_task);
         read_messages_res = read_messages_task_res
             .context("read_messages task panicked, check logs for details")?;
         let _: () =
@@ -1308,8 +1312,16 @@ impl PageServerHandler {
     }
 }

+if let Err(batcher_err) = batcher_res {
+    warn!(error=?batcher_err, "batcher exited with error, this is unexpected");
+}
+
 match (read_messages_res, executor_res) {
-    (Err(e), _) | (_, Err(e)) => {
+    (Err(e), _) => {
         let e: QueryError = e;
         Err(e) //
     }
+    (_, Err(e)) => {
+        let e: QueryError = e;
+        Err(e)
+    }
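The two modes matched on above, reduced to a sketch with stage results simplified to (). The point is that ConcurrentFutures multiplexes all stages onto the current task, while Tasks spawns them and must map a JoinError (panic) into an error, as the .context(...) calls above do; names here are illustrative:

use anyhow::Context;

async fn modes_sketch(use_tasks: bool) -> anyhow::Result<()> {
    let reading = async { /* read loop */ };
    let batching = async { /* batch loop */ };
    let executing = async { /* execute loop */ };

    if !use_tasks {
        // ConcurrentFutures: concurrency without parallelism, no JoinErrors.
        let ((), (), ()) = tokio::join!(reading, batching, executing);
    } else {
        // Tasks: parallelism across worker threads; panics surface as JoinErrors.
        let r = tokio::task::spawn(reading);
        let b = tokio::task::spawn(batching);
        let e = tokio::task::spawn(executing);
        let (r, b, e) = tokio::join!(r, b, e);
        r.context("read_messages task panicked")?;
        b.context("batcher task panicked")?;
        e.context("executor task panicked")?;
    }
    Ok(())
}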