From 07358dea89ecf827b4852243bee6b2b1b7874983 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Thu, 28 Nov 2024 20:06:15 +0100
Subject: [PATCH] converge on approach that pushes read Result through
 pipeline

---
 libs/pageserver_api/src/config.rs             |   9 -
 pageserver/src/page_service.rs                | 201 +++++++++---------
 .../pageserver/test_page_service_batching.py  |   8 +-
 3 files changed, 98 insertions(+), 120 deletions(-)

diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index a0a6dedcdd..bc24cdedad 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -131,14 +131,6 @@ pub struct DiskUsageEvictionTaskConfig {
 pub struct PageServicePipeliningConfig {
     /// Causes runtime errors if larger than max get_vectored batch size.
     pub max_batch_size: NonZeroUsize,
-    pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-#[serde(rename_all = "kebab-case")]
-pub enum PageServiceProtocolPipeliningMode {
-    ConcurrentFutures,
-    Tasks,
 }
 
 pub mod statvfs {
@@ -417,7 +409,6 @@ impl Default for ConfigToml {
             no_sync: None,
             page_service_pipelining: Some(PageServicePipeliningConfig {
                 max_batch_size: NonZeroUsize::new(32).unwrap(),
-                protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures,
             }),
         }
     }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index bfe84802a2..d188a64464 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -7,7 +7,7 @@ use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
-use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
+use pageserver_api::config::PageServicePipeliningConfig;
 use pageserver_api::models::{self, TenantState};
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -316,7 +316,6 @@ struct PageServerHandler {
     connection_ctx: RequestContext,
     cancel: CancellationToken,
 
-    gate: utils::sync::gate::Gate,
 
     /// None only while pagestream protocol is being processed.
     timeline_handles: Option<TimelineHandles>,
@@ -582,7 +581,6 @@ impl PageServerHandler {
             connection_ctx,
             timeline_handles: Some(TimelineHandles::new(tenant_manager)),
             cancel,
-            gate: Default::default(),
             pipelining_config,
         }
     }
@@ -1004,7 +1002,7 @@
             .expect("implementation error: timeline_handles should not be locked");
 
         let request_span = info_span!("request", shard_id = tracing::field::Empty);
-        let (pgb_reader, timeline_handles) = match self.pipelining_config.clone() {
+        let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
             Some(pipelining_config) => {
                 self.handle_pagerequests_pipelined(
                     pgb,
@@ -1030,7 +1028,7 @@
                 )
                 .await
             }
-        }?;
+        };
 
         debug!("pagestream subprotocol shut down cleanly");
 
@@ -1040,7 +1038,7 @@
        let replaced = self.timeline_handles.replace(timeline_handles);
        assert!(replaced.is_none());
 
-        Ok(())
+        result
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -1053,12 +1051,15 @@
         mut timeline_handles: TimelineHandles,
         request_span: Span,
         ctx: &RequestContext,
-    ) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
+    ) -> (
+        (PostgresBackendReader<IO>, TimelineHandles),
+        Result<(), QueryError>,
+    )
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
         let cancel = self.cancel.clone();
-        loop {
+        let err = loop {
             let msg = Self::pagestream_read_message(
                 &mut pgb_reader,
                 tenant_id,
                 timeline_id,
                 &mut timeline_handles,
                 &cancel,
                 ctx,
                 request_span.clone(),
             )
-            .await?;
+            .await;
+            let msg = match msg {
+                Ok(msg) => msg,
+                Err(e) => break e,
+            };
             let msg = match msg {
                 Some(msg) => msg,
                 None => {
                     debug!("pagestream subprotocol end observed");
-                    return Ok((pgb_reader, timeline_handles));
+                    return ((pgb_reader, timeline_handles), Ok(()));
                 }
             };
-            self.pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
-                .await?;
-        }
+            let err = self
+                .pagesteam_handle_batched_message(pgb_writer, *msg, &cancel, ctx)
+                .await;
+            match err {
+                Ok(()) => {}
+                Err(e) => break e,
+            }
+        };
+        ((pgb_reader, timeline_handles), Err(err))
     }
 
     /// # Cancel-Safety
@@ -1095,7 +1106,10 @@
         request_span: Span,
         pipelining_config: PageServicePipeliningConfig,
         ctx: &RequestContext,
-    ) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
+    ) -> (
+        (PostgresBackendReader<IO>, TimelineHandles),
+        Result<(), QueryError>,
+    )
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
@@ -1152,10 +1166,7 @@
         // the `handle_*` function will fail with an error that bubbles up and results in
         // the Executor stage exiting with Err(QueryError::Shutdown).
 
-        let PageServicePipeliningConfig {
-            max_batch_size,
-            protocol_pipelining_mode,
-        } = pipelining_config;
+        let PageServicePipeliningConfig { max_batch_size } = pipelining_config;
 
         // Cancellation root for the pipeline.
         // If any one stage exits, this gets cancelled.
@@ -1164,16 +1175,14 @@
         // Macro to _define_ a pipeline stage.
         macro_rules! pipeline_stage {
             ($name:literal, $make_fut:expr) => {{
-                // Give each stage a child token to avoid lock contention in `tasks` mode.
-                let stage_fut = $make_fut(cancel.child_token());
-                // Cancel the pipeline if the stage exits with an error.
-                // If it exits cleanly, the cancellation should just bubble through the pipeline.
-                let cancel_pipeline = cancel.clone().drop_guard();
+                let stage_fut = $make_fut;
+                let cancel = cancel.clone();
                 async move {
                     scopeguard::defer! {
                         debug!("exiting");
                     }
-                    stage_fut.await
+                    timed_after_cancellation(stage_fut, $name, Duration::from_millis(100), &cancel)
+                        .await
                 }
                 .instrument(tracing::info_span!($name))
             }};
@@ -1184,12 +1193,14 @@
         //
         let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
 
-        let read_messages = pipeline_stage!("read_messages", move |cancel| {
+        let read_messages = pipeline_stage!("read_messages", {
+            let cancel = cancel.clone();
             let ctx = ctx.attached_child();
             async move {
                 let mut pgb_reader = pgb_reader;
-                loop {
-                    let msg = Self::pagestream_read_message(
+                let mut exit = false;
+                while !exit {
+                    let res = Self::pagestream_read_message(
                         &mut pgb_reader,
                         tenant_id,
                         timeline_id,
@@ -1198,15 +1209,9 @@
                         &ctx,
                         request_span.clone(),
                     )
-                    .await?;
-                    let msg = match msg {
-                        Some(msg) => msg,
-                        None => {
-                            debug!("pagestream subprotocol end observed");
-                            break;
-                        }
-                    };
-                    match requests_tx.send(msg).await {
+                    .await;
+                    exit |= res.is_err();
+                    match requests_tx.send(res).await {
                         Ok(()) => {}
                         Err(tokio::sync::mpsc::error::SendError(_)) => {
                             debug!("downstream is gone");
@@ -1214,10 +1219,7 @@
                         }
                     }
                 }
-                // Make downstream exit after we exit.
-                // Explicit drop here is for robustness in future refactors.
-                drop(requests_tx);
-                Ok((pgb_reader, timeline_handles))
+                (pgb_reader, timeline_handles)
             }
         });
 
@@ -1225,18 +1227,43 @@
         //
         // Create Batching future.
         //
-        let (mut batch_tx, mut batch_rx) = spsc_fold::channel();
-        let batcher = pipeline_stage!("batcher", move |_cancel| async move {
-            loop {
+        enum Batch {
+            Request(Box<BatchedFeMessage>),
+            ReadError(QueryError),
+        }
+        let (mut batch_tx, mut batch_rx) = spsc_fold::channel::<Batch>();
+        let batcher = pipeline_stage!("batcher", async move {
+            let mut exit = false;
+            while !exit {
                 let maybe_req = requests_rx.recv().await;
-                let Some(req) = maybe_req else {
+                let Some(read_res) = maybe_req else {
+                    debug!("upstream is gone");
                     break;
                 };
-                let send_res = batch_tx
-                    .send(req, |batch, req| {
-                        Self::pagestream_do_batch(max_batch_size, batch, req)
-                    })
-                    .await;
+                let send_res = match read_res {
+                    Ok(None) => {
+                        debug!("upstream end of sub-protocol");
+                        break;
+                    }
+                    Ok(Some(req)) => {
+                        batch_tx
+                            .send(Batch::Request(req), |batch, req| match (batch, req) {
+                                (Batch::Request(ref mut batch), Batch::Request(req)) => {
+                                    Self::pagestream_do_batch(max_batch_size, batch, req)
+                                        .map_err(|req| Batch::Request(req))
+                                }
+                                (Batch::Request(_), x @ Batch::ReadError(_)) => Err(x),
+                                (Batch::ReadError(_), Batch::Request(_) | Batch::ReadError(_)) => {
+                                    unreachable!("we exit from batcher after storing a read error");
+                                }
+                            })
+                            .await
+                    }
+                    Err(e) => {
+                        exit = true;
+                        batch_tx.send(Batch::ReadError(e), |_, req| Err(req)).await
+                    }
+                };
                 match send_res {
                     Ok(()) => {}
                     Err(spsc_fold::SendError::ReceiverGone) => {
@@ -1245,88 +1272,52 @@
                         }
                     }
                 }
-            // Make downstream exit after we exit.
-            // Explicit drop here is for robustness in future refactors.
-            drop(batch_tx);
         });
 
         //
         // Create Executor future.
         //
-        let executor = pipeline_stage!("executor", |cancel| {
+        let executor = pipeline_stage!("executor", {
+            let cancel = cancel.clone();
             let ctx = ctx.attached_child();
             async move {
+                scopeguard::defer! {
+                    cancel.cancel();
+                };
                 loop {
                     let maybe_batch = batch_rx.recv().await;
                     let batch = match maybe_batch {
                         Ok(batch) => batch,
                         Err(spsc_fold::RecvError::SenderGone) => {
                             debug!("upstream gone");
-                            break;
+                            return Ok(());
                         }
                     };
-                    self.pagesteam_handle_batched_message(pgb_writer, *batch, &cancel, &ctx)
-                        .await?;
+                    match batch {
+                        Batch::Request(batch) => {
+                            self.pagesteam_handle_batched_message(
+                                pgb_writer, *batch, &cancel, &ctx,
+                            )
+                            .await?;
+                        }
+                        Batch::ReadError(e) => {
+                            return Err(e);
+                        }
+                    }
                 }
-                // Make upstreams exit after we exit.
-                // Explicit drop here is for robustness in future refactors.
-                drop(batch_rx);
-                Ok(())
             }
         });
 
         //
        // Execute the stages.
         //
-        // We can either run the pipeline as concurrent futures or we can
-        // run it in separate tokio tasks.
-        //
-        // In any way, we wait for all stages to exit.
-        //
-        // See the top of this function for why all stages exit quickly
-        // if one of them does.
 
         let read_messages_res;
-        let batcher_res;
-        let executor_res;
-        match protocol_pipelining_mode {
-            PageServiceProtocolPipeliningMode::ConcurrentFutures => {
-                (read_messages_res, batcher_res, executor_res) =
-                    tokio::join!(read_messages, batcher, executor)
-            }
-            PageServiceProtocolPipeliningMode::Tasks => {
-                // We must run all tasks to completion and not panic; otherwise we leak the tasks.
-                let read_messages_task = tokio::task::spawn(read_messages);
-                let batcher_task = tokio::task::spawn(batcher);
-                let executor_task = tokio::task::spawn(executor);
-                let read_messages_task_res;
-                let batcher_task_res;
-                let executor_task_res;
-                (read_messages_task_res, batcher_task_res, executor_task_res) =
-                    tokio::join!(read_messages_task, batcher_task, executor_task);
-                read_messages_res = read_messages_task_res
-                    .context("read_messages task panicked, check logs for details")?;
-                let _: () =
-                    batcher_task_res.context("batcher task panicked, check logs for details")?;
-                executor_res = executor_task_res
-                    .context("executor task panicked, check logs for details")?;
-            }
-        }
+        let executor_res: Result<(), QueryError>;
+        (read_messages_res, (), executor_res) = tokio::join!(read_messages, batcher, executor);
 
-        if let Err(batcher_err) = batcher_res {
-            warn!(error=?batcher_err, "batcher exited with error, this is unexpected");
-        }
-
-        match (read_messages_res, executor_res) {
-            (Err(e), _) => {
-                let e: QueryError = e;
-                Err(e) //
-            }
-            (_, Err(e)) => {
-                let e: QueryError = e;
-                Err(e)
-            }
-            (Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)),
-        }
+        (read_messages_res, executor_res)
     }
 
     /// Helper function to handle the LSN from client request.
diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py
index 669ce32d57..46fa65db5d 100644
--- a/test_runner/performance/pageserver/test_page_service_batching.py
+++ b/test_runner/performance/pageserver/test_page_service_batching.py
@@ -17,20 +17,16 @@ TARGET_RUNTIME = 30
 @dataclass
 class PageServicePipeliningConfig:
     max_batch_size: int
-    protocol_pipelining_mode: str
 
 
-PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
 
 NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
 for max_batch_size in [1, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+    NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size))
 
 BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
 for max_batch_size in [1, 2, 4, 8, 16, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+    BATCHABLE.append(PageServicePipeliningConfig(max_batch_size))
 
 
 @pytest.mark.parametrize(
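
Reviewer note, not part of the patch: the standalone sketch below distills the pattern this commit converges on. Instead of short-circuiting with `?` inside the read loop, the reader stage forwards every read `Result` through the channel, so the executor observes a read error strictly after all requests that were successfully read before it, and the error becomes the pipeline's result. All names in the sketch (`Req`, `ReadError`, `reader`, `executor`) are invented for illustration; the real pipeline uses `pagestream_read_message`, an `spsc_fold`-based batcher stage in between, and `QueryError`.

// Standalone sketch; requires tokio with the "rt-multi-thread", "macros",
// and "sync" features. All types and functions here are illustrative only.

use tokio::sync::mpsc;

#[derive(Debug)]
struct Req(u32);

#[derive(Debug)]
struct ReadError(String);

// Reader stage: forwards every read result, including the terminal error,
// then exits. The error travels through the same channel as the requests,
// so downstream sees it in-order, after all successfully read requests.
async fn reader(tx: mpsc::Sender<Result<Option<Req>, ReadError>>) {
    let inputs = vec![
        Ok(Some(Req(1))),
        Ok(Some(Req(2))),
        Err(ReadError("connection reset".into())),
    ];
    let mut exit = false;
    for res in inputs {
        // Mirror the patch: remember that we must stop reading, but still
        // forward the error downstream instead of short-circuiting with `?`.
        exit |= res.is_err();
        if tx.send(res).await.is_err() {
            break; // downstream is gone
        }
        if exit {
            break;
        }
    }
}

// Executor stage: executes requests until it sees Ok(None) (clean end of
// sub-protocol) or Err (the read error, which becomes the pipeline result).
async fn executor(
    mut rx: mpsc::Receiver<Result<Option<Req>, ReadError>>,
) -> Result<(), ReadError> {
    while let Some(res) = rx.recv().await {
        match res {
            Ok(Some(req)) => println!("executing {req:?}"),
            Ok(None) => return Ok(()),
            Err(e) => return Err(e),
        }
    }
    Ok(()) // sender dropped without an error
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(1);
    // Run both stages concurrently; the executor's result is the result of
    // the whole pipeline, analogous to (read_messages_res, executor_res).
    let ((), result) = tokio::join!(reader(tx), executor(rx));
    println!("pipeline result: {result:?}");
}

Running it prints "executing Req(1)", "executing Req(2)", then "pipeline result: Err(ReadError(\"connection reset\"))": the error is delivered in-order after the requests read before it, which is the behavior the patch establishes.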