diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index a8c2c2e992..31f4370855 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -127,6 +127,7 @@ fn main() -> anyhow::Result<()> { info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode"); info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol"); + info!(?conf.page_service_pipelining, "starting with page service pipelining config"); // The tenants directory contains all the pageserver local disk state. // Create if not exists and make sure all the contents are durable before proceeding. @@ -302,7 +303,7 @@ fn start_pageserver( pageserver::metrics::tokio_epoll_uring::Collector::new(), )) .unwrap(); - pageserver::preinitialize_metrics(); + pageserver::preinitialize_metrics(conf); // If any failpoints were set from FAILPOINTS environment variable, // print them to the log for debugging purposes diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 7afcf52cf2..8f2177fe5b 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -91,8 +91,6 @@ use crate::task_mgr::TaskKind; -pub(crate) mod optional_counter; - // The main structure of this module, see module-level comment. #[derive(Debug)] pub struct RequestContext { @@ -100,7 +98,6 @@ pub struct RequestContext { download_behavior: DownloadBehavior, access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, - pub micros_spent_throttled: optional_counter::MicroSecondsCounterU32, } /// The kind of access to the page cache. @@ -158,7 +155,6 @@ impl RequestContextBuilder { download_behavior: DownloadBehavior::Download, access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, - micros_spent_throttled: Default::default(), }, } } @@ -172,7 +168,6 @@ impl RequestContextBuilder { download_behavior: original.download_behavior, access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, - micros_spent_throttled: Default::default(), }, } } diff --git a/pageserver/src/context/optional_counter.rs b/pageserver/src/context/optional_counter.rs deleted file mode 100644 index 100c649f18..0000000000 --- a/pageserver/src/context/optional_counter.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::{ - sync::atomic::{AtomicU32, Ordering}, - time::Duration, -}; - -#[derive(Debug)] -pub struct CounterU32 { - inner: AtomicU32, -} -impl Default for CounterU32 { - fn default() -> Self { - Self { - inner: AtomicU32::new(u32::MAX), - } - } -} -impl CounterU32 { - pub fn open(&self) -> Result<(), &'static str> { - match self - .inner - .compare_exchange(u32::MAX, 0, Ordering::Relaxed, Ordering::Relaxed) - { - Ok(_) => Ok(()), - Err(_) => Err("open() called on clsoed state"), - } - } - pub fn close(&self) -> Result { - match self.inner.swap(u32::MAX, Ordering::Relaxed) { - u32::MAX => Err("close() called on closed state"), - x => Ok(x), - } - } - - pub fn add(&self, count: u32) -> Result<(), &'static str> { - if count == 0 { - return Ok(()); - } - let mut had_err = None; - self.inner - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| match cur { - u32::MAX => { - had_err = Some("add() called on closed state"); - None - } - x => { - let (new, overflowed) = x.overflowing_add(count); - if new == u32::MAX || overflowed { - had_err = Some("add() overflowed the counter"); - None - } else { - Some(new) - } - } - }) - 
.map_err(|_| had_err.expect("we set it whenever the function returns None")) - .map(|_| ()) - } -} - -#[derive(Default, Debug)] -pub struct MicroSecondsCounterU32 { - inner: CounterU32, -} - -impl MicroSecondsCounterU32 { - pub fn open(&self) -> Result<(), &'static str> { - self.inner.open() - } - pub fn add(&self, duration: Duration) -> Result<(), &'static str> { - match duration.as_micros().try_into() { - Ok(x) => self.inner.add(x), - Err(_) => Err("add(): duration conversion error"), - } - } - pub fn close_and_checked_sub_from(&self, from: Duration) -> Result { - let val = self.inner.close()?; - let val = Duration::from_micros(val as u64); - let subbed = match from.checked_sub(val) { - Some(v) => v, - None => return Err("Duration::checked_sub"), - }; - Ok(subbed) - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[test] - fn test_basic() { - let counter = MicroSecondsCounterU32::default(); - counter.open().unwrap(); - counter.add(Duration::from_micros(23)).unwrap(); - let res = counter - .close_and_checked_sub_from(Duration::from_micros(42)) - .unwrap(); - assert_eq!(res, Duration::from_micros(42 - 23)); - } -} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 86be97587f..d04fae7627 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -7,6 +7,10 @@ use metrics::{ IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec, }; use once_cell::sync::Lazy; +use pageserver_api::config::{ + PageServicePipeliningConfig, PageServicePipeliningConfigPipelined, + PageServiceProtocolPipelinedExecutionStrategy, +}; use pageserver_api::shard::TenantShardId; use postgres_backend::{is_expected_io_error, QueryError}; use pq_proto::framed::ConnectionError; @@ -1216,50 +1220,21 @@ pub(crate) mod virtual_file_io_engine { }); } -struct GlobalAndPerTimelineHistogramTimer<'a, 'c> { - global_latency_histo: &'a Histogram, +pub(crate) struct SmgrOpTimer { + global_latency_histo: Histogram, // Optional because not all op types are tracked per-timeline - per_timeline_latency_histo: Option<&'a Histogram>, + per_timeline_latency_histo: Option, - ctx: &'c RequestContext, - start: std::time::Instant, - op: SmgrQueryType, - count: usize, + start: Instant, } -impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> { +impl Drop for SmgrOpTimer { fn drop(&mut self) { - let elapsed = self.start.elapsed(); - let ex_throttled = self - .ctx - .micros_spent_throttled - .close_and_checked_sub_from(elapsed); - let ex_throttled = match ex_throttled { - Ok(res) => res, - Err(error) => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy>> = - Lazy::new(|| { - Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { - RateLimit::new(Duration::from_secs(10)) - }))) - }); - let mut guard = LOGGED.lock().unwrap(); - let rate_limit = &mut guard[self.op]; - rate_limit.call(|| { - warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit"); - }); - elapsed - } - }; - - for _ in 0..self.count { - self.global_latency_histo - .observe(ex_throttled.as_secs_f64()); - if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo { - per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64()); - } + let elapsed = self.start.elapsed().as_secs_f64(); + self.global_latency_histo.observe(elapsed); + if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo { + per_timeline_getpage_histo.observe(elapsed); } } } @@ -1289,6 +1264,8 @@ pub(crate) struct SmgrQueryTimePerTimeline { 
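Reviewer note (not part of the patch): the `SmgrOpTimer` introduced above replaces the borrowing `GlobalAndPerTimelineHistogramTimer`. It owns cloned histogram handles and records on `Drop`, which is what lets a timer be moved into a `BatchedFeMessage` and dropped long after the per-timeline metrics borrow would have ended. A minimal, self-contained sketch of that ownership/`Drop` pattern; `Recorder` is a stand-in for the cloned prometheus `Histogram` handle used by the real code:

```rust
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// Stand-in for a cloned `Histogram` handle from the metrics crate.
#[derive(Clone, Default)]
struct Recorder(Arc<Mutex<Vec<f64>>>);

/// Owns its handles, so it has no lifetime parameters and can be moved around
/// freely and dropped much later, e.g. after the response flush.
struct OpTimer {
    recorder: Recorder,
    start: Instant,
}

impl Drop for OpTimer {
    fn drop(&mut self) {
        // Observed exactly once, wherever the timer ends up being dropped.
        self.recorder
            .0
            .lock()
            .unwrap()
            .push(self.start.elapsed().as_secs_f64());
    }
}

fn main() {
    let recorder = Recorder::default();
    {
        let _timer = OpTimer {
            recorder: recorder.clone(),
            start: Instant::now(),
        };
        // ... handle the request ...
    } // <- latency recorded here
    assert_eq!(recorder.0.lock().unwrap().len(), 1);
}
```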
global_latency: [Histogram; SmgrQueryType::COUNT], per_timeline_getpage_started: IntCounter, per_timeline_getpage_latency: Histogram, + global_batch_size: Histogram, + per_timeline_batch_size: Histogram, } static SMGR_QUERY_STARTED_GLOBAL: Lazy = Lazy::new(|| { @@ -1381,6 +1358,76 @@ static SMGR_QUERY_TIME_GLOBAL: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +static PAGE_SERVICE_BATCH_SIZE_BUCKETS_GLOBAL: Lazy> = Lazy::new(|| { + (1..=u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap()) + .map(|v| v.into()) + .collect() +}); + +static PAGE_SERVICE_BATCH_SIZE_GLOBAL: Lazy = Lazy::new(|| { + register_histogram!( + "pageserver_page_service_batch_size_global", + "Batch size of pageserver page service requests", + PAGE_SERVICE_BATCH_SIZE_BUCKETS_GLOBAL.clone(), + ) + .expect("failed to define a metric") +}); + +static PAGE_SERVICE_BATCH_SIZE_BUCKETS_PER_TIMELINE: Lazy> = Lazy::new(|| { + let mut buckets = Vec::new(); + for i in 0.. { + let bucket = 1 << i; + if bucket > u32::try_from(Timeline::MAX_GET_VECTORED_KEYS).unwrap() { + break; + } + buckets.push(bucket.into()); + } + buckets +}); + +static PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE: Lazy = Lazy::new(|| { + register_histogram_vec!( + "pageserver_page_service_batch_size", + "Batch size of pageserver page service requests", + &["tenant_id", "shard_id", "timeline_id"], + PAGE_SERVICE_BATCH_SIZE_BUCKETS_PER_TIMELINE.clone() + ) + .expect("failed to define a metric") +}); + +pub(crate) static PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "pageserver_page_service_config_max_batch_size", + "Configured maximum batch size for the server-side batching functionality of page_service. \ + Labels expose more of the configuration parameters.", + &["mode", "execution"] + ) + .expect("failed to define a metric") +}); + +fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) { + PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE.reset(); + let (label_values, value) = match conf { + PageServicePipeliningConfig::Serial => (["serial", "-"], 1), + PageServicePipeliningConfig::Pipelined(PageServicePipeliningConfigPipelined { + max_batch_size, + execution, + }) => { + let mode = "pipelined"; + let execution = match execution { + PageServiceProtocolPipelinedExecutionStrategy::ConcurrentFutures => { + "concurrent-futures" + } + PageServiceProtocolPipelinedExecutionStrategy::Tasks => "tasks", + }; + ([mode, execution], max_batch_size.get()) + } + }; + PAGE_SERVICE_CONFIG_MAX_BATCH_SIZE + .with_label_values(&label_values) + .set(value.try_into().unwrap()); +} + impl SmgrQueryTimePerTimeline { pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self { let tenant_id = tenant_shard_id.tenant_id.to_string(); @@ -1416,78 +1463,51 @@ impl SmgrQueryTimePerTimeline { ]) .unwrap(); + let global_batch_size = PAGE_SERVICE_BATCH_SIZE_GLOBAL.clone(); + let per_timeline_batch_size = PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE + .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id]) + .unwrap(); + Self { global_started, global_latency, per_timeline_getpage_latency, per_timeline_getpage_started, + global_batch_size, + per_timeline_batch_size, } } - pub(crate) fn start_timer<'c: 'a, 'a>( - &'a self, - op: SmgrQueryType, - ctx: &'c RequestContext, - ) -> Option { - self.start_timer_many(op, 1, ctx) - } - pub(crate) fn start_timer_many<'c: 'a, 'a>( - &'a self, - op: SmgrQueryType, - count: usize, - ctx: &'c RequestContext, - ) -> Option { - let start = Instant::now(); 
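Reviewer note (not part of the patch): the two batch-size histograms defined in the hunks above use different bucket layouts on purpose — the global histogram gets one bucket per possible batch size, while the per-tenant-timeline histogram only gets power-of-two buckets to keep per-label cardinality low. A runnable sketch of both layouts; the constant value 32 is assumed for illustration only, the real bound is `Timeline::MAX_GET_VECTORED_KEYS`:

```rust
fn main() {
    const MAX_GET_VECTORED_KEYS: u32 = 32; // assumed value, for illustration only

    // Global histogram: one bucket per possible batch size (1, 2, 3, ..., 32).
    let global: Vec<f64> = (1..=MAX_GET_VECTORED_KEYS).map(f64::from).collect();

    // Per-tenant-timeline histogram: powers of two only, to bound cardinality.
    let mut per_timeline = Vec::new();
    for i in 0.. {
        let bucket = 1u32 << i;
        if bucket > MAX_GET_VECTORED_KEYS {
            break;
        }
        per_timeline.push(f64::from(bucket));
    }

    assert_eq!(global.len(), 32);
    assert_eq!(per_timeline, vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0]);
}
```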
- + pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer { self.global_started[op as usize].inc(); - // We subtract time spent throttled from the observed latency. - match ctx.micros_spent_throttled.open() { - Ok(()) => (), - Err(error) => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy>> = - Lazy::new(|| { - Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { - RateLimit::new(Duration::from_secs(10)) - }))) - }); - let mut guard = LOGGED.lock().unwrap(); - let rate_limit = &mut guard[op]; - rate_limit.call(|| { - warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit"); - }); - } - } - let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) { self.per_timeline_getpage_started.inc(); - Some(&self.per_timeline_getpage_latency) + Some(self.per_timeline_getpage_latency.clone()) } else { None }; - Some(GlobalAndPerTimelineHistogramTimer { - global_latency_histo: &self.global_latency[op as usize], + SmgrOpTimer { + global_latency_histo: self.global_latency[op as usize].clone(), per_timeline_latency_histo, - ctx, - start, - op, - count, - }) + start: started_at, + } + } + + pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) { + self.global_batch_size.observe(batch_size as f64); + self.per_timeline_batch_size.observe(batch_size as f64); } } #[cfg(test)] mod smgr_query_time_tests { + use std::time::Instant; + use pageserver_api::shard::TenantShardId; use strum::IntoEnumIterator; use utils::id::{TenantId, TimelineId}; - use crate::{ - context::{DownloadBehavior, RequestContext}, - task_mgr::TaskKind, - }; - // Regression test, we used hard-coded string constants before using an enum. #[test] fn op_label_name() { @@ -1531,8 +1551,7 @@ mod smgr_query_time_tests { let (pre_global, pre_per_tenant_timeline) = get_counts(); assert_eq!(pre_per_tenant_timeline, 0); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); - let timer = metrics.start_timer(*op, &ctx); + let timer = metrics.start_smgr_op(*op, Instant::now()); drop(timer); let (post_global, post_per_tenant_timeline) = get_counts(); @@ -1579,58 +1598,24 @@ pub(crate) static BASEBACKUP_QUERY_TIME: Lazy = Lazy::new(| } }); -pub(crate) struct BasebackupQueryTimeOngoingRecording<'a, 'c> { +pub(crate) struct BasebackupQueryTimeOngoingRecording<'a> { parent: &'a BasebackupQueryTime, - ctx: &'c RequestContext, start: std::time::Instant, } impl BasebackupQueryTime { - pub(crate) fn start_recording<'c: 'a, 'a>( - &'a self, - ctx: &'c RequestContext, - ) -> BasebackupQueryTimeOngoingRecording<'a, 'a> { + pub(crate) fn start_recording(&self) -> BasebackupQueryTimeOngoingRecording<'_> { let start = Instant::now(); - match ctx.micros_spent_throttled.open() { - Ok(()) => (), - Err(error) => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy> = - Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); - let mut rate_limit = LOGGED.lock().unwrap(); - rate_limit.call(|| { - warn!(error, "error opening micros_spent_throttled; this message is logged at a global rate limit"); - }); - } - } BasebackupQueryTimeOngoingRecording { parent: self, - ctx, start, } } } -impl BasebackupQueryTimeOngoingRecording<'_, '_> { +impl BasebackupQueryTimeOngoingRecording<'_> { pub(crate) fn observe(self, res: &Result) { - let elapsed = self.start.elapsed(); - let ex_throttled = self - .ctx - .micros_spent_throttled - .close_and_checked_sub_from(elapsed); - let ex_throttled = match 
ex_throttled { - Ok(ex_throttled) => ex_throttled, - Err(error) => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy> = - Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); - let mut rate_limit = LOGGED.lock().unwrap(); - rate_limit.call(|| { - warn!(error, "error deducting time spent throttled; this message is logged at a global rate limit"); - }); - elapsed - } - }; + let elapsed = self.start.elapsed().as_secs_f64(); // If you want to change categorize of a specific error, also change it in `log_query_error`. let metric = match res { Ok(_) => &self.parent.ok, @@ -1641,7 +1626,7 @@ impl BasebackupQueryTimeOngoingRecording<'_, '_> { } Err(_) => &self.parent.error, }; - metric.observe(ex_throttled.as_secs_f64()); + metric.observe(elapsed); } } @@ -2722,6 +2707,11 @@ impl TimelineMetrics { shard_id, timeline_id, ]); + let _ = PAGE_SERVICE_BATCH_SIZE_PER_TENANT_TIMELINE.remove_label_values(&[ + tenant_id, + shard_id, + timeline_id, + ]); } } @@ -2747,10 +2737,12 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext}; use crate::task_mgr::TaskKind; use crate::tenant::mgr::TenantSlot; use crate::tenant::tasks::BackgroundLoopKind; +use crate::tenant::Timeline; /// Maintain a per timeline gauge in addition to the global gauge. pub(crate) struct PerTimelineRemotePhysicalSizeGauge { @@ -3562,7 +3554,9 @@ pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) { .set(u64::try_from(num_threads.get()).unwrap()); } -pub fn preinitialize_metrics() { +pub fn preinitialize_metrics(conf: &'static PageServerConf) { + set_page_service_config_max_batch_size(&conf.page_service_pipelining); + // Python tests need these and on some we do alerting. 
// // FIXME(4813): make it so that we have no top level metrics as this fn will easily fall out of @@ -3630,6 +3624,7 @@ pub fn preinitialize_metrics() { &WAL_REDO_RECORDS_HISTOGRAM, &WAL_REDO_BYTES_HISTOGRAM, &WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, + &PAGE_SERVICE_BATCH_SIZE_GLOBAL, ] .into_iter() .for_each(|h| { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1917e7f5b7..64842aa5b8 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -51,7 +51,7 @@ use crate::auth::check_permission; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; -use crate::metrics::{self}; +use crate::metrics::{self, SmgrOpTimer}; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; @@ -540,11 +540,13 @@ impl From for QueryError { enum BatchedFeMessage { Exists { span: Span, + timer: SmgrOpTimer, shard: timeline::handle::Handle, req: models::PagestreamExistsRequest, }, Nblocks { span: Span, + timer: SmgrOpTimer, shard: timeline::handle::Handle, req: models::PagestreamNblocksRequest, }, @@ -552,15 +554,17 @@ enum BatchedFeMessage { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, + pages: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, }, DbSize { span: Span, + timer: SmgrOpTimer, shard: timeline::handle::Handle, req: models::PagestreamDbSizeRequest, }, GetSlruSegment { span: Span, + timer: SmgrOpTimer, shard: timeline::handle::Handle, req: models::PagestreamGetSlruSegmentRequest, }, @@ -632,6 +636,8 @@ impl PageServerHandler { msg = pgb.read_message() => { msg } }; + let received_at = Instant::now(); + let copy_data_bytes = match msg? 
{ Some(FeMessage::CopyData(bytes)) => bytes, Some(FeMessage::Terminate) => { @@ -660,7 +666,15 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - BatchedFeMessage::Exists { span, shard, req } + let timer = shard + .query_metrics + .start_smgr_op(metrics::SmgrQueryType::GetRelExists, received_at); + BatchedFeMessage::Exists { + span, + timer, + shard, + req, + } } PagestreamFeMessage::Nblocks(req) => { let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); @@ -668,7 +682,15 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - BatchedFeMessage::Nblocks { span, shard, req } + let timer = shard + .query_metrics + .start_smgr_op(metrics::SmgrQueryType::GetRelSize, received_at); + BatchedFeMessage::Nblocks { + span, + timer, + shard, + req, + } } PagestreamFeMessage::DbSize(req) => { let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); @@ -676,7 +698,15 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - BatchedFeMessage::DbSize { span, shard, req } + let timer = shard + .query_metrics + .start_smgr_op(metrics::SmgrQueryType::GetDbSize, received_at); + BatchedFeMessage::DbSize { + span, + timer, + shard, + req, + } } PagestreamFeMessage::GetSlruSegment(req) => { let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); @@ -684,7 +714,15 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; - BatchedFeMessage::GetSlruSegment { span, shard, req } + let timer = shard + .query_metrics + .start_smgr_op(metrics::SmgrQueryType::GetSlruSegment, received_at); + BatchedFeMessage::GetSlruSegment { + span, + timer, + shard, + req, + } } PagestreamFeMessage::GetPage(PagestreamGetPageRequest { request_lsn, @@ -728,6 +766,14 @@ impl PageServerHandler { return respond_error!(e.into()); } }; + + // It's important to start the timer before waiting for the LSN + // so that the _started counters are incremented before we do + // any serious waiting, e.g., for LSNs. 
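Reviewer note (not part of the patch): to make the comment above concrete, here is a self-contained sketch — illustrative names, not the pageserver API — of why the started-counter has to be bumped before the LSN wait: a request stuck waiting for WAL should already be visible in "started minus finished" style in-flight dashboards, and `received_at` (captured when the CopyData message was read) means the eventual latency observation also covers queueing and the wait itself:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

#[derive(Default)]
struct OpMetrics {
    started: AtomicU64,
    finished: AtomicU64,
}

struct OpTimer<'a> {
    metrics: &'a OpMetrics,
    start: Instant,
}

impl OpMetrics {
    /// `received_at` is captured when the message is read off the socket, so the
    /// observation also covers time spent queued and waiting for the LSN.
    fn start_op(&self, received_at: Instant) -> OpTimer<'_> {
        self.started.fetch_add(1, Ordering::Relaxed); // visible before any await
        OpTimer { metrics: self, start: received_at }
    }
}

impl Drop for OpTimer<'_> {
    fn drop(&mut self) {
        let _latency = self.start.elapsed(); // the real code observes a histogram
        self.metrics.finished.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    let metrics = OpMetrics::default();
    let timer = metrics.start_op(Instant::now());
    // While the (possibly long) LSN wait runs, the request is already counted:
    assert_eq!(
        metrics.started.load(Ordering::Relaxed) - metrics.finished.load(Ordering::Relaxed),
        1
    );
    drop(timer);
}
```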
+ let timer = shard + .query_metrics + .start_smgr_op(metrics::SmgrQueryType::GetPageAtLsn, received_at); + let effective_request_lsn = match Self::wait_or_get_last_lsn( &shard, request_lsn, @@ -747,7 +793,7 @@ impl PageServerHandler { span, shard, effective_request_lsn, - pages: smallvec::smallvec![(rel, blkno)], + pages: smallvec::smallvec![(rel, blkno, timer)], } } }; @@ -832,88 +878,112 @@ impl PageServerHandler { IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { // invoke handler function - let (handler_results, span): (Vec>, _) = - match batch { - BatchedFeMessage::Exists { span, shard, req } => { - fail::fail_point!("ps::handle-pagerequest-message::exists"); - ( - vec![ - self.handle_get_rel_exists_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::Nblocks { span, shard, req } => { - fail::fail_point!("ps::handle-pagerequest-message::nblocks"); - ( - vec![ - self.handle_get_nblocks_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::GetPage { + let (handler_results, span): ( + Vec>, + _, + ) = match batch { + BatchedFeMessage::Exists { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::exists"); + ( + vec![self + .handle_get_rel_exists_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], span, - shard, - effective_request_lsn, - pages, - } => { - fail::fail_point!("ps::handle-pagerequest-message::getpage"); - ( - { - let npages = pages.len(); - trace!(npages, "handling getpage request"); - let res = self - .handle_get_page_at_lsn_request_batched( - &shard, - effective_request_lsn, - pages, - ctx, - ) - .instrument(span.clone()) - .await; - assert_eq!(res.len(), npages); - res - }, - span, - ) - } - BatchedFeMessage::DbSize { span, shard, req } => { - fail::fail_point!("ps::handle-pagerequest-message::dbsize"); - ( - vec![ - self.handle_db_size_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::GetSlruSegment { span, shard, req } => { - fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); - ( - vec![ - self.handle_get_slru_segment_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::RespondError { span, error } => { - // We've already decided to respond with an error, so we don't need to - // call the handler. 
- (vec![Err(error)], span) - } - }; + ) + } + BatchedFeMessage::Nblocks { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + ( + vec![self + .handle_get_nblocks_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], + span, + ) + } + BatchedFeMessage::GetPage { + span, + shard, + effective_request_lsn, + pages, + } => { + fail::fail_point!("ps::handle-pagerequest-message::getpage"); + ( + { + let npages = pages.len(); + trace!(npages, "handling getpage request"); + let res = self + .handle_get_page_at_lsn_request_batched( + &shard, + effective_request_lsn, + pages, + ctx, + ) + .instrument(span.clone()) + .await; + assert_eq!(res.len(), npages); + res + }, + span, + ) + } + BatchedFeMessage::DbSize { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + ( + vec![self + .handle_db_size_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], + span, + ) + } + BatchedFeMessage::GetSlruSegment { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + ( + vec![self + .handle_get_slru_segment_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], + span, + ) + } + BatchedFeMessage::RespondError { span, error } => { + // We've already decided to respond with an error, so we don't need to + // call the handler. + (vec![Err(error)], span) + } + }; // Map handler result to protocol behavior. // Some handler errors cause exit from pagestream protocol. // Other handler errors are sent back as an error message and we stay in pagestream protocol. + let mut timers: smallvec::SmallVec<[_; 1]> = + smallvec::SmallVec::with_capacity(handler_results.len()); for handler_result in handler_results { let response_msg = match handler_result { Err(e) => match &e { @@ -944,7 +1014,12 @@ impl PageServerHandler { }) } }, - Ok(response_msg) => response_msg, + Ok((response_msg, timer)) => { + // Extending the lifetime of the timers so observations on drop + // include the flush time. 
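Reviewer note (not part of the patch): a self-contained sketch of the lifetime trick the comment above describes — stand-in types only; the real code pushes `SmgrOpTimer` values into a `SmallVec` and drops them after the final flush, so the latency recorded on drop includes the time it took to get the response onto the wire:

```rust
use std::time::Instant;

struct OpTimer(Instant);

impl Drop for OpTimer {
    fn drop(&mut self) {
        // The real code observes a latency histogram here.
        println!("observed {:?}", self.0.elapsed());
    }
}

fn send_response(_msg: &str) { /* write into the connection's send buffer */ }
fn flush() { /* flush the buffered responses to the socket */ }

fn main() {
    let handler_results = vec![
        ("page 1", OpTimer(Instant::now())),
        ("page 2", OpTimer(Instant::now())),
    ];

    // Move each timer into a separate Vec instead of letting it drop as soon as
    // its (response, timer) pair is unpacked ...
    let mut timers = Vec::with_capacity(handler_results.len());
    for (msg, timer) in handler_results {
        send_response(msg);
        timers.push(timer);
    }
    flush();
    // ... so the observation on drop also covers the flush.
    drop(timers);
}
```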
+ timers.push(timer); + response_msg + } }; // marshal & transmit response message @@ -961,6 +1036,7 @@ impl PageServerHandler { res?; } } + drop(timers); Ok(()) } @@ -1423,10 +1499,6 @@ impl PageServerHandler { req: &PagestreamExistsRequest, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetRelExists, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -1453,10 +1525,6 @@ impl PageServerHandler { req: &PagestreamNblocksRequest, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetRelSize, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -1483,10 +1551,6 @@ impl PageServerHandler { req: &PagestreamDbSizeRequest, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetDbSize, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -1512,26 +1576,41 @@ impl PageServerHandler { &mut self, timeline: &Timeline, effective_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, + requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> { debug_assert_current_span_has_tenant_and_timeline_id(); - let _timer = timeline.query_metrics.start_timer_many( - metrics::SmgrQueryType::GetPageAtLsn, - pages.len(), - ctx, - ); - let pages = timeline - .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx) + timeline + .query_metrics + .observe_getpage_batch_start(requests.len()); + + let results = timeline + .get_rel_page_at_lsn_batched( + requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)), + effective_lsn, + ctx, + ) .await; + assert_eq!(results.len(), requests.len()); - Vec::from_iter(pages.into_iter().map(|page| { - page.map(|page| { - PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page }) - }) - .map_err(PageStreamError::from) - })) + // TODO: avoid creating the new Vec here + Vec::from_iter( + requests + .into_iter() + .zip(results.into_iter()) + .map(|((_, _, timer), res)| { + res.map(|page| { + ( + PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { + page, + }), + timer, + ) + }) + .map_err(PageStreamError::from) + }), + ) } #[instrument(skip_all, fields(shard_id))] @@ -1541,10 +1620,6 @@ impl PageServerHandler { req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -2045,7 +2120,7 @@ where COMPUTE_COMMANDS_COUNTERS .for_command(ComputeCommandKind::Basebackup) .inc(); - let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(&ctx); + let metric_recording = metrics::BASEBACKUP_QUERY_TIME.start_recording(); let res = async { self.handle_basebackup_request( pgb, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index d48a1ba117..a00ec761e2 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -203,9 +203,13 @@ impl Timeline { ) -> Result { match version { Version::Lsn(effective_lsn) => { - let pages = smallvec::smallvec![(tag, blknum)]; + let pages: 
smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)]; let res = self - .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx) + .get_rel_page_at_lsn_batched( + pages.iter().map(|(tag, blknum)| (tag, blknum)), + effective_lsn, + ctx, + ) .await; assert_eq!(res.len(), 1); res.into_iter().next().unwrap() @@ -240,7 +244,7 @@ impl Timeline { /// The ordering of the returned vec corresponds to the ordering of `pages`. pub(crate) async fn get_rel_page_at_lsn_batched( &self, - pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, + pages: impl ExactSizeIterator, effective_lsn: Lsn, ctx: &RequestContext, ) -> Vec> { @@ -254,7 +258,7 @@ impl Timeline { let result_slots = result.spare_capacity_mut(); let mut keys_slots: BTreeMap> = BTreeMap::default(); - for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() { + for (response_slot_idx, (tag, blknum)) in pages.enumerate() { if tag.relnode == 0 { result_slots[response_slot_idx].write(Err(PageReconstructError::Other( RelationError::InvalidRelnode.into(), @@ -265,7 +269,7 @@ impl Timeline { } let nblocks = match self - .get_rel_size(tag, Version::Lsn(effective_lsn), ctx) + .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx) .await { Ok(nblocks) => nblocks, @@ -276,7 +280,7 @@ impl Timeline { } }; - if blknum >= nblocks { + if *blknum >= nblocks { debug!( "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", tag, blknum, effective_lsn, nblocks @@ -286,7 +290,7 @@ impl Timeline { continue; } - let key = rel_block_to_key(tag, blknum); + let key = rel_block_to_key(*tag, *blknum); let key_slots = keys_slots.entry(key).or_default(); key_slots.push(response_slot_idx); diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 6a80953901..7c4de55a47 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -2,14 +2,14 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicU64, Ordering}, - Arc, Mutex, + Arc, }, time::{Duration, Instant}, }; use arc_swap::ArcSwap; use enumset::EnumSet; -use tracing::{error, warn}; +use tracing::error; use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter}; use crate::{context::RequestContext, task_mgr::TaskKind}; @@ -162,19 +162,6 @@ where .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); let observation = Observation { wait_time }; self.metric.observe_throttling(&observation); - match ctx.micros_spent_throttled.add(wait_time) { - Ok(res) => res, - Err(error) => { - use once_cell::sync::Lazy; - use utils::rate_limit::RateLimit; - static WARN_RATE_LIMIT: Lazy> = - Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10)))); - let mut guard = WARN_RATE_LIMIT.lock().unwrap(); - guard.call(move || { - warn!(error, "error adding time spent throttled; this message is logged at a global rate limit"); - }); - } - } Some(wait_time) } else { None diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 730477a7f4..dc3f823f20 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1059,7 +1059,8 @@ impl Timeline { .map(|metric| (metric, Instant::now())); // start counting after throttle so that throttle time - // is always less than observation time + // is always less than observation time and we don't + // underflow when computing `ex_throttled` below. 
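Reviewer note (not part of the patch): the reworded comments in timeline.rs concern the one remaining place where throttle time is still deducted from an observed duration. The underflow they mention is `Duration::checked_sub` returning `None` when the subtracted wait is larger than the observed span; a tiny illustrative sketch of that failure mode and of the invariant that prevents it (the throttle wait must be contained in the observed interval):

```rust
use std::time::Duration;

fn main() {
    let observed = Duration::from_millis(10);
    let throttled_inside = Duration::from_millis(4);
    let throttled_outside = Duration::from_millis(25);

    // Fine: the throttle wait is part of the observed span.
    assert_eq!(
        observed.checked_sub(throttled_inside),
        Some(Duration::from_millis(6))
    );

    // Underflow: subtracting a wait that was not part of the observed span.
    assert_eq!(observed.checked_sub(throttled_outside), None);
}
```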
let throttled = self .timeline_get_throttle .throttle(ctx, key_count as usize) @@ -1138,7 +1139,9 @@ impl Timeline { .map(ScanLatencyOngoingRecording::start_recording); // start counting after throttle so that throttle time - // is always less than observation time + // is always less than observation time and we don't + // underflow when computing the `ex_throttled` value in + // `recording.observe(throttled)` below. let throttled = self .timeline_get_throttle // assume scan = 1 quota for now until we find a better way to process this diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index 3f90c233a6..ffdbd988a5 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -173,6 +173,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = ( counter("pageserver_tenant_throttling_count_accounted_finish"), counter("pageserver_tenant_throttling_wait_usecs_sum"), counter("pageserver_tenant_throttling_count"), + *histogram("pageserver_page_service_batch_size"), *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS, # "pageserver_directory_entries_count", -- only used if above a certain threshold # "pageserver_broken_tenants_count" -- used only for broken diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py index c47a849fec..562094a059 100644 --- a/test_runner/performance/pageserver/test_page_service_batching.py +++ b/test_runner/performance/pageserver/test_page_service_batching.py @@ -167,18 +167,18 @@ def test_throughput( @dataclass class Metrics: time: float - pageserver_getpage_count: float - pageserver_vectored_get_count: float + pageserver_batch_size_histo_sum: float + pageserver_batch_size_histo_count: float compute_getpage_count: float pageserver_cpu_seconds_total: float def __sub__(self, other: "Metrics") -> "Metrics": return Metrics( time=self.time - other.time, - pageserver_getpage_count=self.pageserver_getpage_count - - other.pageserver_getpage_count, - pageserver_vectored_get_count=self.pageserver_vectored_get_count - - other.pageserver_vectored_get_count, + pageserver_batch_size_histo_sum=self.pageserver_batch_size_histo_sum + - other.pageserver_batch_size_histo_sum, + pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count + - other.pageserver_batch_size_histo_count, compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count, pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total - other.pageserver_cpu_seconds_total, @@ -187,8 +187,8 @@ def test_throughput( def normalize(self, by) -> "Metrics": return Metrics( time=self.time / by, - pageserver_getpage_count=self.pageserver_getpage_count / by, - pageserver_vectored_get_count=self.pageserver_vectored_get_count / by, + pageserver_batch_size_histo_sum=self.pageserver_batch_size_histo_sum / by, + pageserver_batch_size_histo_count=self.pageserver_batch_size_histo_count / by, compute_getpage_count=self.compute_getpage_count / by, pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by, ) @@ -202,11 +202,11 @@ def test_throughput( pageserver_metrics = ps_http.get_metrics() return Metrics( time=time.time(), - pageserver_getpage_count=pageserver_metrics.query_one( - "pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"} + pageserver_batch_size_histo_sum=pageserver_metrics.query_one( + "pageserver_page_service_batch_size_sum" ).value, - pageserver_vectored_get_count=pageserver_metrics.query_one( - 
"pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"} + pageserver_batch_size_histo_count=pageserver_metrics.query_one( + "pageserver_page_service_batch_size_count" ).value, compute_getpage_count=compute_getpage_count, pageserver_cpu_seconds_total=pageserver_metrics.query_one( @@ -243,7 +243,7 @@ def test_throughput( # Sanity-checks on the collected data # # assert that getpage counts roughly match between compute and ps - assert metrics.pageserver_getpage_count == pytest.approx( + assert metrics.pageserver_batch_size_histo_sum == pytest.approx( metrics.compute_getpage_count, rel=0.01 ) @@ -256,7 +256,7 @@ def test_throughput( zenbenchmark.record( "perfmetric.batching_factor", - metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count, + metrics.pageserver_batch_size_histo_sum / metrics.pageserver_batch_size_histo_count, unit="", report=MetricReport.HIGHER_IS_BETTER, ) diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py index ba6a1d9045..62aec50a9e 100644 --- a/test_runner/regress/test_pageserver_getpage_throttle.py +++ b/test_runner/regress/test_pageserver_getpage_throttle.py @@ -4,6 +4,7 @@ import copy import json import uuid +import pytest from anyio import Path from fixtures.common_types import TenantId, TimelineId from fixtures.log_helper import log @@ -70,14 +71,21 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P log.info("warmup / make sure metrics are present") run_pagebench_at_max_speed_and_get_total_requests_completed(2) - metrics_query = { + smgr_metrics_query = { "tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "smgr_query_type": "get_page_at_lsn", } - metric_name = "pageserver_smgr_query_seconds_sum" - smgr_query_seconds_pre = ps_http.get_metric_value(metric_name, metrics_query) + smgr_metric_name = "pageserver_smgr_query_seconds_sum" + throttle_metrics_query = { + "tenant_id": str(tenant_id), + } + throttle_metric_name = "pageserver_tenant_throttling_wait_usecs_sum_total" + + smgr_query_seconds_pre = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query) assert smgr_query_seconds_pre is not None + throttled_usecs_pre = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query) + assert throttled_usecs_pre is not None marker = uuid.uuid4().hex ps_http.post_tracing_event("info", marker) @@ -108,14 +116,23 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P timeout=compaction_period, ) - log.info("validate that the metric doesn't include throttle wait time") - smgr_query_seconds_post = ps_http.get_metric_value(metric_name, metrics_query) + log.info("the smgr metric includes throttle time") + smgr_query_seconds_post = ps_http.get_metric_value(smgr_metric_name, smgr_metrics_query) assert smgr_query_seconds_post is not None + throttled_usecs_post = ps_http.get_metric_value(throttle_metric_name, throttle_metrics_query) + assert throttled_usecs_post is not None actual_smgr_query_seconds = smgr_query_seconds_post - smgr_query_seconds_pre + actual_throttled_usecs = throttled_usecs_post - throttled_usecs_pre + actual_throttled_secs = actual_throttled_usecs / 1_000_000 assert ( - duration_secs >= 10 * actual_smgr_query_seconds - ), "smgr metrics should not include throttle wait time" + pytest.approx(duration_secs, 0.1) == actual_smgr_query_seconds + ), "smgr metrics include throttle wait time" + smgr_ex_throttle = actual_smgr_query_seconds - actual_throttled_secs + assert 
smgr_ex_throttle > 0 + assert ( + duration_secs > 10 * smgr_ex_throttle + ), "most of the time in this test is spent throttled because the rate-limit's contribution to latency dominates" throttle_config_with_field_fair_set = {
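Closing reviewer note on the benchmark arithmetic above (not part of the patch): each batch contributes its size to `pageserver_page_service_batch_size_sum` and 1 to `pageserver_page_service_batch_size_count`, so the recorded `perfmetric.batching_factor` is simply the mean batch size — e.g. 100 getpage requests answered in 5 batches gives sum/count = 100/5 = 20. A minimal sketch of that bookkeeping (illustrative values, not the fixture code):

```rust
// Histogram-style bookkeeping: one observation per batch, valued at its size.
fn main() {
    let batch_sizes = [32usize, 32, 8, 1, 27]; // 5 batches, 100 getpage requests
    let sum: usize = batch_sizes.iter().sum();
    let count = batch_sizes.len();
    let batching_factor = sum as f64 / count as f64;
    assert_eq!(sum, 100);
    assert_eq!(batching_factor, 20.0);
}
```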