diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 59bb3410f9..2a779b0daa 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1714,6 +1714,28 @@ pub enum SmgrQueryType {
     Test,
 }
 
+#[derive(
+    Debug,
+    Clone,
+    Copy,
+    IntoStaticStr,
+    strum_macros::EnumCount,
+    strum_macros::EnumIter,
+    strum_macros::FromRepr,
+    enum_map::Enum,
+)]
+#[strum(serialize_all = "snake_case")]
+pub enum GetPageBatchBreakReason {
+    BatchFull,
+    NonBatchableRequest,
+    NonUniformLsn,
+    SamePageAtDifferentLsn,
+    NonUniformTimeline,
+    ExecutorSteal,
+    #[cfg(feature = "testing")]
+    NonUniformKey,
+}
+
 pub(crate) struct SmgrQueryTimePerTimeline {
     global_started: [IntCounter; SmgrQueryType::COUNT],
     global_latency: [Histogram; SmgrQueryType::COUNT],
@@ -1725,6 +1747,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
     per_timeline_flush_in_progress_micros: IntCounter,
     global_batch_wait_time: Histogram,
     per_timeline_batch_wait_time: Histogram,
+    global_batch_break_reason: [IntCounter; GetPageBatchBreakReason::COUNT],
+    per_timeline_batch_break_reason: GetPageBatchBreakReasonTimelineMetrics,
     throttling: Arc<tenant_throttling::Pagestream>,
 }
 
@@ -1858,6 +1882,49 @@ static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy<HistogramVec> = Lazy::n
         .expect("failed to define a metric")
 });
 
+static PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        // it's a counter, but the name is prepared to extend it to a histogram of queue depth
+        "pageserver_page_service_batch_break_reason_global",
+        "Reason for breaking batches of get page requests",
+        &["reason"],
+    )
+    .expect("failed to define a metric")
+});
+
+struct GetPageBatchBreakReasonTimelineMetrics {
+    map: EnumMap<GetPageBatchBreakReason, IntCounter>,
+}
+
+impl GetPageBatchBreakReasonTimelineMetrics {
+    fn new(tenant_id: &str, shard_slug: &str, timeline_id: &str) -> Self {
+        GetPageBatchBreakReasonTimelineMetrics {
+            map: EnumMap::from_array(std::array::from_fn(|reason_idx| {
+                let reason = GetPageBatchBreakReason::from_usize(reason_idx);
+                PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.with_label_values(&[
+                    tenant_id,
+                    shard_slug,
+                    timeline_id,
+                    reason.into(),
+                ])
+            })),
+        }
+    }
+
+    fn inc(&self, reason: GetPageBatchBreakReason) {
+        self.map[reason].inc()
+    }
+}
+
+static PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "pageserver_page_service_batch_break_reason",
+        "Reason for breaking batches of get page requests",
+        &["tenant_id", "shard_id", "timeline_id", "reason"],
+    )
+    .expect("failed to define a metric")
+});
+
 pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     register_int_gauge_vec!(
         "pageserver_page_service_config_max_batch_size",
@@ -1985,6 +2052,15 @@ impl SmgrQueryTimePerTimeline {
             .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
             .unwrap();
 
+        let global_batch_break_reason = std::array::from_fn(|i| {
+            let reason = GetPageBatchBreakReason::from_usize(i);
+            PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL
+                .get_metric_with_label_values(&[reason.into()])
+                .unwrap()
+        });
+        let per_timeline_batch_break_reason =
+            GetPageBatchBreakReasonTimelineMetrics::new(&tenant_id, &shard_slug, &timeline_id);
+
         let global_flush_in_progress_micros =
             PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
         let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
@@ -2002,6 +2078,8 @@
             per_timeline_flush_in_progress_micros,
             global_batch_wait_time,
             per_timeline_batch_wait_time,
+            global_batch_break_reason,
+            per_timeline_batch_break_reason,
             throttling: pagestream_throttle_metrics,
         }
     }
@@ -2030,9 +2108,16 @@ impl SmgrQueryTimePerTimeline {
     }
 
     /// TODO: do something about this? seems odd, we have a similar call on SmgrOpTimer
-    pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
+    pub(crate) fn observe_getpage_batch_start(
+        &self,
+        batch_size: usize,
+        break_reason: GetPageBatchBreakReason,
+    ) {
         self.global_batch_size.observe(batch_size as f64);
         self.per_timeline_batch_size.observe(batch_size as f64);
+
+        self.global_batch_break_reason[break_reason.into_usize()].inc();
+        self.per_timeline_batch_break_reason.inc(break_reason);
     }
 }
 
@@ -3398,6 +3483,15 @@ impl TimelineMetrics {
             shard_id,
             timeline_id,
         ]);
+
+        for reason in GetPageBatchBreakReason::iter() {
+            let _ = PAGE_SERVICE_BATCH_BREAK_REASON_PER_TENANT_TIMELINE.remove_label_values(&[
+                tenant_id,
+                shard_id,
+                timeline_id,
+                reason.into(),
+            ]);
+        }
     }
 }
 
@@ -4276,6 +4370,7 @@ pub fn preinitialize_metrics(
     [
         &BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT,
        &SMGR_QUERY_STARTED_GLOBAL,
+        &PAGE_SERVICE_BATCH_BREAK_REASON_GLOBAL,
    ]
    .into_iter()
    .for_each(|c| {
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 26eea5183b..7a62d8049b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -58,8 +58,8 @@ use crate::context::{
     DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
 };
 use crate::metrics::{
-    self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer,
-    TimelineMetrics,
+    self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
+    SmgrOpTimer, TimelineMetrics,
 };
 use crate::pgdatadir_mapping::Version;
 use crate::span::{
@@ -672,6 +672,7 @@ enum BatchedFeMessage {
         span: Span,
         shard: timeline::handle::WeakHandle<TenantManagerTypes>,
         pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
+        batch_break_reason: GetPageBatchBreakReason,
     },
     DbSize {
         span: Span,
@@ -724,6 +725,119 @@ impl BatchedFeMessage {
             BatchedFeMessage::RespondError { .. } => {}
         }
     }
+
+    fn should_break_batch(
+        &self,
+        other: &BatchedFeMessage,
+        max_batch_size: NonZeroUsize,
+        batching_strategy: PageServiceProtocolPipelinedBatchingStrategy,
+    ) -> Option<GetPageBatchBreakReason> {
+        match (self, other) {
+            (
+                BatchedFeMessage::GetPage {
+                    shard: accum_shard,
+                    pages: accum_pages,
+                    ..
+                },
+                BatchedFeMessage::GetPage {
+                    shard: this_shard,
+                    pages: this_pages,
+                    ..
+                },
+            ) => {
+                assert_eq!(this_pages.len(), 1);
+                if accum_pages.len() >= max_batch_size.get() {
+                    trace!(%max_batch_size, "stopping batching because of batch size");
+                    assert_eq!(accum_pages.len(), max_batch_size.get());
+
+                    return Some(GetPageBatchBreakReason::BatchFull);
+                }
+                if !accum_shard.is_same_handle_as(this_shard) {
+                    trace!("stopping batching because timeline object mismatch");
+                    // TODO: we _could_ batch & execute each shard separately (and in parallel).
+                    // But the current logic for keeping responses in order does not support that.
+
+                    return Some(GetPageBatchBreakReason::NonUniformTimeline);
+                }
+
+                match batching_strategy {
+                    PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
+                        if let Some(last_in_batch) = accum_pages.last() {
+                            if last_in_batch.effective_request_lsn
+                                != this_pages[0].effective_request_lsn
+                            {
+                                trace!(
+                                    accum_lsn = %last_in_batch.effective_request_lsn,
+                                    this_lsn = %this_pages[0].effective_request_lsn,
+                                    "stopping batching because LSN changed"
+                                );
+
+                                return Some(GetPageBatchBreakReason::NonUniformLsn);
+                            }
+                        }
+                    }
+                    PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
+                        // The read path doesn't currently support serving the same page at different LSNs.
+                        // While technically possible, it's uncertain if the complexity is worth it.
+                        // Break the batch if such a case is encountered.
+                        let same_page_different_lsn = accum_pages.iter().any(|batched| {
+                            batched.req.rel == this_pages[0].req.rel
+                                && batched.req.blkno == this_pages[0].req.blkno
+                                && batched.effective_request_lsn
+                                    != this_pages[0].effective_request_lsn
+                        });
+
+                        if same_page_different_lsn {
+                            trace!(
+                                rel=%this_pages[0].req.rel,
+                                blkno=%this_pages[0].req.blkno,
+                                lsn=%this_pages[0].effective_request_lsn,
+                                "stopping batching because same page was requested at different LSNs"
+                            );
+
+                            return Some(GetPageBatchBreakReason::SamePageAtDifferentLsn);
+                        }
+                    }
+                }
+
+                None
+            }
+            #[cfg(feature = "testing")]
+            (
+                BatchedFeMessage::Test {
+                    shard: accum_shard,
+                    requests: accum_requests,
+                    ..
+                },
+                BatchedFeMessage::Test {
+                    shard: this_shard,
+                    requests: this_requests,
+                    ..
+                },
+            ) => {
+                assert!(this_requests.len() == 1);
+                if accum_requests.len() >= max_batch_size.get() {
+                    trace!(%max_batch_size, "stopping batching because of batch size");
+                    assert_eq!(accum_requests.len(), max_batch_size.get());
+                    return Some(GetPageBatchBreakReason::BatchFull);
+                }
+                if !accum_shard.is_same_handle_as(this_shard) {
+                    trace!("stopping batching because timeline object mismatch");
+                    // TODO: we _could_ batch & execute each shard separately (and in parallel).
+                    // But the current logic for keeping responses in order does not support that.
+                    return Some(GetPageBatchBreakReason::NonUniformTimeline);
+                }
+                let this_batch_key = this_requests[0].req.batch_key;
+                let accum_batch_key = accum_requests[0].req.batch_key;
+                if this_batch_key != accum_batch_key {
+                    trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
+                    return Some(GetPageBatchBreakReason::NonUniformKey);
+                }
+                None
+            }
+            (_, _) => Some(GetPageBatchBreakReason::NonBatchableRequest),
+        }
+    }
 }
 
 impl PageServerHandler {
@@ -1047,6 +1161,10 @@ impl PageServerHandler {
                     effective_request_lsn,
                     ctx,
                 }],
+                // The executor grabs the batch when it becomes idle.
+                // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
+                // default reason for breaking the batch.
+                batch_break_reason: GetPageBatchBreakReason::ExecutorSteal,
             }
         }
         #[cfg(feature = "testing")]
@@ -1084,118 +1202,59 @@ impl PageServerHandler {
             Err(e) => return Err(Err(e)),
         };
 
-        match (&mut *batch, this_msg) {
-            // something batched already, let's see if we can add this message to the batch
-            (
-                Ok(BatchedFeMessage::GetPage {
-                    span: _,
-                    shard: accum_shard,
-                    pages: accum_pages,
-                }),
-                BatchedFeMessage::GetPage {
-                    span: _,
-                    shard: this_shard,
-                    pages: this_pages,
-                },
-            ) if (|| {
-                assert_eq!(this_pages.len(), 1);
-                if accum_pages.len() >= max_batch_size.get() {
-                    trace!(%max_batch_size, "stopping batching because of batch size");
-                    assert_eq!(accum_pages.len(), max_batch_size.get());
-                    return false;
-                }
-                if !accum_shard.is_same_handle_as(&this_shard) {
-                    trace!("stopping batching because timeline object mismatch");
-                    // TODO: we _could_ batch & execute each shard seperately (and in parallel).
-                    // But the current logic for keeping responses in order does not support that.
-                    return false;
-                }
-
-                match batching_strategy {
-                    PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
-                        if let Some(last_in_batch) = accum_pages.last() {
-                            if last_in_batch.effective_request_lsn
-                                != this_pages[0].effective_request_lsn
-                            {
-                                return false;
-                            }
-                        }
-                    }
-                    PageServiceProtocolPipelinedBatchingStrategy::ScatteredLsn => {
-                        // The read path doesn't curently support serving the same page at different LSNs.
-                        // While technically possible, it's uncertain if the complexity is worth it.
-                        // Break the batch if such a case is encountered.
-                        //
-                        // TODO(vlad): Include a metric for batch breaks with a reason label.
-                        let same_page_different_lsn = accum_pages.iter().any(|batched| {
-                            batched.req.rel == this_pages[0].req.rel
-                                && batched.req.blkno == this_pages[0].req.blkno
-                                && batched.effective_request_lsn
-                                    != this_pages[0].effective_request_lsn
-                        });
-
-                        if same_page_different_lsn {
-                            trace!(
-                                rel=%this_pages[0].req.rel,
-                                blkno=%this_pages[0].req.blkno,
-                                lsn=%this_pages[0].effective_request_lsn,
-                                "stopping batching because same page was requested at different LSNs"
-                            );
-                            return false;
-                        }
-                    }
-                }
-
-                true
-            })() =>
-            {
-                // ok to batch
-                accum_pages.extend(this_pages);
-                Ok(())
+        let eligible_batch = match batch {
+            Ok(b) => b,
+            Err(_) => {
+                return Err(Ok(this_msg));
             }
-            #[cfg(feature = "testing")]
-            (
-                Ok(BatchedFeMessage::Test {
-                    shard: accum_shard,
-                    requests: accum_requests,
-                    ..
-                }),
-                BatchedFeMessage::Test {
-                    shard: this_shard,
-                    requests: this_requests,
-                    ..
-                },
-            ) if (|| {
-                assert!(this_requests.len() == 1);
-                if accum_requests.len() >= max_batch_size.get() {
-                    trace!(%max_batch_size, "stopping batching because of batch size");
-                    assert_eq!(accum_requests.len(), max_batch_size.get());
-                    return false;
+        };
+
+        let batch_break =
+            eligible_batch.should_break_batch(&this_msg, max_batch_size, batching_strategy);
+
+        match batch_break {
+            Some(reason) => {
+                if let BatchedFeMessage::GetPage {
+                    batch_break_reason, ..
+                } = eligible_batch
+                {
+                    *batch_break_reason = reason;
                 }
-                if !accum_shard.is_same_handle_as(&this_shard) {
-                    trace!("stopping batching because timeline object mismatch");
-                    // TODO: we _could_ batch & execute each shard seperately (and in parallel).
-                    // But the current logic for keeping responses in order does not support that.
-                    return false;
-                }
-                let this_batch_key = this_requests[0].req.batch_key;
-                let accum_batch_key = accum_requests[0].req.batch_key;
-                if this_requests[0].req.batch_key != accum_requests[0].req.batch_key {
-                    trace!(%accum_batch_key, %this_batch_key, "stopping batching because batch key changed");
-                    return false;
-                }
-                true
-            })() =>
-            {
-                // ok to batch
-                accum_requests.extend(this_requests);
-                Ok(())
-            }
-            // something batched already but this message is unbatchable
-            (_, this_msg) => {
-                // by default, don't continue batching
+
                 Err(Ok(this_msg))
             }
+            None => {
+                // ok to batch
+                match (eligible_batch, this_msg) {
+                    (
+                        BatchedFeMessage::GetPage {
+                            pages: accum_pages, ..
+                        },
+                        BatchedFeMessage::GetPage {
+                            pages: this_pages, ..
+                        },
+                    ) => {
+                        accum_pages.extend(this_pages);
+                        Ok(())
+                    }
+                    #[cfg(feature = "testing")]
+                    (
+                        BatchedFeMessage::Test {
+                            requests: accum_requests,
+                            ..
+                        },
+                        BatchedFeMessage::Test {
+                            requests: this_requests,
+                            ..
+                        },
+                    ) => {
+                        accum_requests.extend(this_requests);
+                        Ok(())
+                    }
+                    // Shape guaranteed by [`BatchedFeMessage::should_break_batch`]
+                    _ => unreachable!(),
+                }
+            }
         }
     }
 
@@ -1413,7 +1472,12 @@ impl PageServerHandler {
                     span,
                 )
             }
-            BatchedFeMessage::GetPage { span, shard, pages } => {
+            BatchedFeMessage::GetPage {
+                span,
+                shard,
+                pages,
+                batch_break_reason,
+            } => {
                 fail::fail_point!("ps::handle-pagerequest-message::getpage");
                 let (shard, ctx) = upgrade_handle_and_set_context!(shard);
                 (
@@ -1425,6 +1489,7 @@ impl PageServerHandler {
                         &shard,
                         pages,
                         io_concurrency,
+                        batch_break_reason,
                         &ctx,
                     )
                     .instrument(span.clone())
@@ -2113,13 +2178,14 @@ impl PageServerHandler {
         timeline: &Timeline,
         requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
         io_concurrency: IoConcurrency,
+        batch_break_reason: GetPageBatchBreakReason,
         ctx: &RequestContext,
     ) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
         debug_assert_current_span_has_tenant_and_timeline_id();
 
         timeline
             .query_metrics
-            .observe_getpage_batch_start(requests.len());
+            .observe_getpage_batch_start(requests.len(), batch_break_reason);
 
         // If a page trace is running, submit an event for this request.
         if let Some(page_trace) = timeline.page_trace.load().as_ref() {
diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py
index df500544dc..879808b7ba 100644
--- a/test_runner/fixtures/metrics.py
+++ b/test_runner/fixtures/metrics.py
@@ -194,6 +194,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
     counter("pageserver_wait_lsn_started_count"),
     counter("pageserver_wait_lsn_finished_count"),
     counter("pageserver_wait_ondemand_download_seconds_sum"),
+    counter("pageserver_page_service_batch_break_reason"),
     *histogram("pageserver_page_service_batch_size"),
     *histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
     *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py
index 520a019cf5..b17ca772c9 100644
--- a/test_runner/performance/pageserver/test_page_service_batching.py
+++ b/test_runner/performance/pageserver/test_page_service_batching.py
@@ -1,7 +1,6 @@
 import concurrent.futures
 import dataclasses
 import json
-import re
 import threading
 import time
 from dataclasses import dataclass
@@ -170,6 +169,7 @@ def test_throughput(
         time: float
         pageserver_batch_size_histo_sum: float
         pageserver_batch_size_histo_count: float
+        pageserver_batch_breaks_reason_count: dict[str, int]
         compute_getpage_count: float
         pageserver_cpu_seconds_total: float
 
@@ -183,6 +183,10 @@ def test_throughput(
                 compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
                 pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
                 - other.pageserver_cpu_seconds_total,
+                pageserver_batch_breaks_reason_count={
+                    reason: count - other.pageserver_batch_breaks_reason_count.get(reason, 0)
+                    for reason, count in self.pageserver_batch_breaks_reason_count.items()
+                },
             )
 
         def normalize(self, by) -> "Metrics":
@@ -192,6 +196,10 @@ def test_throughput(
                 pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by,
                 compute_getpage_count=self.compute_getpage_count / by,
                 pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
+                pageserver_batch_breaks_reason_count={
+                    reason: count / by
+                    for reason, count in self.pageserver_batch_breaks_reason_count.items()
+                },
             )
 
     def get_metrics() -> Metrics:
@@ -201,6 +209,20 @@ def test_throughput(
             )
             compute_getpage_count = cur.fetchall()[0][0]
             pageserver_metrics = ps_http.get_metrics()
+            for name, samples in pageserver_metrics.metrics.items():
+                for sample in samples:
+                    log.info(f"{name=} labels={sample.labels} {sample.value}")
+
+            raw_batch_break_reason_count = pageserver_metrics.query_all(
+                "pageserver_page_service_batch_break_reason_total",
+                filter={"timeline_id": str(env.initial_timeline)},
+            )
+
+            batch_break_reason_count = {
+                sample.labels["reason"]: int(sample.value)
+                for sample in raw_batch_break_reason_count
+            }
+
             return Metrics(
                 time=time.time(),
                 pageserver_batch_size_histo_sum=pageserver_metrics.query_one(
@@ -209,6 +231,7 @@ def test_throughput(
                 pageserver_batch_size_histo_count=pageserver_metrics.query_one(
                     "pageserver_page_service_batch_size_count"
                 ).value,
+                pageserver_batch_breaks_reason_count=batch_break_reason_count,
                 compute_getpage_count=compute_getpage_count,
                 pageserver_cpu_seconds_total=pageserver_metrics.query_one(
                     "libmetrics_process_cpu_seconds_highres"
@@ -263,25 +286,6 @@ def test_throughput(
 
     log.info("Results: %s", metrics)
 
-    since_last_start: list[str] = []
-    for line in env.pageserver.logfile.read_text().splitlines():
-        if "git:" in line:
-            since_last_start = []
-        since_last_start.append(line)
-
-    stopping_batching_because_re = re.compile(
-        r"stopping batching because (LSN changed|of batch size|timeline object mismatch|batch key changed|same page was requested at different LSNs|.*)"
-    )
-    reasons_for_stopping_batching = {}
-    for line in since_last_start:
-        match = stopping_batching_because_re.search(line)
-        if match:
-            if match.group(1) not in reasons_for_stopping_batching:
-                reasons_for_stopping_batching[match.group(1)] = 0
-            reasons_for_stopping_batching[match.group(1)] += 1
-
-    log.info("Reasons for stopping batching: %s", reasons_for_stopping_batching)
-
     #
     # Sanity-checks on the collected data
     #
@@ -295,7 +299,16 @@ def test_throughput(
     #
 
     for metric, value in dataclasses.asdict(metrics).items():
-        zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
+        if metric == "pageserver_batch_breaks_reason_count":
+            assert isinstance(value, dict)
+            for reason, count in value.items():
+                zenbenchmark.record(
+                    f"counters.{metric}_{reason}", count, unit="", report=MetricReport.TEST_PARAM
+                )
+        else:
+            zenbenchmark.record(
+                f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM
+            )
 
     zenbenchmark.record(
         "perfmetric.batching_factor",
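
Usage note (illustrative, not part of the patch): the per-timeline counter is registered as `pageserver_page_service_batch_break_reason`, and the fixtures wrap counters so it is scraped with a `_total` suffix, which is the name the perf test above queries. Below is a minimal sketch of how another test might aggregate break reasons for one timeline, assuming the `get_metrics()`/`query_all()` helpers and the sample shape from `test_runner/fixtures/metrics.py`; the `batch_break_reasons` helper name is hypothetical:

```python
from collections import Counter


def batch_break_reasons(ps_http, timeline_id: str) -> Counter:
    # Take a metrics snapshot and sum the break-reason counters for the
    # given timeline across all of its shards.
    samples = ps_http.get_metrics().query_all(
        "pageserver_page_service_batch_break_reason_total",
        filter={"timeline_id": timeline_id},
    )
    counts: Counter = Counter()
    for sample in samples:
        counts[sample.labels["reason"]] += int(sample.value)
    return counts
```

Reason labels arrive snake_cased (`batch_full`, `non_uniform_lsn`, ...) per the `#[strum(serialize_all = "snake_case")]` attribute on `GetPageBatchBreakReason`.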