diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 5335650e44..feed251ee0 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1216,16 +1216,18 @@ pub(crate) mod virtual_file_io_engine { }); } -pub(crate) struct GlobalAndPerTimelineHistogramTimer { +pub(crate) struct GlobalAndPerTimelineHistogramTimer<'c> { global_latency_histo: Histogram, // Optional because not all op types are tracked per-timeline per_timeline_latency_histo: Option, start: Instant, + + ctx: &'c RequestContext, } -impl Drop for GlobalAndPerTimelineHistogramTimer { +impl<'c> Drop for GlobalAndPerTimelineHistogramTimer<'c> { fn drop(&mut self) { let elapsed = self.start.elapsed().as_secs_f64(); self.global_latency_histo.observe(elapsed); @@ -1394,11 +1396,12 @@ impl SmgrQueryTimePerTimeline { per_timeline_getpage_started, } } - pub(crate) fn start_timer_at( + pub(crate) fn start_timer_at<'c>( &self, op: SmgrQueryType, start: Instant, - ) -> GlobalAndPerTimelineHistogramTimer { + ctx: &'c RequestContext, + ) -> GlobalAndPerTimelineHistogramTimer<'c> { self.global_started[op as usize].inc(); let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) { @@ -1412,6 +1415,7 @@ impl SmgrQueryTimePerTimeline { global_latency_histo: self.global_latency[op as usize].clone(), per_timeline_latency_histo, start, + ctx, } } } @@ -1424,6 +1428,8 @@ mod smgr_query_time_tests { use strum::IntoEnumIterator; use utils::id::{TenantId, TimelineId}; + use crate::{context::{DownloadBehavior, RequestContext}, task_mgr::TaskKind}; + // Regression test, we used hard-coded string constants before using an enum. #[test] fn op_label_name() { @@ -1467,7 +1473,8 @@ mod smgr_query_time_tests { let (pre_global, pre_per_tenant_timeline) = get_counts(); assert_eq!(pre_per_tenant_timeline, 0); - let timer = metrics.start_timer_at(*op, Instant::now()); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); + let timer = metrics.start_timer_at(*op, Instant::now(), &ctx); drop(timer); let (post_global, post_per_tenant_timeline) = get_counts(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2147dcff10..ef2ca3a9fe 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -537,16 +537,16 @@ impl From for QueryError { } } -enum BatchedFeMessage { +enum BatchedFeMessage<'c> { Exists { span: Span, - timer: GlobalAndPerTimelineHistogramTimer, + timer: GlobalAndPerTimelineHistogramTimer<'c>, shard: timeline::handle::Handle, req: models::PagestreamExistsRequest, }, Nblocks { span: Span, - timer: GlobalAndPerTimelineHistogramTimer, + timer: GlobalAndPerTimelineHistogramTimer<'c>, shard: timeline::handle::Handle, req: models::PagestreamNblocksRequest, }, @@ -554,17 +554,17 @@ enum BatchedFeMessage { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1]>, + pages: smallvec::SmallVec<[(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1]>, }, DbSize { span: Span, - timer: GlobalAndPerTimelineHistogramTimer, + timer: GlobalAndPerTimelineHistogramTimer<'c>, shard: timeline::handle::Handle, req: models::PagestreamDbSizeRequest, }, GetSlruSegment { span: Span, - timer: GlobalAndPerTimelineHistogramTimer, + timer: GlobalAndPerTimelineHistogramTimer<'c>, shard: timeline::handle::Handle, req: models::PagestreamGetSlruSegmentRequest, }, @@ -616,7 +616,7 @@ impl PageServerHandler { ) } - async fn pagestream_read_message( + async fn pagestream_read_message<'c, IO>( pgb: &mut PostgresBackendReader, tenant_id: TenantId, timeline_id: TimelineId, @@ -624,7 +624,7 @@ impl PageServerHandler { cancel: &CancellationToken, ctx: &RequestContext, parent_span: Span, - ) -> Result, QueryError> + ) -> Result>, QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, { @@ -668,7 +668,7 @@ impl PageServerHandler { .await?; let timer = shard .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at); + .start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx); BatchedFeMessage::Exists { span, timer, @@ -684,7 +684,7 @@ impl PageServerHandler { .await?; let timer = shard .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at); + .start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx); BatchedFeMessage::Nblocks { span, timer, @@ -700,7 +700,7 @@ impl PageServerHandler { .await?; let timer = shard .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at); + .start_timer_at(metrics::SmgrQueryType::GetDbSize, received_at, ctx); BatchedFeMessage::DbSize { span, timer, @@ -716,7 +716,7 @@ impl PageServerHandler { .await?; let timer = shard .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at); + .start_timer_at(metrics::SmgrQueryType::GetSlruSegment, received_at, ctx); BatchedFeMessage::GetSlruSegment { span, timer, @@ -772,7 +772,7 @@ impl PageServerHandler { // any serious waiting, e.g., for LSNs. let timer = shard .query_metrics - .start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at); + .start_timer_at(metrics::SmgrQueryType::GetPageAtLsn, received_at, ctx); let effective_request_lsn = match Self::wait_or_get_last_lsn( &shard, @@ -803,11 +803,11 @@ impl PageServerHandler { /// Post-condition: `batch` is Some() #[instrument(skip_all, level = tracing::Level::TRACE)] #[allow(clippy::boxed_local)] - fn pagestream_do_batch( + fn pagestream_do_batch<'c>( max_batch_size: NonZeroUsize, - batch: &mut Result, - this_msg: Result, - ) -> Result<(), Result> { + batch: &mut Result, QueryError>, + this_msg: Result, QueryError>, + ) -> Result<(), Result, QueryError>> { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); let this_msg = match this_msg { @@ -867,19 +867,19 @@ impl PageServerHandler { } #[instrument(level = tracing::Level::DEBUG, skip_all)] - async fn pagesteam_handle_batched_message( + async fn pagesteam_handle_batched_message<'c, IO>( &mut self, pgb_writer: &mut PostgresBackend, - batch: BatchedFeMessage, + batch: BatchedFeMessage<'c>, cancel: &CancellationToken, - ctx: &RequestContext, + ctx: &'c RequestContext, ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { // invoke handler function let (handler_results, span): ( - Vec>, + Vec), PageStreamError>>, _, ) = match batch { BatchedFeMessage::Exists { @@ -1574,15 +1574,15 @@ impl PageServerHandler { } #[instrument(skip_all)] - async fn handle_get_page_at_lsn_request_batched( + async fn handle_get_page_at_lsn_request_batched<'c>( &mut self, timeline: &Timeline, effective_lsn: Lsn, requests: smallvec::SmallVec< - [(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer); 1], + [(RelTag, BlockNumber, GlobalAndPerTimelineHistogramTimer<'c>); 1], >, - ctx: &RequestContext, - ) -> Vec> + ctx: &'c RequestContext, + ) -> Vec), PageStreamError>> { debug_assert_current_span_has_tenant_and_timeline_id();