mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 18:02:56 +00:00
converge on approach that pushes read Result through pipeline
@@ -131,14 +131,6 @@ pub struct DiskUsageEvictionTaskConfig {
 pub struct PageServicePipeliningConfig {
     /// Causes runtime errors if larger than max get_vectored batch size.
     pub max_batch_size: NonZeroUsize,
-    pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
 }
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "kebab-case")]
-pub enum PageServiceProtocolPipeliningMode {
-    ConcurrentFutures,
-    Tasks,
-}

 pub mod statvfs {
@@ -417,7 +409,6 @@ impl Default for ConfigToml {
             no_sync: None,
             page_service_pipelining: Some(PageServicePipeliningConfig {
                 max_batch_size: NonZeroUsize::new(32).unwrap(),
-                protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures,
             }),
         }
     }

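With the mode enum gone, `max_batch_size` is the only remaining pipelining knob, and it keeps its default of 32 (see the `ConfigToml::default()` hunk above). A minimal sketch of how the slimmed-down struct round-trips through TOML; the serde derives and the `toml` crate here are assumptions for illustration, not necessarily what the real config type uses:

    use std::num::NonZeroUsize;

    // Assumed derives; the real pageserver config type may carry more attributes.
    #[derive(Debug, serde::Serialize, serde::Deserialize)]
    pub struct PageServicePipeliningConfig {
        /// Causes runtime errors if larger than max get_vectored batch size.
        pub max_batch_size: NonZeroUsize,
    }

    fn main() {
        // Mirrors the default installed by ConfigToml::default() above.
        let cfg: PageServicePipeliningConfig = toml::from_str("max_batch_size = 32").unwrap();
        assert_eq!(cfg.max_batch_size.get(), 32);
        // NonZeroUsize rejects a zero batch size at deserialization time.
        assert!(toml::from_str::<PageServicePipeliningConfig>("max_batch_size = 0").is_err());
    }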
@@ -7,7 +7,7 @@ use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
-use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
+use pageserver_api::config::PageServicePipeliningConfig;
 use pageserver_api::models::{self, TenantState};
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -316,7 +316,6 @@ struct PageServerHandler {
     connection_ctx: RequestContext,

     cancel: CancellationToken,
-    gate: utils::sync::gate::Gate,

     /// None only while pagestream protocol is being processed.
     timeline_handles: Option<TimelineHandles>,
@@ -582,7 +581,6 @@ impl PageServerHandler {
             connection_ctx,
             timeline_handles: Some(TimelineHandles::new(tenant_manager)),
             cancel,
-            gate: Default::default(),
             pipelining_config,
         }
     }
@@ -1004,7 +1002,7 @@ impl PageServerHandler {
             .expect("implementation error: timeline_handles should not be locked");

         let request_span = info_span!("request", shard_id = tracing::field::Empty);
-        let (pgb_reader, timeline_handles) = match self.pipelining_config.clone() {
+        let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
             Some(pipelining_config) => {
                 self.handle_pagerequests_pipelined(
                     pgb,
@@ -1030,7 +1028,7 @@ impl PageServerHandler {
                 )
                 .await
             }
-        }?;
+        };

         debug!("pagestream subprotocol shut down cleanly");

@@ -1040,7 +1038,7 @@ impl PageServerHandler {
         let replaced = self.timeline_handles.replace(timeline_handles);
         assert!(replaced.is_none());

-        Ok(())
+        result
     }

     #[allow(clippy::too_many_arguments)]
@@ -1053,12 +1051,15 @@ impl PageServerHandler {
         mut timeline_handles: TimelineHandles,
         request_span: Span,
         ctx: &RequestContext,
-    ) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
+    ) -> (
+        (PostgresBackendReader<IO>, TimelineHandles),
+        Result<(), QueryError>,
+    )
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
         let cancel = self.cancel.clone();
-        loop {
+        let err = loop {
             let msg = Self::pagestream_read_message(
                 &mut pgb_reader,
                 tenant_id,
@@ -1068,17 +1069,27 @@ impl PageServerHandler {
                 ctx,
                 request_span.clone(),
             )
-            .await?;
+            .await;
+            let msg = match msg {
+                Ok(msg) => msg,
+                Err(e) => break e,
+            };
             let msg = match msg {
                 Some(msg) => msg,
                 None => {
                     debug!("pagestream subprotocol end observed");
-                    return Ok((pgb_reader, timeline_handles));
+                    return ((pgb_reader, timeline_handles), Ok(()));
                 }
             };
-            self.pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
-                .await?;
-        }
+            let err = self
+                .pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
+                .await;
+            match err {
+                Ok(()) => {}
+                Err(e) => break e,
+            }
+        };
+        ((pgb_reader, timeline_handles), Err(err))
     }

     /// # Cancel-Safety
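Note the shape change on the unpipelined path: the connection reader and timeline handles are now returned unconditionally, and the outcome travels alongside them as a `Result` instead of being short-circuited with `?`. A minimal sketch of that control flow with hypothetical stand-in types (not the pageserver's API):

    // Stand-ins: `Reader` plays the role of PostgresBackendReader<IO>.
    struct Reader;
    struct QueryError(String);

    fn run(mut messages: Vec<Result<Option<u32>, QueryError>>) -> (Reader, Result<(), QueryError>) {
        let reader = Reader;
        // `break e` makes the loop evaluate to the first error, while the
        // clean-shutdown path `return`s the reader together with Ok(()).
        let err = loop {
            let msg = match messages.pop().unwrap_or(Ok(None)) {
                Ok(msg) => msg,
                Err(e) => break e,
            };
            match msg {
                Some(m) => println!("handle message {m}"),
                None => return (reader, Ok(())), // end of sub-protocol
            }
        };
        (reader, Err(err))
    }

    fn main() {
        // The caller gets the reader back even when the loop broke with an error.
        let (_reader, res) = run(vec![Err(QueryError("connection reset".into()))]);
        assert!(res.is_err());
    }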
@@ -1095,7 +1106,10 @@ impl PageServerHandler {
         request_span: Span,
         pipelining_config: PageServicePipeliningConfig,
         ctx: &RequestContext,
-    ) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
+    ) -> (
+        (PostgresBackendReader<IO>, TimelineHandles),
+        Result<(), QueryError>,
+    )
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
@@ -1152,10 +1166,7 @@ impl PageServerHandler {
         // the `handle_*` function will fail with an error that bubbles up and results in
         // the Executor stage exiting with Err(QueryError::Shutdown).

-        let PageServicePipeliningConfig {
-            max_batch_size,
-            protocol_pipelining_mode,
-        } = pipelining_config;
+        let PageServicePipeliningConfig { max_batch_size } = pipelining_config;

         // Cancellation root for the pipeline.
         // If any one stage exits, this gets cancelled.
@@ -1164,16 +1175,14 @@ impl PageServerHandler {
         // Macro to _define_ a pipeline stage.
         macro_rules! pipeline_stage {
             ($name:literal, $make_fut:expr) => {{
-                // Give each stage a child token to avoid lock contention in `tasks` mode.
-                let stage_fut = $make_fut(cancel.child_token());
-                // Cancel the pipeline if the stage exits with an error.
-                // If it exits cleanly, the cancellation should just bubble through the pipeline.
-                let cancel_pipeline = cancel.clone().drop_guard();
+                let stage_fut = $make_fut;
+                let cancel = cancel.clone();
                 async move {
                     scopeguard::defer! {
                         debug!("exiting");
                     }
-                    stage_fut.await
+                    timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
+                        .await
                 }
                 .instrument(tracing::info_span!($name))
             }};
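`timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)` suggests a wrapper that keeps driving a stage's future to completion but starts complaining once cancellation has fired and the stage overstays the grace period. The real helper lives elsewhere in this tree; the following is only a plausible sketch of such a combinator, assuming tokio and tokio-util:

    use std::time::Duration;
    use tokio_util::sync::CancellationToken;
    use tracing::warn;

    async fn timed_after_cancellation<F: std::future::Future>(
        fut: F,
        name: &str,
        grace: Duration,
        cancel: &CancellationToken,
    ) -> F::Output {
        tokio::pin!(fut);
        // Drive the future; if cancellation wins the race, fall through and
        // keep polling it, now with a grace timer running.
        tokio::select! {
            out = &mut fut => return out,
            _ = cancel.cancelled() => {}
        }
        let cancelled_at = tokio::time::Instant::now();
        let mut warned = false;
        loop {
            tokio::select! {
                out = &mut fut => return out,
                _ = tokio::time::sleep_until(cancelled_at + grace), if !warned => {
                    warned = true;
                    warn!(stage = name, ?grace, "stage still running after cancellation");
                }
            }
        }
    }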
@@ -1184,12 +1193,14 @@ impl PageServerHandler {
         //

         let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
-        let read_messages = pipeline_stage!("read_messages", move |cancel| {
+        let read_messages = pipeline_stage!("read_messages", {
+            let cancel = cancel.clone();
             let ctx = ctx.attached_child();
             async move {
                 let mut pgb_reader = pgb_reader;
-                loop {
-                    let msg = Self::pagestream_read_message(
+                let mut exit = false;
+                while !exit {
+                    let res = Self::pagestream_read_message(
                         &mut pgb_reader,
                         tenant_id,
                         timeline_id,
@@ -1198,15 +1209,9 @@ impl PageServerHandler {
                         &ctx,
                         request_span.clone(),
                     )
-                    .await?;
-                    let msg = match msg {
-                        Some(msg) => msg,
-                        None => {
-                            debug!("pagestream subprotocol end observed");
-                            break;
-                        }
-                    };
-                    match requests_tx.send(msg).await {
+                    .await;
+                    exit |= res.is_err();
+                    match requests_tx.send(res).await {
                         Ok(()) => {}
                         Err(tokio::sync::mpsc::error::SendError(_)) => {
                             debug!("downstream is gone");
@@ -1214,10 +1219,7 @@ impl PageServerHandler {
                     }
                 }
             }
-            // Make downstream exit after we exit.
-            // Explicit drop here is for robustness in future refactors.
-            drop(requests_tx);
-            Ok((pgb_reader, timeline_handles))
+            (pgb_reader, timeline_handles)
         }
     });

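The read stage no longer `?`-exits on a failed read: it forwards the entire `Result` into the channel and flips `exit` so the loop winds down after the error has been handed off. The same shape in a self-contained toy, using plain tokio mpsc and made-up message types (not the pageserver's):

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Result<u32, String>>(1);

        let producer = tokio::spawn(async move {
            let mut inputs = vec![Ok(1), Ok(2), Err("read failed".to_string())].into_iter();
            let mut exit = false;
            while !exit {
                let Some(res) = inputs.next() else { break };
                // Stop reading after forwarding an Err; the consumer decides
                // how to surface it.
                exit |= res.is_err();
                if tx.send(res).await.is_err() {
                    break; // downstream is gone
                }
            }
        });

        while let Some(res) = rx.recv().await {
            match res {
                Ok(v) => println!("handled {v}"),
                Err(e) => {
                    println!("read error surfaced in the consumer: {e}");
                    break;
                }
            }
        }
        producer.await.unwrap();
    }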
@@ -1225,18 +1227,43 @@ impl PageServerHandler {
         // Create Batching future.
         //

-        let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
-        let batcher = pipeline_stage!("batcher", move |_cancel| async move {
-            loop {
+        enum Batch {
+            Request(Box<BatchedFeMessage>),
+            ReadError(QueryError),
+        }
+        let (mut batch_tx, mut batch_rx) = spsc_fold::channel::<Batch>();
+        let batcher = pipeline_stage!("batcher", async move {
+            let mut exit = false;
+            while !exit {
                 let maybe_req = requests_rx.recv().await;
-                let Some(req) = maybe_req else {
+                let Some(read_res) = maybe_req else {
                     debug!("upstream is gone");
                     break;
                 };
-                let send_res = batch_tx
-                    .send(req, |batch, req| {
-                        Self::pagestream_do_batch(max_batch_size, batch, req)
-                    })
-                    .await;
+                let send_res = match read_res {
+                    Ok(None) => {
+                        debug!("upstream end of sub-protocol");
+                        break;
+                    }
+                    Ok(Some(req)) => {
+                        batch_tx
+                            .send(Batch::Request(req), |batch, req| match (batch, req) {
+                                (Batch::Request(ref mut batch), Batch::Request(req)) => {
+                                    Self::pagestream_do_batch(max_batch_size, batch, req)
+                                        .map_err(|req| Batch::Request(req))
+                                }
+                                (Batch::Request(_), x @ Batch::ReadError(_)) => Err(x),
+                                (Batch::ReadError(_), Batch::Request(_) | Batch::ReadError(_)) => {
+                                    unreachable!("we exit from batcher after storing a read error");
+                                }
+                            })
+                            .await
+                    }
+                    Err(e) => {
+                        exit = true;
+                        batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await
+                    }
+                };
                 match send_res {
                     Ok(()) => {}
                     Err(spsc_fold::SendError::ReceiverGone) => {
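`spsc_fold` is an internal single-producer/single-consumer channel whose `send(value, fold)` folds the new value into a value the consumer has not yet received: the closure returns `Ok(())` when it merged, or `Err(value)` to keep the new value pending on its own. Assuming those semantics (inferred from this diff, not from the spsc_fold source), the merging rules for the new `Batch` enum look like this in isolation, with simplified stand-in types:

    // Stand-ins: requests are batches of strings, errors are strings, and the
    // batch-size check replaces Self::pagestream_do_batch.
    #[derive(Debug)]
    enum Batch {
        Request(Vec<String>),
        ReadError(String),
    }

    const MAX_BATCH_SIZE: usize = 4;

    // Ok(()) = absorbed into the pending batch; Err(x) = x becomes the next pending value.
    fn fold(pending: &mut Batch, next: Batch) -> Result<(), Batch> {
        match (pending, next) {
            (Batch::Request(batch), Batch::Request(reqs))
                if batch.len() + reqs.len() <= MAX_BATCH_SIZE =>
            {
                batch.extend(reqs);
                Ok(())
            }
            // Pending batch is full: hand the requests back unmerged.
            (Batch::Request(_), x @ Batch::Request(_)) => Err(x),
            // A read error never merges into a request batch.
            (Batch::Request(_), x @ Batch::ReadError(_)) => Err(x),
            // The batcher exits right after storing a read error, so nothing
            // can be folded on top of one.
            (Batch::ReadError(_), _) => unreachable!("batcher exits after a read error"),
        }
    }

    fn main() {
        let mut pending = Batch::Request(vec!["a".into()]);
        assert!(fold(&mut pending, Batch::Request(vec!["b".into()])).is_ok());
        assert!(fold(&mut pending, Batch::ReadError("boom".into())).is_err());
    }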
@@ -1245,88 +1272,52 @@ impl PageServerHandler {
                     }
                 }
             }
-            // Make downstream exit after we exit.
-            // Explicit drop here is for robustness in future refactors.
-            drop(batch_tx);
         });

         //
         // Create Executor future.
         //

-        let executor = pipeline_stage!("executor", |cancel| {
+        let executor = pipeline_stage!("executor", {
+            let cancel = cancel.clone();
             let ctx = ctx.attached_child();
             async move {
-                scopeguard::defer! {
-                    cancel.cancel();
-                };
                 loop {
                     let maybe_batch = batch_rx.recv().await;
                     let batch = match maybe_batch {
                         Ok(batch) => batch,
                         Err(spsc_fold::RecvError::SenderGone) => {
                             debug!("upstream gone");
-                            break;
+                            return Ok(());
                         }
                     };
-                    self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
-                        .await?;
+                    match batch {
+                        Batch::Request(batch) => {
+                            self.pagesteam_handle_batched_message(
+                                pgb_writer, *batch, &cancel, &ctx,
+                            )
+                            .await?;
+                        }
+                        Batch::ReadError(e) => {
+                            return Err(e);
+                        }
+                    }
                 }
-                // Make upstreams exit after we exit.
-                // Explicit drop here is for robustness in future refactors.
-                drop(batch_rx);
-                Ok(())
             }
         });

         //
         // Execute the stages.
         //
-        // We can either run the pipeline as concurrent futures or we can
-        // run it in separate tokio tasks.
         //
         // In any way, we wait for all stages to exit.
         //
         // See the top of this function for why all stages exit quickly
         // if one of them does.

         let read_messages_res;
-        let batcher_res;
-        let executor_res;
-        match protocol_pipelining_mode {
-            PageServiceProtocolPipeliningMode::ConcurrentFutures => {
-                (read_messages_res, batcher_res, executor_res) =
-                    tokio::join!(read_messages, batcher, executor)
-            }
-            PageServiceProtocolPipeliningMode::Tasks => {
-                // We must run all tasks to completion and not panic; otherwise we leak the tasks.
-                let read_messages_task = tokio::task::spawn(read_messages);
-                let batcher_task = tokio::task::spawn(batcher);
-                let executor_task = tokio::task::spawn(executor);
-                let read_messages_task_res;
-                let batcher_task_res;
-                let executor_task_res;
-                (read_messages_task_res, batcher_task_res, executor_task_res) =
-                    tokio::join!(read_messages_task, batcher_task, executor_task);
-                read_messages_res = read_messages_task_res
-                    .context("read_messages task panicked, check logs for details")?;
-                let _: () =
-                    batcher_task_res.context("batcher task panicked, check logs for details")?;
-            }
-        }
+        let executor_res: Result<(), QueryError>;
+        (read_messages_res, (), executor_res) = tokio::join!(read_messages, batcher, executor);

-        if let Err(batcher_err) = batcher_res {
-            warn!(error=?batcher_err, "batcher exited with error, this is unexpected");
-        }
-
-        match (read_messages_res, executor_res) {
-            (Err(e), _) => {
-                let e: QueryError = e;
-                Err(e) //
-            }
-            (_, Err(e)) => {
-                let e: QueryError = e;
-                Err(e)
-            }
-            (Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)),
-        }
+        (read_messages_res, executor_res)
     }

     /// Helper function to handle the LSN from client request.

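With the `tasks` mode gone, the three stages always run as concurrent futures on the handler's own task via `tokio::join!`. That is what makes the removed machinery unnecessary: joined futures may borrow local state, a panic propagates to the caller instead of being boxed into a `JoinError`, and there are no spawned tasks to leak or to guard with the (also removed) `gate`. A toy two-stage version of the same pattern, with made-up message types:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Result<u32, String>>(1);

        let reader = async move {
            for res in [Ok(1), Ok(2), Err("boom".to_string())] {
                let stop = res.is_err();
                if tx.send(res).await.is_err() || stop {
                    break;
                }
            }
            "reader-owned resources handed back"
        };

        let executor = async move {
            while let Some(res) = rx.recv().await {
                res?; // the first forwarded Err terminates the executor with it
            }
            Ok::<(), String>(())
        };

        // Both futures run to completion on this task; errors come back as
        // values in the tuple, mirroring (read_messages_res, executor_res).
        let (reader_res, executor_res) = tokio::join!(reader, executor);
        println!("{reader_res}; executor: {executor_res:?}");
    }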
@@ -17,20 +17,16 @@ TARGET_RUNTIME = 30
 @dataclass
 class PageServicePipeliningConfig:
     max_batch_size: int
-    protocol_pipelining_mode: str


-PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
-
 NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
 for max_batch_size in [1, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+    NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size))

 BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
 for max_batch_size in [1, 2, 4, 8, 16, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+    BATCHABLE.append(PageServicePipeliningConfig(max_batch_size))


 @pytest.mark.parametrize(