From c1040bc25d171dd01903c967dd44ac7450423016 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 22 Nov 2024 08:56:30 +0100 Subject: [PATCH] task-based mode --- libs/pageserver_api/src/config.rs | 3 +- pageserver/src/page_service.rs | 245 ++++++++++-------- .../test_pageserver_getpage_merge.py | 41 +-- 3 files changed, 154 insertions(+), 135 deletions(-) diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 29b59b8664..c43c9ebe32 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -129,7 +129,8 @@ pub struct DiskUsageEvictionTaskConfig { #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(deny_unknown_fields)] pub struct PageServicePipeliningConfig { - pub max_batch_size: usize, + // Causes runtime errors if larger than max get_vectored batch size. + pub max_batch_size: NonZeroUsize, } pub mod statvfs { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 7dfd3888a1..196e2c0e4b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::borrow::Cow; use std::io; +use std::num::NonZeroUsize; use std::str; use std::str::FromStr; use std::sync::Arc; @@ -757,6 +758,7 @@ impl PageServerHandler { /// Post-condition: `maybe_carry` is Some() #[instrument(skip_all, level = tracing::Level::TRACE)] fn pagestream_do_batch( + max_batch_size: NonZeroUsize, maybe_carry: &mut Option>, this_msg: Box, ) -> Option> { @@ -785,9 +787,9 @@ impl PageServerHandler { }, ) if (|| { assert_eq!(this_pages.len(), 1); - if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { - trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size"); - assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); + if accum_pages.len() >= max_batch_size.get() { + trace!(%accum_lsn, %this_lsn, %max_batch_size, "stopping batching because of batch size"); + assert_eq!(accum_pages.len(), max_batch_size.get()); return false; } if (accum_shard.tenant_shard_id, accum_shard.timeline_id) @@ -1008,46 +1010,44 @@ impl PageServerHandler { let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1); let request_span = info_span!("request", shard_id = tracing::field::Empty); - let read_message_task: JoinHandle> = tokio::spawn( - { - let cancel = self.cancel.child_token(); - let ctx = ctx.attached_child(); - async move { - scopeguard::defer! { - debug!("exiting"); - } - let mut pgb_reader = pgb_reader; - loop { - let msg = Self::pagestream_read_message( - &mut pgb_reader, - tenant_id, - timeline_id, - &mut timeline_handles, - &cancel, - &ctx, - request_span.clone(), - ) - .await?; - let msg = match msg { - Some(msg) => msg, - None => { - debug!("pagestream subprotocol end observed"); - break; - } - }; - match requests_tx.send(msg).await { - Ok(()) => {} - Err(tokio::sync::mpsc::error::SendError(_)) => { - debug!("downstream is gone"); - break; - } + let read_messages = { + let cancel = self.cancel.child_token(); + let ctx = ctx.attached_child(); + async move { + scopeguard::defer! { + debug!("exiting"); + } + let mut pgb_reader = pgb_reader; + loop { + let msg = Self::pagestream_read_message( + &mut pgb_reader, + tenant_id, + timeline_id, + &mut timeline_handles, + &cancel, + &ctx, + request_span.clone(), + ) + .await?; + let msg = match msg { + Some(msg) => msg, + None => { + debug!("pagestream subprotocol end observed"); + break; + } + }; + match requests_tx.send(msg).await { + Ok(()) => {} + Err(tokio::sync::mpsc::error::SendError(_)) => { + debug!("downstream is gone"); + break; } } - Ok((pgb_reader, timeline_handles)) } + Ok((pgb_reader, timeline_handles)) } - .instrument(tracing::info_span!("read_protocol")), - ); + } + .instrument(tracing::info_span!("read_messages")); enum BatchState { Building(Option>), @@ -1065,89 +1065,106 @@ impl PageServerHandler { std::sync::Mutex::new(BatchState::Building(None)), )); let notify_batcher = Arc::new(tokio::sync::Notify::new()); - tokio::spawn( - { - let notify_batcher = notify_batcher.clone(); - async move { - scopeguard::defer! { - debug!("exiting"); - } - loop { - let maybe_req = requests_rx.recv().await; - let Some(req) = maybe_req else { - batch_tx.send_modify(|pending_batch| { - let mut guard = pending_batch.lock().unwrap(); - match &mut *guard { - BatchState::Building(carry) => { - *guard = BatchState::UpstreamDead(carry.take()); - } - BatchState::UpstreamDead(_) => panic!("twice"), + let batcher = { + let notify_batcher = notify_batcher.clone(); + let max_batch_size = self + .pipelining_config + .as_ref() + .map(|PageServicePipeliningConfig { max_batch_size, .. }| *max_batch_size) + .unwrap_or(NonZeroUsize::new(1).unwrap()); + async move { + scopeguard::defer! { + debug!("exiting"); + } + loop { + let maybe_req = requests_rx.recv().await; + let Some(req) = maybe_req else { + batch_tx.send_modify(|pending_batch| { + let mut guard = pending_batch.lock().unwrap(); + match &mut *guard { + BatchState::Building(carry) => { + *guard = BatchState::UpstreamDead(carry.take()); } - }); - break; - }; - // don't read new requests before this one has been processed - let mut req = Some(req); - loop { - let mut wait_notified = None; - let batched = batch_tx.send_if_modified(|pending_batch| { - let mut guard = pending_batch.lock().unwrap(); - let building = guard.must_building_mut(); - match Self::pagestream_do_batch(building, req.take().unwrap()) { - Some(req_was_not_batched) => { - req.replace(req_was_not_batched); - wait_notified = Some(notify_batcher.notified()); - false - } - None => true, - } - }); - if batched { - break; - } else { - wait_notified.unwrap().await; + BatchState::UpstreamDead(_) => panic!("twice"), } + }); + break; + }; + // don't read new requests before this one has been processed + let mut req = Some(req); + loop { + let mut wait_notified = None; + let batched = batch_tx.send_if_modified(|pending_batch| { + let mut guard = pending_batch.lock().unwrap(); + let building = guard.must_building_mut(); + match Self::pagestream_do_batch( + max_batch_size, + building, + req.take().unwrap(), + ) { + Some(req_was_not_batched) => { + req.replace(req_was_not_batched); + wait_notified = Some(notify_batcher.notified()); + false + } + None => true, + } + }); + if batched { + break; + } else { + wait_notified.unwrap().await; } } } } - .instrument(tracing::info_span!("batching")), - ); - - let mut stop = false; - while !stop { - match batch_rx.changed().await { - Ok(()) => {} - Err(_) => { - debug!("batch_rx observed disconnection of batcher"); - } - }; - let maybe_batch = { - let borrow = batch_rx.borrow(); - let mut guard = borrow.lock().unwrap(); - match &mut *guard { - BatchState::Building(maybe_batch) => maybe_batch.take(), - BatchState::UpstreamDead(maybe_batch) => { - debug!("upstream dead"); - stop = true; - maybe_batch.take() - } - } - }; - let Some(batch) = maybe_batch else { - break; - }; - notify_batcher.notify_one(); - debug!("processing batch"); - self.pagesteam_handle_batched_message(pgb, *batch, &ctx) - .await?; } + .instrument(tracing::info_span!("batcher")); - let (pgb_reader, timeline_handles) = read_message_task - .await - .context("read message task panicked")? - // if the client made a protocol error, this is where we bubble up the QueryError - ?; + let executor = async { + let mut stop = false; + while !stop { + match batch_rx.changed().await { + Ok(()) => {} + Err(_) => { + debug!("batch_rx observed disconnection of batcher"); + } + }; + let maybe_batch = { + let borrow = batch_rx.borrow(); + let mut guard = borrow.lock().unwrap(); + match &mut *guard { + BatchState::Building(maybe_batch) => maybe_batch.take(), + BatchState::UpstreamDead(maybe_batch) => { + debug!("upstream dead"); + stop = true; + maybe_batch.take() + } + } + }; + let Some(batch) = maybe_batch else { + break; + }; + notify_batcher.notify_one(); + debug!("processing batch"); + self.pagesteam_handle_batched_message(pgb, *batch, &ctx) + .await?; + } + Ok(()) + }; + + let (read_message_task_res, _, executor_res): (_, (), _) = + tokio::join!(read_messages, batcher, executor); + + let (pgb_reader, timeline_handles) = match (read_message_task_res, executor_res) { + (_, Err(e)) => { + return Err(e); + } + (Err(e), _) => { + return Err(e); + } + (Ok((pgb_reader, timeline_handles)), Ok(())) => (pgb_reader, timeline_handles), + }; debug!("pagestream subprotocol shut down cleanly"); diff --git a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py index 3fddb506de..db054eb91e 100644 --- a/test_runner/performance/pageserver/test_pageserver_getpage_merge.py +++ b/test_runner/performance/pageserver/test_pageserver_getpage_merge.py @@ -13,28 +13,24 @@ from fixtures.utils import humantime_to_ms TARGET_RUNTIME = 5 - +MAX_BATCH_SIZES = [None, 1, 2, 4, 8, 16, 32] @pytest.mark.parametrize( - "tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name", + "tablesize_mib, max_batch_size, target_runtime, effective_io_concurrency, readhead_buffer_size, name", [ # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout - (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"), - (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"), - (50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"), - (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"), - # the next 4 cases demonstrate how batchable workloads benefit from batching - (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"), - (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"), - (50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"), - (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"), - (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"), + *[ + (50, n, TARGET_RUNTIME, 1, 128, f"not batchable max batch size {n}") for n in MAX_BATCH_SIZES + ], + *[ + (50, n, TARGET_RUNTIME, 100, 128, f"batchable max batch size {n}") for n in MAX_BATCH_SIZES + ] ], ) def test_getpage_merge_smoke( neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, tablesize_mib: int, - batch_timeout: Optional[str], + max_batch_size: Optional[int], target_runtime: int, effective_io_concurrency: int, readhead_buffer_size: int, @@ -52,9 +48,9 @@ def test_getpage_merge_smoke( params.update( { "tablesize_mib": (tablesize_mib, {"unit": "MiB"}), - "batch_timeout": ( - -1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout), - {"unit": "us"}, + "max_batch_size": ( + -1 if max_batch_size is None else max_batch_size, + {}, ), # target_runtime is just a polite ask to the workload to run for this long "effective_io_concurrency": (effective_io_concurrency, {}), @@ -171,7 +167,9 @@ def test_getpage_merge_smoke( after = get_metrics() return (after - before).normalize(iters - 1) - env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout}) + env.pageserver.patch_config_toml_nonrecursive({"page_service_pipelining": { + "max_batch_size": max_batch_size, + }} if max_batch_size is not None else {}) env.pageserver.restart() metrics = workload() @@ -201,13 +199,13 @@ def test_getpage_merge_smoke( @pytest.mark.parametrize( - "batch_timeout", [None, "10us", "20us", "50us", "100us", "200us", "500us", "1ms"] + "max_batch_size", [None, 1, 32] ) def test_timer_precision( neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin, - batch_timeout: Optional[str], + max_batch_size: Optional[int], ): """ Determine the batching timeout precision (mean latency) and tail latency impact. @@ -223,7 +221,10 @@ def test_timer_precision( # def patch_ps_config(ps_config): - ps_config["server_side_batch_timeout"] = batch_timeout + if max_batch_size is not None: + ps_config["page_service_pipelining"] = { + "max_batch_size": max_batch_size, + } neon_env_builder.pageserver_config_override = patch_ps_config