mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
merge reader&batcher stages, update docs
This commit is contained in:
@@ -1114,72 +1114,64 @@ impl PageServerHandler {
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
//
|
||||
// We construct a pipeline of
|
||||
// - Reading: read messages from pgb
|
||||
// - Batching: batch the messages if possible
|
||||
// - Execution: take the current batch, execute it using get_vectored, and send the response.
|
||||
// Pipelined pagestream handling consists of
|
||||
// - a Batcher that reads requests off the wire and
|
||||
// and batches them if possible,
|
||||
// - an Executor that processes the batched requests.
|
||||
//
|
||||
// The stages synchronized through channels.
|
||||
// The batch is built up inside an `spsc_fold` channel,
|
||||
// shared betwen Batcher (Sender) and Executor (Receiver).
|
||||
//
|
||||
// CODING RULES FOR CANCELLATION
|
||||
// The Batcher continously folds client requests into the batch,
|
||||
// while the Executor can at any time take out what's in the batch
|
||||
// in order to process it.
|
||||
// This means the next batch builds up while the Executor
|
||||
// executes the last batch.
|
||||
//
|
||||
// The overall pipeline has a CancellationToken that is a child of `self.cancel`.
|
||||
// Each pipeline stage receives a child token of the pipeline's CancellationToken.
|
||||
// Every pipeline stage is sensitive to it on all `.await`s except
|
||||
// when the stage is waiting on its upstream or downstream channel, where cancellation
|
||||
// is signalled through channel disconnection from/to the upstream/downstream.
|
||||
// CANCELLATION
|
||||
//
|
||||
// When any pipeline stage exits with Err(), the pipeline CancellationToken gets
|
||||
// cancelled via drop guard. This causes all other stages to exit soon after.
|
||||
// We run both Batcher and Executor futures to completion before
|
||||
// returning from this function.
|
||||
//
|
||||
// When a pipeline stage exits with Ok(), the stage's drop guard is disarmed.
|
||||
// This allows other stages to wrap up cleanly.
|
||||
// If Executor exits first, it signals cancellation to the Batcher
|
||||
// via a CancellationToken that is child of `self.cancel`.
|
||||
// If Batcher exits first, it signals cancellation to the Executor
|
||||
// by dropping the spsc_fold channel Sender.
|
||||
//
|
||||
// Let's walk through the common cases of pipeline shutdown to test this model:
|
||||
// CLEAN SHUTDOWN
|
||||
//
|
||||
// Client-initiated shutdown: the client ends the CopyBoth session, making
|
||||
// the Reading stage exit with Ok(()). This in turn makes the Batching stage
|
||||
// exit with Ok(()), and the Executor stage processes the remaining batch from
|
||||
// the spsc_fold. Then the Executor stage exits with Ok(()). At no point was
|
||||
// the pipeline CancellationToken cancelled.
|
||||
// Clean shutdown means that the client ends the COPYBOTH session.
|
||||
// In response to such a client message, the Batcher exits.
|
||||
// The Executor continues to run, draining the spsc_fold channel.
|
||||
// Once drained, the spsc_fold recv will fail with a distinct error
|
||||
// indicating that the sender disconnected.
|
||||
// The Executor exits with Ok(()) in response to that error.
|
||||
//
|
||||
// Server-initiated shutdown through self.cancel: the pipeline CancellationToken
|
||||
// is a child token of self.cancel. All stages will exit promptly
|
||||
|
||||
// Server initiated shutdown is not clean shutdown, but instead
|
||||
// is an error Err(QueryError::Shutdown) that is propagated through
|
||||
// error propagation.
|
||||
//
|
||||
// - Case 1: If the Reading stage is waiting on its upstream (pgb) for a new client message,
|
||||
// it will exit with Err(QueryError::Shutdown).
|
||||
// - Case 2: If the Reading stage is waiting on its downstream (send to Batching),
|
||||
// it follows that Batching is waiting for Executor.
|
||||
// Executor will observe self.cancel when it sends the response, and exit with Err(QueryError::Shutdown).
|
||||
// - Case 3: the Executor stage observes self.cancel and exits with Err() while the Reading
|
||||
// stage is waiting for a message from the client. If no message from the client arrives,
|
||||
// the Reading stage will never exit.
|
||||
// ERROR PROPAGATION
|
||||
//
|
||||
// In either case, a task exits, which makes the other tasks in the pipeline exit.
|
||||
// When the Batcher encounter an error, it sends it as a value
|
||||
// through the spsc_fold channel and exits afterwards.
|
||||
// When the Executor observes such an error in the channel,
|
||||
// it exits returning that error value.
|
||||
//
|
||||
// Server-initiated shutdown through Timeline::cancel:
|
||||
// - Case 1: If the Reading stage observes Timeline::cancel via timeline_handles
|
||||
// when it builds the BatchedFeMessage, it will exit with Err(QueryError::Shutdown).
|
||||
// - Case 2: If the Executor stage observes Timeline::cancel when it uses the
|
||||
// handle that's stored in the BatchedFeMessage to execute the request,
|
||||
// the `handle_*` function will fail with an error that bubbles up and results in
|
||||
// the Executor stage exiting with Err(QueryError::Shutdown).
|
||||
// This design ensures that the Executor stage will still process
|
||||
// the batch that was in flight when the Batcher encountered an error,
|
||||
// thereby beahving identical to a serial implementation.
|
||||
|
||||
let PageServicePipeliningConfig {
|
||||
max_batch_size,
|
||||
protocol_pipelining_mode,
|
||||
} = pipelining_config;
|
||||
|
||||
// Cancellation root for the pipeline.
|
||||
// If any one stage exits, this gets cancelled.
|
||||
let cancel = self.cancel.child_token();
|
||||
|
||||
// Macro to _define_ a pipeline stage.
|
||||
macro_rules! pipeline_stage {
|
||||
($name:literal, $make_fut:expr) => {{
|
||||
let stage_fut = $make_fut;
|
||||
let cancel = cancel.clone();
|
||||
($name:literal, $cancel:expr, $make_fut:expr) => {{
|
||||
let cancel: CancellationToken = $cancel;
|
||||
let stage_fut = $make_fut(cancel.clone());
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
debug!("exiting");
|
||||
@@ -1192,102 +1184,79 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
//
|
||||
// Create Reading future.
|
||||
//
|
||||
|
||||
let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
|
||||
let read_messages = pipeline_stage!("read_messages", {
|
||||
let cancel = cancel.clone();
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
let mut pgb_reader = pgb_reader;
|
||||
let mut exit = false;
|
||||
while !exit {
|
||||
let res = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&cancel,
|
||||
&ctx,
|
||||
request_span.clone(),
|
||||
)
|
||||
.await;
|
||||
exit |= res.is_err();
|
||||
match requests_tx.send(res).await {
|
||||
Ok(()) => {}
|
||||
Err(tokio::sync::mpsc::error::SendError(_)) => {
|
||||
debug!("downstream is gone");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
(pgb_reader, timeline_handles)
|
||||
}
|
||||
});
|
||||
|
||||
//
|
||||
// Create Batching future.
|
||||
// Batcher
|
||||
//
|
||||
|
||||
let cancel_batcher = self.cancel.child_token();
|
||||
enum Batch {
|
||||
Request(Box<BatchedFeMessage>),
|
||||
ReadError(QueryError),
|
||||
}
|
||||
let (mut batch_tx, mut batch_rx) = spsc_fold::channel::<Batch>();
|
||||
let batcher = pipeline_stage!("batcher", async move {
|
||||
let mut exit = false;
|
||||
while !exit {
|
||||
let maybe_req = requests_rx.recv().await;
|
||||
let Some(read_res) = maybe_req else {
|
||||
debug!("upstream is gone");
|
||||
break;
|
||||
};
|
||||
let send_res = match read_res {
|
||||
Ok(None) => {
|
||||
debug!("upstream end of sub-protocol");
|
||||
break;
|
||||
}
|
||||
Ok(Some(req)) => {
|
||||
batch_tx
|
||||
.send(Batch::Request(req), |batch, req| match (batch, req) {
|
||||
(Batch::Request(ref mut batch), Batch::Request(req)) => {
|
||||
Self::pagestream_do_batch(max_batch_size, batch, req)
|
||||
.map_err(Batch::Request)
|
||||
}
|
||||
(Batch::Request(_), x @ Batch::ReadError(_)) => Err(x),
|
||||
(Batch::ReadError(_), Batch::Request(_) | Batch::ReadError(_)) => {
|
||||
unreachable!("we exit from batcher after storing a read error");
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
Err(e) => {
|
||||
exit = true;
|
||||
batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await
|
||||
}
|
||||
};
|
||||
match send_res {
|
||||
Ok(()) => {}
|
||||
Err(spsc_fold::SendError::ReceiverGone) => {
|
||||
debug!("downstream is gone");
|
||||
break;
|
||||
let read_messages = pipeline_stage!(
|
||||
"read_messages",
|
||||
cancel_batcher.clone(),
|
||||
move |cancel_batcher| {
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
let mut pgb_reader = pgb_reader;
|
||||
let mut exit = false;
|
||||
while !exit {
|
||||
let res = Self::pagestream_read_message(
|
||||
&mut pgb_reader,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&mut timeline_handles,
|
||||
&cancel_batcher,
|
||||
&ctx,
|
||||
request_span.clone(),
|
||||
)
|
||||
.await;
|
||||
exit |= res.is_err();
|
||||
let send_res = match res {
|
||||
Ok(None) => {
|
||||
debug!("sub-protocol client-initiated shutdown");
|
||||
break;
|
||||
}
|
||||
Ok(Some(req)) => {
|
||||
batch_tx
|
||||
.send(Batch::Request(req), |batch, req| match (batch, req) {
|
||||
(Batch::Request(ref mut batch), Batch::Request(req)) => {
|
||||
Self::pagestream_do_batch(max_batch_size, batch, req)
|
||||
.map_err(Batch::Request)
|
||||
}
|
||||
(Batch::Request(_), x @ Batch::ReadError(_)) => Err(x),
|
||||
(
|
||||
Batch::ReadError(_),
|
||||
Batch::Request(_) | Batch::ReadError(_),
|
||||
) => {
|
||||
unreachable!(
|
||||
"we exit from batcher after storing a read error"
|
||||
);
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
Err(e) => {
|
||||
exit = true;
|
||||
batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await
|
||||
}
|
||||
};
|
||||
exit |= send_res.is_err();
|
||||
}
|
||||
(pgb_reader, timeline_handles)
|
||||
}
|
||||
}
|
||||
});
|
||||
);
|
||||
|
||||
//
|
||||
// Create Executor future.
|
||||
// Executor
|
||||
//
|
||||
|
||||
let executor = pipeline_stage!("executor", {
|
||||
let cancel = cancel.clone();
|
||||
let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| {
|
||||
let ctx = ctx.attached_child();
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
cancel.cancel();
|
||||
};
|
||||
let _cancel_batcher = cancel_batcher.drop_guard();
|
||||
loop {
|
||||
let maybe_batch = batch_rx.recv().await;
|
||||
let batch = match maybe_batch {
|
||||
@@ -1316,29 +1285,21 @@ impl PageServerHandler {
|
||||
// Execute the stages.
|
||||
//
|
||||
|
||||
let read_messages_res;
|
||||
let _batcher_res: ();
|
||||
let executor_res: Result<(), QueryError>;
|
||||
match protocol_pipelining_mode {
|
||||
PageServiceProtocolPipeliningMode::ConcurrentFutures => {
|
||||
(read_messages_res, _batcher_res, executor_res) =
|
||||
tokio::join!(read_messages, batcher, executor);
|
||||
tokio::join!(read_messages, executor)
|
||||
}
|
||||
PageServiceProtocolPipeliningMode::Tasks => {
|
||||
// These tasks are not tracked anywhere.
|
||||
let read_messages_task = tokio::spawn(read_messages);
|
||||
let batcher_task = tokio::spawn(batcher);
|
||||
let (read_messages_task_res, batcher_task_res, executor_res_) =
|
||||
tokio::join!(read_messages_task, batcher_task, executor,);
|
||||
(read_messages_res, _batcher_res, executor_res) = (
|
||||
let (read_messages_task_res, executor_res_) =
|
||||
tokio::join!(read_messages_task, executor,);
|
||||
(
|
||||
read_messages_task_res.expect("propagated panic from read_messages"),
|
||||
batcher_task_res.expect("propagated panic from batcher"),
|
||||
executor_res_,
|
||||
);
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
(read_messages_res, executor_res)
|
||||
}
|
||||
|
||||
/// Helper function to handle the LSN from client request.
|
||||
|
||||
Reference in New Issue
Block a user