From a2a36131858f434add4ac533303c976e48f2e7a7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 28 Nov 2024 20:50:06 +0100 Subject: [PATCH] reintroduce task-based execution --- libs/pageserver_api/src/config.rs | 9 +++++++ pageserver/src/page_service.rs | 27 ++++++++++++++++--- .../pageserver/test_page_service_batching.py | 14 +++++++--- 3 files changed, 44 insertions(+), 6 deletions(-) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index bc24cdedad..a0a6dedcdd 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -131,6 +131,14 @@ pub struct DiskUsageEvictionTaskConfig { pub struct PageServicePipeliningConfig { /// Causes runtime errors if larger than max get_vectored batch size. pub max_batch_size: NonZeroUsize, + pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum PageServiceProtocolPipeliningMode { + ConcurrentFutures, + Tasks, } pub mod statvfs { @@ -409,6 +417,7 @@ impl Default for ConfigToml { no_sync: None, page_service_pipelining: Some(PageServicePipeliningConfig { max_batch_size: NonZeroUsize::new(32).unwrap(), + protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures, }), } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index eaba291e49..e33d2c22d4 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -7,7 +7,7 @@ use bytes::Buf; use futures::FutureExt; use itertools::Itertools; use once_cell::sync::OnceCell; -use pageserver_api::config::PageServicePipeliningConfig; +use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode}; use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -1166,7 +1166,10 @@ impl PageServerHandler { // the `handle_*` function will fail with an error that bubbles up and results in // the Executor stage exiting with Err(QueryError::Shutdown). - let PageServicePipeliningConfig { max_batch_size } = pipelining_config; + let PageServicePipeliningConfig { + max_batch_size, + protocol_pipelining_mode, + } = pipelining_config; // Cancellation root for the pipeline. // If any one stage exits, this gets cancelled. @@ -1314,8 +1317,26 @@ impl PageServerHandler { // let read_messages_res; + let _batcher_res: (); let executor_res: Result<(), QueryError>; - (read_messages_res, (), executor_res) = tokio::join!(read_messages, batcher, executor); + match protocol_pipelining_mode { + PageServiceProtocolPipeliningMode::ConcurrentFutures => { + (read_messages_res, _batcher_res, executor_res) = + tokio::join!(read_messages, batcher, executor); + } + PageServiceProtocolPipeliningMode::Tasks => { + // These tasks are not tracked anywhere. + let read_messages_task = tokio::spawn(read_messages); + let batcher_task = tokio::spawn(batcher); + let (read_messages_task_res, batcher_task_res, executor_res_) = + tokio::join!(read_messages_task, batcher_task, executor,); + (read_messages_res, _batcher_res, executor_res) = ( + read_messages_task_res.expect("propagated panic from read_messages"), + batcher_task_res.expect("propagated panic from batcher"), + executor_res_, + ); + } + } (read_messages_res, executor_res) } diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py index 29d3f41179..669ce32d57 100644 --- a/test_runner/performance/pageserver/test_page_service_batching.py +++ b/test_runner/performance/pageserver/test_page_service_batching.py @@ -17,15 +17,20 @@ TARGET_RUNTIME = 30 @dataclass class PageServicePipeliningConfig: max_batch_size: int + protocol_pipelining_mode: str +PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"] + NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None] for max_batch_size in [1, 32]: - NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size)) + for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES: + NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)) BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None] for max_batch_size in [1, 2, 4, 8, 16, 32]: - BATCHABLE.append(PageServicePipeliningConfig(max_batch_size)) + for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES: + BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)) @pytest.mark.parametrize( @@ -252,7 +257,10 @@ def test_throughput( PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None] for max_batch_size in [1, 32]: - PRECISION_CONFIGS.append(PageServicePipeliningConfig(max_batch_size)) + for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES: + PRECISION_CONFIGS.append( + PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode) + ) @pytest.mark.parametrize(