diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index c43c9ebe32..22e61577df 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -129,8 +129,16 @@ pub struct DiskUsageEvictionTaskConfig {
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct PageServicePipeliningConfig {
-    // Causes runtime errors if larger than max get_vectored batch size.
+    /// Causes runtime errors if larger than max get_vectored batch size.
     pub max_batch_size: NonZeroUsize,
+    pub protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "kebab-case")]
+pub enum PageServiceProtocolPipeliningMode {
+    ConcurrentFutures,
+    Tasks,
 }
 
 pub mod statvfs {
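Because of `#[serde(rename_all = "kebab-case")]`, the on-disk spelling of the variants is "concurrent-futures" and "tasks". A minimal, self-contained sketch of how the new section deserializes (it re-declares the two types from the hunk above and assumes the `toml` and `serde` crates; this is not pageserver code):

    use std::num::NonZeroUsize;

    #[derive(Debug, PartialEq, Eq, serde::Deserialize)]
    #[serde(deny_unknown_fields)]
    struct PageServicePipeliningConfig {
        max_batch_size: NonZeroUsize,
        protocol_pipelining_mode: PageServiceProtocolPipeliningMode,
    }

    #[derive(Debug, PartialEq, Eq, serde::Deserialize)]
    #[serde(rename_all = "kebab-case")]
    enum PageServiceProtocolPipeliningMode {
        ConcurrentFutures,
        Tasks,
    }

    fn main() {
        // The kebab-case rename means the config value is "concurrent-futures",
        // not "ConcurrentFutures".
        let cfg: PageServicePipeliningConfig = toml::from_str(
            r#"
            max_batch_size = 32
            protocol_pipelining_mode = "concurrent-futures"
            "#,
        )
        .unwrap();
        assert_eq!(
            cfg.protocol_pipelining_mode,
            PageServiceProtocolPipeliningMode::ConcurrentFutures
        );
        // deny_unknown_fields: a misspelled key would fail to parse instead of
        // being silently ignored.
    }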
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index efbb8342ba..795b4bcb68 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -7,7 +7,7 @@ use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
-use pageserver_api::config::PageServicePipeliningConfig;
+use pageserver_api::config::{PageServicePipeliningConfig, PageServiceProtocolPipeliningMode};
 use pageserver_api::models::{self, TenantState};
 use pageserver_api::models::{
     PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -757,6 +757,7 @@ impl PageServerHandler {
 
     /// Post-condition: `maybe_carry` is Some()
     #[instrument(skip_all, level = tracing::Level::TRACE)]
+    #[allow(clippy::boxed_local)]
     fn pagestream_do_batch(
         max_batch_size: NonZeroUsize,
         maybe_carry: &mut Option<Box<BatchedFeMessage>>,
@@ -838,7 +839,7 @@
                 fail::fail_point!("ps::handle-pagerequest-message::exists");
                 (
                     vec![
-                        self.handle_get_rel_exists_request(&shard, &req, &ctx)
+                        self.handle_get_rel_exists_request(&shard, &req, ctx)
                             .instrument(span.clone())
                             .await,
                     ],
@@ -849,7 +850,7 @@
                 fail::fail_point!("ps::handle-pagerequest-message::nblocks");
                 (
                     vec![
-                        self.handle_get_nblocks_request(&shard, &req, &ctx)
+                        self.handle_get_nblocks_request(&shard, &req, ctx)
                             .instrument(span.clone())
                             .await,
                     ],
@@ -872,7 +873,7 @@
                         &shard,
                         effective_request_lsn,
                         pages,
-                        &ctx,
+                        ctx,
                     )
                     .instrument(span.clone())
                     .await;
@@ -886,7 +887,7 @@
                 fail::fail_point!("ps::handle-pagerequest-message::dbsize");
                 (
                     vec![
-                        self.handle_db_size_request(&shard, &req, &ctx)
+                        self.handle_db_size_request(&shard, &req, ctx)
                             .instrument(span.clone())
                             .await,
                     ],
@@ -897,7 +898,7 @@
                 fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
                 (
                     vec![
-                        self.handle_get_slru_segment_request(&shard, &req, &ctx)
+                        self.handle_get_slru_segment_request(&shard, &req, ctx)
                             .instrument(span.clone())
                             .await,
                     ],
@@ -1009,8 +1010,8 @@
             .expect("implementation error: timeline_handles should not be locked");
 
         let request_span = info_span!("request", shard_id = tracing::field::Empty);
-        let (pgb_reader, timeline_handles) = match self.pipelining_config {
-            Some(PageServicePipeliningConfig { max_batch_size, .. }) => {
+        let (pgb_reader, timeline_handles) = match self.pipelining_config.clone() {
+            Some(pipelining_config) => {
                 self.handle_pagerequests_pipelined(
                     pgb,
                     pgb_reader,
@@ -1018,7 +1019,7 @@
                     timeline_id,
                     timeline_handles,
                     request_span,
-                    max_batch_size,
+                    pipelining_config,
                     &ctx,
                 )
                 .await
@@ -1048,6 +1049,7 @@
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn handle_pagerequests_serial<IO>(
         &mut self,
         pgb_writer: &mut PostgresBackend<IO>,
@@ -1068,7 +1070,7 @@
                 timeline_id,
                 &mut timeline_handles,
                 &self.cancel,
-                &ctx,
+                ctx,
                 request_span.clone(),
             )
             .await?;
@@ -1079,11 +1081,12 @@
                     return Ok((pgb_reader, timeline_handles));
                 }
             };
-            self.pagesteam_handle_batched_message(pgb_writer, *msg, &ctx)
+            self.pagesteam_handle_batched_message(pgb_writer, *msg, ctx)
                 .await?;
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
    async fn handle_pagerequests_pipelined<IO>(
        &mut self,
        pgb_writer: &mut PostgresBackend<IO>,
@@ -1092,12 +1095,17 @@
         timeline_id: TimelineId,
         mut timeline_handles: TimelineHandles,
         request_span: Span,
-        max_batch_size: NonZeroUsize,
+        pipelining_config: PageServicePipeliningConfig,
         ctx: &RequestContext,
     ) -> Result<(PostgresBackendReader<IO>, TimelineHandles), QueryError>
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
     {
+        let PageServicePipeliningConfig {
+            max_batch_size,
+            protocol_pipelining_mode,
+        } = pipelining_config;
+
         let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1);
         let read_messages = {
             let cancel = self.cancel.child_token();
@@ -1231,19 +1239,38 @@
                 };
                 notify_batcher.notify_one();
                 debug!("processing batch");
-                self.pagesteam_handle_batched_message(pgb_writer, *batch, &ctx)
+                self.pagesteam_handle_batched_message(pgb_writer, *batch, ctx)
                     .await?;
             }
             Ok(())
         };
 
-        let (read_message_task_res, _, executor_res): (_, (), _) =
-            tokio::join!(read_messages, batcher, executor);
+        let read_messages_res;
+        let executor_res;
+        match protocol_pipelining_mode {
+            PageServiceProtocolPipeliningMode::ConcurrentFutures => {
+                (read_messages_res, _, executor_res) =
+                    tokio::join!(read_messages, batcher, executor);
+            }
+            PageServiceProtocolPipeliningMode::Tasks => {
+                // cancelled via sensitivity to self.cancel
+                let read_messages_task = tokio::task::spawn(read_messages);
+                // cancelled when it observes read_messages_task disconnect the channel
+                let batcher_task = tokio::task::spawn(batcher);
+                executor_res = executor.await;
+                read_messages_res = read_messages_task
+                    .await
+                    .context("read_messages task panicked, check logs for details")?;
+                let _: () = batcher_task
+                    .await
+                    .context("batcher task panicked, check logs for details")?;
+            }
+        }
 
-        match (read_message_task_res, executor_res) {
+        match (read_messages_res, executor_res) {
             (Err(e), _) | (_, Err(e)) => {
                 let e: QueryError = e;
-                return Err(e);
+                Err(e)
             }
             (Ok((pgb_reader, timeline_handles)), Ok(())) => Ok((pgb_reader, timeline_handles)),
         }
@@ -1395,7 +1422,7 @@
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
-            &timeline,
+            timeline,
             req.request_lsn,
             req.not_modified_since,
             &latest_gc_cutoff_lsn,
@@ -1425,7 +1452,7 @@
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
-            &timeline,
+            timeline,
             req.request_lsn,
             req.not_modified_since,
             &latest_gc_cutoff_lsn,
@@ -1455,7 +1482,7 @@
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
-            &timeline,
+            timeline,
             req.request_lsn,
             req.not_modified_since,
             &latest_gc_cutoff_lsn,
@@ -1513,7 +1540,7 @@
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
-            &timeline,
+            timeline,
             req.request_lsn,
             req.not_modified_since,
             &latest_gc_cutoff_lsn,
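The core of the change is the new `protocol_pipelining_mode` match: in `ConcurrentFutures` mode the pipeline stages are polled as concurrent futures within the current task via `tokio::join!`, while in `Tasks` mode the reader and batcher are spawned as separate tokio tasks (so they may run on other executor threads), and their panics surface as join errors. A stripped-down, runnable sketch of the same pattern, with a toy producer/consumer standing in for the read-messages/batcher/executor stages (hypothetical names, not pageserver code):

    use tokio::sync::mpsc;

    #[derive(Clone, Copy)]
    enum Mode {
        ConcurrentFutures,
        Tasks,
    }

    async fn run(mode: Mode) {
        let (tx, mut rx) = mpsc::channel::<u32>(1);
        // Producer stage: sends work items, then drops `tx`, which disconnects
        // the channel and lets the consumer shut down cleanly.
        let producer = async move {
            for i in 0..4 {
                tx.send(i).await.unwrap();
            }
        };
        // Consumer stage: drains the channel until it is disconnected.
        let consumer = async move {
            while let Some(i) = rx.recv().await {
                println!("processed {i}");
            }
        };
        match mode {
            // Both stages interleave cooperatively on the current task:
            // pipelining without any cross-thread parallelism.
            Mode::ConcurrentFutures => {
                let ((), ()) = tokio::join!(producer, consumer);
            }
            // Each stage gets its own task; a panic becomes a JoinError that
            // the caller must surface, as the diff does with `.context(...)?`.
            Mode::Tasks => {
                let producer_task = tokio::task::spawn(producer);
                let consumer_task = tokio::task::spawn(consumer);
                producer_task.await.expect("producer task panicked");
                consumer_task.await.expect("consumer task panicked");
            }
        }
    }

    #[tokio::main]
    async fn main() {
        run(Mode::ConcurrentFutures).await;
        run(Mode::Tasks).await;
    }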
diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py
index db054eb91e..e8ccbc4854 100644
--- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py
+++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py
@@ -13,24 +13,61 @@ from fixtures.utils import humantime_to_ms
 
 TARGET_RUNTIME = 5
 
-MAX_BATCH_SIZES = [None, 1, 2, 4, 8, 16, 32]
+
+@dataclass
+class PageServicePipeliningConfig:
+    max_batch_size: int
+    protocol_pipelining_mode: str
+
+
+PROTOCOL_PIPELINING_MODES = ["concurrent-futures", "tasks"]
+
+NON_BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
+for max_batch_size in [1, 32]:
+    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
+        NON_BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
+
+BATCHABLE: list[Optional[PageServicePipeliningConfig]] = [None]
+for max_batch_size in [1, 2, 4, 8, 16, 32]:
+    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
+        BATCHABLE.append(PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode))
 
 
 @pytest.mark.parametrize(
-    "tablesize_mib, max_batch_size, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
+    "tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
     [
-        # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
+        # non-batchable workloads should perform identically modulo the overheads of pipelining and batching;
+        # importantly, latency of pipelined configs should be no worse than non-pipelined
         *[
-            (50, n, TARGET_RUNTIME, 1, 128, f"not batchable max batch size {n}") for n in MAX_BATCH_SIZES
+            (
+                50,
+                config,
+                TARGET_RUNTIME,
+                1,
+                128,
+                f"not batchable {dataclasses.asdict(config) if config else None}",
+            )
+            for config in NON_BATCHABLE
         ],
+        # batchable workloads should show throughput and CPU efficiency improvements
         *[
-            (50, n, TARGET_RUNTIME, 100, 128, f"batchable max batch size {n}") for n in MAX_BATCH_SIZES
-        ]
+            (
+                50,
+                config,
+                TARGET_RUNTIME,
+                100,
+                128,
+                f"batchable {dataclasses.asdict(config) if config else None}",
+            )
+            for config in BATCHABLE
+        ],
     ],
 )
 def test_getpage_merge_smoke(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     tablesize_mib: int,
-    max_batch_size: Optional[int],
+    pipelining_config: None | PageServicePipeliningConfig,
     target_runtime: int,
     effective_io_concurrency: int,
     readhead_buffer_size: int,
@@ -48,16 +85,20 @@ def test_getpage_merge_smoke(
 
     params.update(
         {
             "tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
-            "max_batch_size": (
-                -1 if max_batch_size is None else max_batch_size,
-                {},
-            ),
+            "pipelining_enabled": (1 if pipelining_config else 0, {}),
             # target_runtime is just a polite ask to the workload to run for this long
             "effective_io_concurrency": (effective_io_concurrency, {}),
             "readhead_buffer_size": (readhead_buffer_size, {}),
             # name is not a metric
         }
     )
+    if pipelining_config:
+        params.update(
+            {
+                f"pipelining_config.{k}": (v, {})
+                for k, v in dataclasses.asdict(pipelining_config).items()
+            }
+        )
 
     log.info("params: %s", params)
 
@@ -167,9 +208,11 @@
         after = get_metrics()
         return (after - before).normalize(iters - 1)
 
-    env.pageserver.patch_config_toml_nonrecursive({"page_service_pipelining": {
-        "max_batch_size": max_batch_size,
-    }} if max_batch_size is not None else {})
+    env.pageserver.patch_config_toml_nonrecursive(
+        {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
+        if pipelining_config is not None
+        else {}
+    )
     env.pageserver.restart()
 
     metrics = workload()
@@ -198,14 +241,20 @@
     )
 
 
-@pytest.mark.parametrize(
-    "max_batch_size", [None, 1, 32]
-)
+PRECISION_CONFIGS: list[Optional[PageServicePipeliningConfig]] = [None]
+for max_batch_size in [1, 32]:
+    for protocol_pipelining_mode in PROTOCOL_PIPELINING_MODES:
+        PRECISION_CONFIGS.append(
+            PageServicePipeliningConfig(max_batch_size, protocol_pipelining_mode)
+        )
+
+
+@pytest.mark.parametrize("pipelining_config", PRECISION_CONFIGS)
 def test_timer_precision(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     pg_bin: PgBin,
-    max_batch_size: Optional[int],
+    pipelining_config: Optional[PageServicePipeliningConfig],
 ):
     """
     Determine the batching timeout precision (mean latency) and tail latency impact.
@@ -221,10 +270,8 @@
     #
 
     def patch_ps_config(ps_config):
-        if max_batch_size is not None:
-            ps_config["page_service_pipelining"] = {
-                "max_batch_size": max_batch_size,
-            }
+        if pipelining_config is not None:
+            ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
 
     neon_env_builder.pageserver_config_override = patch_ps_config
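Note that the Python side and the Rust side must agree on spelling: the `protocol_pipelining_mode` strings in `PROTOCOL_PIPELINING_MODES` ("concurrent-futures", "tasks") are exactly the kebab-case serde renames of the Rust enum variants, and `dataclasses.asdict(...)` turns a config such as `PageServicePipeliningConfig(32, "tasks")` into `{"max_batch_size": 32, "protocol_pipelining_mode": "tasks"}`, which the test writes under the `page_service_pipelining` key and the pageserver deserializes with the `deny_unknown_fields` struct from the first hunk.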