diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 277d89dfe5..5335650e44 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1216,52 +1216,21 @@ pub(crate) mod virtual_file_io_engine { }); } -struct GlobalAndPerTimelineHistogramTimer<'a, 'c, I> -where - I: IntoIterator + ExactSizeIterator, -{ - global_latency_histo: &'a Histogram, +pub(crate) struct GlobalAndPerTimelineHistogramTimer { + global_latency_histo: Histogram, // Optional because not all op types are tracked per-timeline - per_timeline_latency_histo: Option<&'a Histogram>, + per_timeline_latency_histo: Option, - ctx: &'c RequestContext, - starts: I, - op: SmgrQueryType, + start: Instant, } -impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_, _> { +impl Drop for GlobalAndPerTimelineHistogramTimer { fn drop(&mut self) { - let elapsed = self.start.elapsed(); - let ex_throttled = self - .ctx - .micros_spent_throttled - .close_and_checked_sub_from(elapsed); - let ex_throttled = match ex_throttled { - Ok(res) => res, - Err(error) => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy>> = - Lazy::new(|| { - Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { - RateLimit::new(Duration::from_secs(10)) - }))) - }); - let mut guard = LOGGED.lock().unwrap(); - let rate_limit = &mut guard[self.op]; - rate_limit.call(|| { - warn!(op=?self.op, error, "error deducting time spent throttled; this message is logged at a global rate limit"); - }); - elapsed - } - }; - - for _ in 0..self.count { - self.global_latency_histo - .observe(ex_throttled.as_secs_f64()); - if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo { - per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64()); - } + let elapsed = self.start.elapsed().as_secs_f64(); + self.global_latency_histo.observe(elapsed); + if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo { + per_timeline_getpage_histo.observe(elapsed); } } } @@ -1425,60 +1394,36 @@ impl SmgrQueryTimePerTimeline { per_timeline_getpage_started, } } - pub(crate) fn start_timer<'c: 'a, 'a>( - &'a self, - op: SmgrQueryType, - ctx: &'c RequestContext, - ) -> Option { - self.start_timer_many(op, 1, ctx) - } - - pub(crate) fn start_timer_at<'c: 'a, 'a>( - &'a self, + pub(crate) fn start_timer_at( + &self, op: SmgrQueryType, start: Instant, - ctx: &'c RequestContext, - ) -> Option { - self.start_timer_at_many(op, std::iter::once(start), ctx) - } + ) -> GlobalAndPerTimelineHistogramTimer { + self.global_started[op as usize].inc(); - pub(crate) fn start_timer_at_many<'c: 'a, 'a, T>( - &'a self, - op: SmgrQueryType, - starts: T, - ctx: &'c RequestContext, - ) -> Option - where - T: IntoIterator + ExactSizeIterator, - { let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) { - self.per_timeline_getpage_started.inc_by(starts.len()); - Some(&self.per_timeline_getpage_latency) + self.per_timeline_getpage_started.inc(); + Some(self.per_timeline_getpage_latency.clone()) } else { None }; - Some(GlobalAndPerTimelineHistogramTimer { - global_latency_histo: &self.global_latency[op as usize], + GlobalAndPerTimelineHistogramTimer { + global_latency_histo: self.global_latency[op as usize].clone(), per_timeline_latency_histo, - ctx, - op, - starts, - }) + start, + } } } #[cfg(test)] mod smgr_query_time_tests { + use std::time::Instant; + use pageserver_api::shard::TenantShardId; use strum::IntoEnumIterator; use utils::id::{TenantId, TimelineId}; - use crate::{ - context::{DownloadBehavior, RequestContext}, - task_mgr::TaskKind, - }; - // Regression test, we used hard-coded string constants before using an enum. #[test] fn op_label_name() { @@ -1522,8 +1467,7 @@ mod smgr_query_time_tests { let (pre_global, pre_per_tenant_timeline) = get_counts(); assert_eq!(pre_per_tenant_timeline, 0); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); - let timer = metrics.start_timer(*op, &ctx); + let timer = metrics.start_timer_at(*op, Instant::now()); drop(timer); let (post_global, post_per_tenant_timeline) = get_counts(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index bd5dd88f04..2147dcff10 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -51,7 +51,7 @@ use crate::auth::check_permission; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; -use crate::metrics::{self}; +use crate::metrics::{self, GlobalAndPerTimelineHistogramTimer}; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; @@ -540,13 +540,13 @@ impl From for QueryError { enum BatchedFeMessage { Exists { span: Span, - received_at: Instant, + timer: GlobalAndPerTimelineHistogramTimer, shard: timeline::handle::Handle, req: models::PagestreamExistsRequest, }, Nblocks { span: Span, - received_at: Instant, + timer: GlobalAndPerTimelineHistogramTimer, shard: timeline::handle::Handle, req: models::PagestreamNblocksRequest, }, @@ -554,17 +554,17 @@ enum BatchedFeMessage { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>, + pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1]>, }, DbSize { span: Span, - received_at: Instant, + timer: GlobalAndPerTimelineHistogramTimer, shard: timeline::handle::Handle, req: models::PagestreamDbSizeRequest, }, GetSlruSegment { span: Span, - received_at: Instant, + timer: GlobalAndPerTimelineHistogramTimer, shard: timeline::handle::Handle, req: models::PagestreamGetSlruSegmentRequest, }, @@ -666,9 +666,12 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; + let timer = shard + .query_metrics + .start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at); BatchedFeMessage::Exists { span, - received_at, + timer, shard, req, } @@ -679,9 +682,12 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; + let timer = shard + .query_metrics + .start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at); BatchedFeMessage::Nblocks { span, - received_at, + timer, shard, req, } @@ -692,9 +698,12 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; + let timer = shard + .query_metrics + .start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at); BatchedFeMessage::DbSize { span, - received_at, + timer, shard, req, } @@ -705,9 +714,12 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .instrument(span.clone()) // sets `shard_id` field .await?; + let timer = shard + .query_metrics + .start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at); BatchedFeMessage::GetSlruSegment { span, - received_at, + timer, shard, req, } @@ -754,6 +766,14 @@ impl PageServerHandler { return respond_error!(e.into()); } }; + + // It's important to start the timer before waiting for the LSN + // so that the _started counters are incremented before we do + // any serious waiting, e.g., for LSNs. + let timer = shard + .query_metrics + .start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at); + let effective_request_lsn = match Self::wait_or_get_last_lsn( &shard, request_lsn, @@ -773,7 +793,7 @@ impl PageServerHandler { span, shard, effective_request_lsn, - pages: smallvec::smallvec![(rel, blkno, received_at)], + pages: smallvec::smallvec![(rel, blkno, timer)], } } }; @@ -858,98 +878,112 @@ impl PageServerHandler { IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { // invoke handler function - let (handler_results, span): (Vec>, _) = - match batch { - BatchedFeMessage::Exists { + let (handler_results, span): ( + Vec>, + _, + ) = match batch { + BatchedFeMessage::Exists { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::exists"); + ( + vec![self + .handle_get_rel_exists_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], span, - received_at: _, - shard, - req, - } => { - fail::fail_point!("ps::handle-pagerequest-message::exists"); - ( - vec![ - self.handle_get_rel_exists_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::Nblocks { + ) + } + BatchedFeMessage::Nblocks { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + ( + vec![self + .handle_get_nblocks_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], span, - received_at, - shard, - req, - } => { - fail::fail_point!("ps::handle-pagerequest-message::nblocks"); - ( - vec![ - self.handle_get_nblocks_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::GetPage { + ) + } + BatchedFeMessage::GetPage { + span, + shard, + effective_request_lsn, + pages, + } => { + fail::fail_point!("ps::handle-pagerequest-message::getpage"); + ( + { + let npages = pages.len(); + trace!(npages, "handling getpage request"); + let res = self + .handle_get_page_at_lsn_request_batched( + &shard, + effective_request_lsn, + pages, + ctx, + ) + .instrument(span.clone()) + .await; + assert_eq!(res.len(), npages); + res + }, span, - shard, - effective_request_lsn, - pages, - } => { - fail::fail_point!("ps::handle-pagerequest-message::getpage"); - ( - { - let npages = pages.len(); - trace!(npages, "handling getpage request"); - let res = self - .handle_get_page_at_lsn_request_batched( - &shard, - effective_request_lsn, - pages, - ctx, - ) - .instrument(span.clone()) - .await; - assert_eq!(res.len(), npages); - res - }, - span, - ) - } - BatchedFeMessage::DbSize { span, shard, req } => { - fail::fail_point!("ps::handle-pagerequest-message::dbsize"); - ( - vec![ - self.handle_db_size_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::GetSlruSegment { span, shard, req } => { - fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); - ( - vec![ - self.handle_get_slru_segment_request(&shard, &req, ctx) - .instrument(span.clone()) - .await, - ], - span, - ) - } - BatchedFeMessage::RespondError { span, error } => { - // We've already decided to respond with an error, so we don't need to - // call the handler. - (vec![Err(error)], span) - } - }; + ) + } + BatchedFeMessage::DbSize { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + ( + vec![self + .handle_db_size_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], + span, + ) + } + BatchedFeMessage::GetSlruSegment { + span, + timer, + shard, + req, + } => { + fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + ( + vec![self + .handle_get_slru_segment_request(&shard, &req, ctx) + .instrument(span.clone()) + .await + .map(|msg| (msg, timer))], + span, + ) + } + BatchedFeMessage::RespondError { span, error } => { + // We've already decided to respond with an error, so we don't need to + // call the handler. + (vec![Err(error)], span) + } + }; // Map handler result to protocol behavior. // Some handler errors cause exit from pagestream protocol. // Other handler errors are sent back as an error message and we stay in pagestream protocol. + let mut timers: smallvec::SmallVec<[_; 1]> = + smallvec::SmallVec::with_capacity(handler_results.len()); for handler_result in handler_results { let response_msg = match handler_result { Err(e) => match &e { @@ -980,7 +1014,10 @@ impl PageServerHandler { }) } }, - Ok(response_msg) => response_msg, + Ok((response_msg, timer)) => { + timers.push(timer); + response_msg + } }; // marshal & transmit response message @@ -997,6 +1034,7 @@ impl PageServerHandler { res?; } } + drop(timers); Ok(()) } @@ -1461,13 +1499,8 @@ impl PageServerHandler { &mut self, timeline: &Timeline, req: &PagestreamExistsRequest, - received_at: Instant, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -1492,13 +1525,8 @@ impl PageServerHandler { &mut self, timeline: &Timeline, req: &PagestreamNblocksRequest, - received_at: Instant, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -1523,15 +1551,8 @@ impl PageServerHandler { &mut self, timeline: &Timeline, req: &PagestreamDbSizeRequest, - received_at: Instant, ctx: &RequestContext, ) -> Result { - let _timer = timeline.query_metrics.start_timer_at( - metrics::SmgrQueryType::GetDbSize, - received_at, - ctx, - ); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, @@ -1557,26 +1578,40 @@ impl PageServerHandler { &mut self, timeline: &Timeline, effective_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>, + requests: smallvec::SmallVec< + [(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1], + >, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> + { debug_assert_current_span_has_tenant_and_timeline_id(); - let _timer = timeline.query_metrics.start_timer_many( - metrics::SmgrQueryType::GetPageAtLsn, - pages.len(), - ctx, - ); - let pages = timeline - .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx) + let results = timeline + .get_rel_page_at_lsn_batched( + requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)), + effective_lsn, + ctx, + ) .await; + assert_eq!(results.len(), requests.len()); - Vec::from_iter(pages.into_iter().map(|page| { - page.map(|page| { - PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page }) - }) - .map_err(PageStreamError::from) - })) + // TODO: avoid creating the new Vec here + Vec::from_iter( + requests + .into_iter() + .zip(results.into_iter()) + .map(|((_, _, timer), res)| { + res.map(|page| { + ( + PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { + page, + }), + timer, + ) + }) + .map_err(PageStreamError::from) + }), + ) } #[instrument(skip_all, fields(shard_id))] @@ -1586,10 +1621,6 @@ impl PageServerHandler { req: &PagestreamGetSlruSegmentRequest, ctx: &RequestContext, ) -> Result { - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetSlruSegment, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn( timeline, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 3174d93bb8..1fd3a209b0 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -203,9 +203,13 @@ impl Timeline { ) -> Result { match version { Version::Lsn(effective_lsn) => { - let pages = smallvec::smallvec![(tag, blknum)]; + let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)]; let res = self - .get_rel_page_at_lsn_batched(&pages, effective_lsn, ctx) + .get_rel_page_at_lsn_batched( + pages.iter().map(|(tag, blknum)| (tag, blknum)), + effective_lsn, + ctx, + ) .await; assert_eq!(res.len(), 1); res.into_iter().next().unwrap() @@ -240,7 +244,7 @@ impl Timeline { /// The ordering of the returned vec corresponds to the ordering of `pages`. pub(crate) async fn get_rel_page_at_lsn_batched( &self, - pages: &smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, + pages: impl Iterator + ExactSizeIterator, effective_lsn: Lsn, ctx: &RequestContext, ) -> Vec> { @@ -254,7 +258,7 @@ impl Timeline { let result_slots = result.spare_capacity_mut(); let mut keys_slots: BTreeMap> = BTreeMap::default(); - for (response_slot_idx, (tag, blknum)) in pages.iter().enumerate() { + for (response_slot_idx, (tag, blknum)) in pages.enumerate() { if tag.relnode == 0 { result_slots[response_slot_idx].write(Err(PageReconstructError::Other( RelationError::InvalidRelnode.into(),