diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index a0a6dedcdd..c831c84f17 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -109,7 +109,7 @@ pub struct ConfigToml {
     pub virtual_file_io_mode: Option,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub no_sync: Option<bool>,
-    pub page_service_pipelining: Option<PageServicePipeliningConfig>,
+    pub page_service_pipelining: PageServicePipeliningConfig,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -127,16 +127,23 @@ pub struct DiskUsageEvictionTaskConfig {
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(tag = "mode", rename_all = "kebab-case")]
 #[serde(deny_unknown_fields)]
-pub struct PageServicePipeliningConfig {
+pub enum PageServicePipeliningConfig {
+    Serial,
+    Pipelined(PageServicePipeliningConfigPipelined),
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct PageServicePipeliningConfigPipelined {
     /// Causes runtime errors if larger than max get_vectored batch size.
     pub max_batch_size: NonZeroUsize,
-    pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
+    pub execution: PageServiceProtocolPipelinedExecutionStrategy,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 #[serde(rename_all = "kebab-case")]
-pub enum PageServiceProtocolPipeliningMode {
+pub enum PageServiceProtocolPipelinedExecutionStrategy {
     ConcurrentFutures,
     Tasks,
 }
@@ -415,10 +422,12 @@ impl Default for ConfigToml {
             virtual_file_io_mode: None,
             tenant_config: TenantConfigToml::default(),
             no_sync: None,
-            page_service_pipelining: Some(PageServicePipeliningConfig {
-                max_batch_size: NonZeroUsize::new(32).unwrap(),
-                protocol_pipelining_mode: PageServiceProtocolPipeliningMode::ConcurrentFutures,
-            }),
+            page_service_pipelining: PageServicePipeliningConfig::Pipelined(
+                PageServicePipeliningConfigPipelined {
+                    max_batch_size: NonZeroUsize::new(32).unwrap(),
+                    execution: PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures,
+                },
+            ),
         }
     }
 }
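Review note on the new wire format: with `#[serde(tag = "mode", rename_all = "kebab-case")]`, the variant is selected by a `mode` key inside the `page_service_pipelining` table, so a `pageserver.toml` should spell the two cases as `page_service_pipelining = { mode = "serial" }` and `page_service_pipelining = { mode = "pipelined", max_batch_size = 32, execution = "concurrent-futures" }`. A minimal sketch of the tagging behavior, using a stand-alone mirror of the types above and `serde_json` purely for illustration (the `String` stand-in for the execution enum and the trimmed attributes are assumptions, not the pageserver code):

```rust
use std::num::NonZeroUsize;

use serde::{Deserialize, Serialize};

// Stand-alone mirror of the new config enum (attributes trimmed for brevity).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
enum PageServicePipeliningConfig {
    Serial,
    Pipelined(PageServicePipeliningConfigPipelined),
}

#[derive(Debug, Serialize, Deserialize)]
struct PageServicePipeliningConfigPipelined {
    max_batch_size: NonZeroUsize,
    execution: String, // stand-in for the execution-strategy enum
}

fn main() {
    // Unit variant: the representation is just the tag.
    let serial = serde_json::to_string(&PageServicePipeliningConfig::Serial).unwrap();
    assert_eq!(serial, r#"{"mode":"serial"}"#);

    // Newtype variant: the tag sits alongside the inner struct's fields.
    let pipelined = PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined {
        max_batch_size: NonZeroUsize::new(32).unwrap(),
        execution: "concurrent-futures".to_string(),
    });
    let json = serde_json::to_string(&pipelined).unwrap();
    assert_eq!(
        json,
        r#"{"mode":"pipelined","max_batch_size":32,"execution":"concurrent-futures"}"#
    );

    // Round-trip back through the tagged representation.
    let parsed: PageServicePipeliningConfig = serde_json::from_str(&json).unwrap();
    println!("{parsed:?}");
}
```

This is also what drives the test changes further down: the old `Option`/`None` encoding of "no pipelining" disappears in favor of an explicit `mode = "serial"` value.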
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 86c3621cf0..99c9590ffe 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -183,7 +183,7 @@ pub struct PageServerConf {
 
     /// Optionally disable disk syncs (unsafe!)
     pub no_sync: bool,
-    pub page_service_pipelining: Option<pageserver_api::config::PageServicePipeliningConfig>,
+    pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
 }
 
 /// Token for authentication to safekeepers
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 002ad9ddbd..8cb82d0449 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -7,7 +7,10 @@ use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
-use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
+use pageserver_api::config::{
+    PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
+    PageServiceProtocolPipelinedExecutionStrategy,
+};
 use pageserver_api::models::{self, TenantState};
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -159,7 +162,7 @@ pub async fn libpq_listener_main(
     auth: Option<Arc<SwappableJwtAuth>>,
     listener: tokio::net::TcpListener,
     auth_type: AuthType,
-    pipelining_config: Option<PageServicePipeliningConfig>,
+    pipelining_config: PageServicePipeliningConfig,
     listener_ctx: RequestContext,
     listener_cancel: CancellationToken,
 ) -> Connections {
@@ -218,7 +221,7 @@ async fn page_service_conn_main(
     auth: Option<Arc<SwappableJwtAuth>>,
     socket: tokio::net::TcpStream,
     auth_type: AuthType,
-    pipelining_config: Option<PageServicePipeliningConfig>,
+    pipelining_config: PageServicePipeliningConfig,
     connection_ctx: RequestContext,
     cancel: CancellationToken,
 ) -> ConnectionHandlerResult {
@@ -320,7 +323,7 @@ struct PageServerHandler {
     /// None only while pagestream protocol is being processed.
     timeline_handles: Option<TimelineHandles>,
 
-    pipelining_config: Option<PageServicePipeliningConfig>,
+    pipelining_config: PageServicePipeliningConfig,
 }
 
 struct TimelineHandles {
@@ -571,7 +574,7 @@ impl PageServerHandler {
     pub fn new(
         tenant_manager: Arc<TenantManager>,
         auth: Option<Arc<SwappableJwtAuth>>,
-        pipelining_config: Option<PageServicePipeliningConfig>,
+        pipelining_config: PageServicePipeliningConfig,
         connection_ctx: RequestContext,
         cancel: CancellationToken,
     ) -> Self {
@@ -1003,7 +1006,7 @@ impl PageServerHandler {
 
         let request_span = info_span!("request", shard_id = tracing::field::Empty);
         let ((pgb_reader, timeline_handles), result) = match self.pipelining_config.clone() {
-            Some(pipelining_config) => {
+            PageServicePipeliningConfig::Pipelined(pipelining_config) => {
                 self.handle_pagerequests_pipelined(
                     pgb,
                     pgb_reader,
@@ -1016,7 +1019,7 @@ impl PageServerHandler {
                 )
                 .await
             }
-            None => {
+            PageServicePipeliningConfig::Serial => {
                 self.handle_pagerequests_serial(
                     pgb,
                     pgb_reader,
@@ -1104,7 +1107,7 @@ impl PageServerHandler {
         timeline_id: TimelineId,
         mut timeline_handles: TimelineHandles,
         request_span: Span,
-        pipelining_config: PageServicePipeliningConfig,
+        pipelining_config: PageServicePipeliningConfigPipelined,
         ctx: &RequestContext,
     ) -> (
         (PostgresBackendReader<IO>, TimelineHandles),
@@ -1162,9 +1165,9 @@ impl PageServerHandler {
         // the batch that was in flight when the Batcher encountered an error,
         // thereby behaving identically to a serial implementation.
 
-        let PageServicePipeliningConfig {
+        let PageServicePipeliningConfigPipelined {
             max_batch_size,
-            protocol_pipelining_mode,
+            execution,
         } = pipelining_config;
 
         // Macro to _define_ a pipeline stage.
@@ -1285,11 +1288,11 @@ impl PageServerHandler {
 
         // Execute the stages.
         //
-        match protocol_pipelining_mode {
-            PageServiceProtocolPipeliningMode::ConcurrentFutures => {
+        match execution {
+            PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => {
                 tokio::join!(read_messages, executor)
             }
-            PageServiceProtocolPipeliningMode::Tasks => {
+            PageServiceProtocolPipelinedExecutionStrategy::Tasks => {
                 // These tasks are not tracked anywhere.
                 let read_messages_task = tokio::spawn(read_messages);
                 let (read_messages_task_res, executor_res_) =
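On the `execution` match above, here is a rough, self-contained sketch of what the two strategies mean operationally. The stage bodies are placeholders rather than the actual batcher/executor, and the real code joins the spawned work slightly differently (the hunk is truncated); this only assumes the `tokio` crate:

```rust
use std::time::Duration;

#[derive(Clone, Copy)]
enum ExecutionStrategy {
    ConcurrentFutures,
    Tasks,
}

// Placeholder pipeline stages; in the diff these are the batcher and executor.
async fn read_messages() {
    tokio::time::sleep(Duration::from_millis(10)).await;
}

async fn executor() {
    tokio::time::sleep(Duration::from_millis(10)).await;
}

#[tokio::main]
async fn main() {
    for strategy in [ExecutionStrategy::ConcurrentFutures, ExecutionStrategy::Tasks] {
        match strategy {
            // One task polls both futures: the stages interleave cooperatively
            // and occupy at most one runtime worker at any given moment.
            ExecutionStrategy::ConcurrentFutures => {
                tokio::join!(read_messages(), executor());
            }
            // Each stage becomes its own task, so the runtime is free to run
            // them in parallel on different worker threads.
            ExecutionStrategy::Tasks => {
                let reader = tokio::spawn(read_messages());
                let exec = tokio::spawn(executor());
                let (r, e) = tokio::join!(reader, exec);
                r.expect("reader task panicked");
                e.expect("executor task panicked");
            }
        }
    }
}
```

In short, `concurrent-futures` keeps both stages on one task (no cross-task wakeups, no parallelism), while `tasks` buys potential parallelism at the cost of spawning the untracked tasks the comment above calls out.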
diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py
index 669ce32d57..8f38228d42 100644
--- a/test_runner/performance/pageserver/test_page_service_batching.py
+++ b/test_runner/performance/pageserver/test_page_service_batching.py
@@ -3,7 +3,7 @@ import json
 import time
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Union
 
 import pytest
 from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -16,21 +16,31 @@ TARGET_RUNTIME = 30
 
 
 @dataclass
 class PageServicePipeliningConfig:
+    pass
+
+@dataclass
+class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
+    mode: str = "serial"
+
+
+@dataclass
+class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
     max_batch_size: int
-    protocol_pipelining_mode: str
+    execution: str
+    mode: str = "pipelined"
 
 
-PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
+EXECUTION = ["concurrent-futures", "tasks"]
 
-NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
+NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
 for max_batch_size in [1, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+    for execution in EXECUTION:
+        NON_BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
-BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
+BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
 for max_batch_size in [1, 2, 4, 8, 16, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+    for execution in EXECUTION:
+        BATCHABLE.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
 
 @pytest.mark.parametrize(
@@ -45,7 +55,7 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
             TARGET_RUNTIME,
             1,
             128,
-            f"not batchable {dataclasses.asdict(config) if config else None}",
+            f"not batchable {dataclasses.asdict(config)}",
         )
         for config in NON_BATCHABLE
     ],
@@ -57,7 +67,7 @@ for max_batch_size in [1, 2, 4, 8, 16, 32]:
            TARGET_RUNTIME,
             100,
             128,
-            f"batchable {dataclasses.asdict(config) if config else None}",
+            f"batchable {dataclasses.asdict(config)}",
         )
         for config in BATCHABLE
     ],
@@ -67,7 +77,7 @@ def test_throughput(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     tablesize_mib: int,
-    pipelining_config: None | PageServicePipeliningConfig,
+    pipelining_config: PageServicePipeliningConfig,
     target_runtime: int,
     effective_io_concurrency: int,
     readhead_buffer_size: int,
@@ -99,20 +109,18 @@ def test_throughput(
     params.update(
         {
             "tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
-            "pipelining_enabled": (1 if pipelining_config else 0, {}),
             # target_runtime is just a polite ask to the workload to run for this long
             "effective_io_concurrency": (effective_io_concurrency, {}),
             "readhead_buffer_size": (readhead_buffer_size, {}),
             # name is not a metric, we just use it to identify the test easily in the `test_...[...]` notation
         }
     )
 
-    if pipelining_config:
-        params.update(
-            {
-                f"pipelining_config.{k}": (v, {})
-                for k, v in dataclasses.asdict(pipelining_config).items()
-            }
-        )
+    params.update(
+        {
+            f"pipelining_config.{k}": (v, {})
+            for k, v in dataclasses.asdict(pipelining_config).items()
+        }
+    )
 
     log.info("params: %s", params)
@@ -224,8 +232,6 @@ def test_throughput(
 
     env.pageserver.patch_config_toml_nonrecursive(
         {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
-        if pipelining_config is not None
-        else {}
     )
     env.pageserver.restart()
     metrics = workload()
@@ -255,23 +261,21 @@ def test_throughput(
     )
 
 
-PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None]
+PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
 for max_batch_size in [1, 32]:
-    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
-        PRECISION_CONFIGS.append(
-            PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)
-        )
+    for execution in EXECUTION:
+        PRECISION_CONFIGS.append(PageServicePipeliningConfigPipelined(max_batch_size, execution))
 
 
 @pytest.mark.parametrize(
     "pipelining_config,name",
-    [(config, f"{dataclasses.asdict(config) if config else None}") for config in PRECISION_CONFIGS],
+    [(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
 )
 def test_latency(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     pg_bin: PgBin,
-    pipelining_config: Optional[PageServicePipeliningConfig],
+    pipelining_config: PageServicePipeliningConfig,
     name: str,
 ):
     """
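One cross-check on the test side, as an editor's sketch in plain Python mirroring the dataclasses added above: `dataclasses.asdict` on the new classes yields exactly the `mode`-tagged dicts that `patch_config_toml_nonrecursive` writes under `page_service_pipelining`, matching the serde representation from the first file's hunk.

```python
import dataclasses
from dataclasses import dataclass


@dataclass
class PageServicePipeliningConfig:
    pass


@dataclass
class PageServicePipeliningConfigSerial(PageServicePipeliningConfig):
    mode: str = "serial"


@dataclass
class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
    max_batch_size: int
    execution: str
    mode: str = "pipelined"


# asdict() yields the `mode`-tagged shape the Rust enum expects, which is what
# the test patches into pageserver.toml as `page_service_pipelining`.
assert dataclasses.asdict(PageServicePipeliningConfigSerial()) == {"mode": "serial"}
assert dataclasses.asdict(PageServicePipeliningConfigPipelined(32, "concurrent-futures")) == {
    "max_batch_size": 32,
    "execution": "concurrent-futures",
    "mode": "pipelined",
}
print("dataclass dicts match the serde `mode`-tagged representation")
```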