From c4f92a21bfe5322ede49e1a5f96f47349fba43e2 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 25 Nov 2024 09:48:40 +0100
Subject: [PATCH] WIP: batching observability improvements

---
 pageserver/src/metrics.rs           | 53 +++++++++-------------
 pageserver/src/page_service.rs      | 69 +++++++++++++++++++++++------
 pageserver/src/pgdatadir_mapping.rs | 12 ++---
 3 files changed, 83 insertions(+), 51 deletions(-)

diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 3cdc2a761e..92f7c4e2e6 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1178,19 +1178,21 @@ pub(crate) mod virtual_file_io_engine {
     });
 }
 
-struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
+struct GlobalAndPerTimelineHistogramTimer<'a, 'c, I>
+where
+    I: IntoIterator + ExactSizeIterator,
+{
     global_latency_histo: &'a Histogram,
 
     // Optional because not all op types are tracked per-timeline
     per_timeline_latency_histo: Option<&'a Histogram>,
 
     ctx: &'c RequestContext,
-    start: std::time::Instant,
+    starts: I,
     op: SmgrQueryType,
-    count: usize,
 }
 
-impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
+impl<I: IntoIterator + ExactSizeIterator> Drop for GlobalAndPerTimelineHistogramTimer<'_, '_, I> {
     fn drop(&mut self) {
         let elapsed = self.start.elapsed();
         let ex_throttled = self
@@ -1392,37 +1394,27 @@ impl SmgrQueryTimePerTimeline {
     ) -> Option<impl Drop + 'a> {
         self.start_timer_many(op, 1, ctx)
     }
 
-    pub(crate) fn start_timer_many<'c: 'a, 'a>(
+
+    pub(crate) fn start_timer_at<'c: 'a, 'a>(
         &'a self,
         op: SmgrQueryType,
-        count: usize,
+        start: Instant,
         ctx: &'c RequestContext,
     ) -> Option<impl Drop + 'a> {
-        let start = Instant::now();
-
-        self.global_started[op as usize].inc();
-
-        // We subtract time spent throttled from the observed latency.
-        match ctx.micros_spent_throttled.open() {
-            Ok(()) => (),
-            Err(error) => {
-                use utils::rate_limit::RateLimit;
-                static LOGGED: Lazy<Mutex<enum_map::EnumMap<SmgrQueryType, RateLimit>>> =
-                    Lazy::new(|| {
-                        Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| {
-                            RateLimit::new(Duration::from_secs(10))
-                        })))
-                    });
-                let mut guard = LOGGED.lock().unwrap();
-                let rate_limit = &mut guard[op];
-                rate_limit.call(|| {
-                    warn!(?op, error, "error opening micros_spent_throttled; this message is logged at a global rate limit");
-                });
-            }
-        }
+        self.start_timer_at_many(op, std::iter::once(start), ctx)
+    }
+    pub(crate) fn start_timer_at_many<'c: 'a, 'a, T>(
+        &'a self,
+        op: SmgrQueryType,
+        starts: T,
+        ctx: &'c RequestContext,
+    ) -> Option<impl Drop + 'a>
+    where
+        T: IntoIterator + ExactSizeIterator,
+    {
         let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) {
-            self.per_timeline_getpage_started.inc();
+            self.per_timeline_getpage_started.inc_by(starts.len());
             Some(&self.per_timeline_getpage_latency)
         } else {
             None
@@ -1432,9 +1424,8 @@ impl SmgrQueryTimePerTimeline {
             global_latency_histo: &self.global_latency[op as usize],
             per_timeline_latency_histo,
             ctx,
-            start,
             op,
-            count,
+            starts,
         })
     }
 }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 88f636c7da..0316948a7d 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -536,11 +536,13 @@ impl From for QueryError {
 enum BatchedFeMessage {
     Exists {
         span: Span,
+        received_at: Instant,
         shard: timeline::handle::Handle<TenantManagerTypes>,
         req: models::PagestreamExistsRequest,
     },
     Nblocks {
         span: Span,
+        received_at: Instant,
         shard: timeline::handle::Handle<TenantManagerTypes>,
         req: models::PagestreamNblocksRequest,
     },
@@ -548,15 +550,17 @@ enum BatchedFeMessage {
         span: Span,
         shard: timeline::handle::Handle<TenantManagerTypes>,
         effective_request_lsn: Lsn,
-        pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
+        pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
     },
     DbSize {
         span: Span,
+        received_at: Instant,
         shard: timeline::handle::Handle<TenantManagerTypes>,
         req: models::PagestreamDbSizeRequest,
     },
     GetSlruSegment {
         span: Span,
+        received_at: Instant,
         shard: timeline::handle::Handle<TenantManagerTypes>,
         req: models::PagestreamGetSlruSegmentRequest,
     },
@@ -628,6 +632,8 @@ impl PageServerHandler {
             msg = pgb.read_message() => { msg }
         };
 
+        let received_at = Instant::now();
+
         let copy_data_bytes = match msg? {
             Some(FeMessage::CopyData(bytes)) => bytes,
             Some(FeMessage::Terminate) => {
@@ -656,7 +662,12 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .instrument(span.clone()) // sets `shard_id` field
                     .await?;
-                BatchedFeMessage::Exists { span, shard, req }
+                BatchedFeMessage::Exists {
+                    span,
+                    received_at,
+                    shard,
+                    req,
+                }
             }
             PagestreamFeMessage::Nblocks(req) => {
                 let span = tracing::info_span!(parent: parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
@@ -664,7 +675,12 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .instrument(span.clone()) // sets `shard_id` field
                     .await?;
-                BatchedFeMessage::Nblocks { span, shard, req }
+                BatchedFeMessage::Nblocks {
+                    span,
+                    received_at,
+                    shard,
+                    req,
+                }
             }
             PagestreamFeMessage::DbSize(req) => {
                 let span = tracing::info_span!(parent: parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
@@ -672,7 +688,12 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .instrument(span.clone()) // sets `shard_id` field
                     .await?;
-                BatchedFeMessage::DbSize { span, shard, req }
+                BatchedFeMessage::DbSize {
+                    span,
+                    received_at,
+                    shard,
+                    req,
+                }
             }
             PagestreamFeMessage::GetSlruSegment(req) => {
                 let span = tracing::info_span!(parent: parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
@@ -680,7 +701,12 @@ impl PageServerHandler {
                     .get(tenant_id, timeline_id, ShardSelector::Zero)
                     .instrument(span.clone()) // sets `shard_id` field
                     .await?;
-                BatchedFeMessage::GetSlruSegment { span, shard, req }
+                BatchedFeMessage::GetSlruSegment {
+                    span,
+                    received_at,
+                    shard,
+                    req,
+                }
             }
             PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
                 request_lsn,
@@ -743,7 +769,7 @@ impl PageServerHandler {
                     span,
                     shard,
                     effective_request_lsn,
-                    pages: smallvec::smallvec![(rel, blkno)],
+                    pages: smallvec::smallvec![(rel, blkno, received_at)],
                 }
             }
         };
@@ -830,7 +856,12 @@ impl PageServerHandler {
 
         // invoke handler function
         let (handler_results, span): (Vec<Result<PagestreamBeMessage, PageStreamError>>, _) = match batch {
-            BatchedFeMessage::Exists { span, shard, req } => {
+            BatchedFeMessage::Exists {
+                span,
+                received_at: _,
+                shard,
+                req,
+            } => {
                 fail::fail_point!("ps::handle-pagerequest-message::exists");
                 (
                     vec![
@@ -841,7 +872,12 @@ impl PageServerHandler {
                     span,
                 )
             }
-            BatchedFeMessage::Nblocks { span, shard, req } => {
+            BatchedFeMessage::Nblocks {
+                span,
+                received_at,
+                shard,
+                req,
+            } => {
                 fail::fail_point!("ps::handle-pagerequest-message::nblocks");
                 (
                     vec![
@@ -1409,11 +1445,12 @@ impl PageServerHandler {
         &mut self,
         timeline: &Timeline,
         req: &PagestreamExistsRequest,
+        received_at: Instant,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let _timer = timeline
             .query_metrics
-            .start_timer(metrics::SmgrQueryType::GetRelExists, ctx);
+            .start_timer_at(metrics::SmgrQueryType::GetRelExists, received_at, ctx);
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
@@ -1439,11 +1476,12 @@ impl PageServerHandler {
         &mut self,
         timeline: &Timeline,
         req: &PagestreamNblocksRequest,
+        received_at: Instant,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
         let _timer = timeline
             .query_metrics
-            .start_timer(metrics::SmgrQueryType::GetRelSize, ctx);
+            .start_timer_at(metrics::SmgrQueryType::GetRelSize, received_at, ctx);
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
@@ -1469,11 +1507,14 @@ impl PageServerHandler {
         &mut self,
         timeline: &Timeline,
         req: &PagestreamDbSizeRequest,
+        received_at: Instant,
         ctx: &RequestContext,
     ) -> Result<PagestreamBeMessage, PageStreamError> {
-        let _timer = timeline
-            .query_metrics
-            .start_timer(metrics::SmgrQueryType::GetDbSize, ctx);
+        let _timer = timeline.query_metrics.start_timer_at(
+            metrics::SmgrQueryType::GetDbSize,
+            received_at,
+            ctx,
+        );
 
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         let lsn = Self::wait_or_get_last_lsn(
@@ -1500,7 +1541,7 @@ impl PageServerHandler {
         &mut self,
         timeline: &Timeline,
         effective_lsn: Lsn,
-        pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
+        pages: smallvec::SmallVec<[(RelTag, BlockNumber, Instant); 1]>,
         ctx: &RequestContext,
     ) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
         debug_assert_current_span_has_tenant_and_timeline_id();
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index 5995d1cc57..9d21652e00 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -202,7 +202,7 @@ impl Timeline {
             Version::Lsn(effective_lsn) => {
                 let pages = smallvec::smallvec![(tag, blknum)];
                 let res = self
-                    .get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
+                    .get_rel_page_at_lsn_batched(&pages, effective_lsn, ctx)
                     .await;
                 assert_eq!(res.len(), 1);
                 res.into_iter().next().unwrap()
@@ -237,7 +237,7 @@ impl Timeline {
     /// The ordering of the returned vec corresponds to the ordering of `pages`.
     pub(crate) async fn get_rel_page_at_lsn_batched(
         &self,
-        pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
+        pages: &smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
         effective_lsn: Lsn,
         ctx: &RequestContext,
     ) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -251,7 +251,7 @@ impl Timeline {
         let result_slots = result.spare_capacity_mut();
 
         let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
-        for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() {
+        for (response_slot_idx, (tag, blknum)) in pages.iter().enumerate() {
             if tag.relnode == 0 {
                 result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
                     RelationError::InvalidRelnode.into(),
@@ -262,7 +262,7 @@ impl Timeline {
             }
 
             let nblocks = match self
-                .get_rel_size(tag, Version::Lsn(effective_lsn), ctx)
+                .get_rel_size(*tag, Version::Lsn(effective_lsn), ctx)
                 .await
             {
                 Ok(nblocks) => nblocks,
@@ -273,7 +273,7 @@ impl Timeline {
                 }
             };
 
-            if blknum >= nblocks {
+            if *blknum >= nblocks {
                 debug!(
                     "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
                     tag, blknum, effective_lsn, nblocks
@@ -283,7 +283,7 @@ impl Timeline {
                 continue;
             }
 
-            let key = rel_block_to_key(tag, blknum);
+            let key = rel_block_to_key(*tag, *blknum);
             let key_slots = keys_slots.entry(key).or_default();
             key_slots.push(response_slot_idx);
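
Not part of the patch above: the following is a minimal, self-contained sketch
of the timing pattern the WIP introduces. Each request carries the Instant at
which it was received, the batch executes as a whole, and a guard records one
latency sample per request on drop, measured from that request's own receive
time; this is the role played by received_at and start_timer_at_many in the
diff. SimpleHistogram, BatchTimer, and the driver in main() are stand-in names
for illustration, not pageserver APIs.

    // Illustrative sketch only; stand-in types instead of the pageserver's
    // prometheus metrics and SmgrQueryType.
    use std::time::{Duration, Instant};

    /// Stand-in for a latency histogram; it just collects samples.
    #[derive(Default)]
    struct SimpleHistogram {
        samples: Vec<Duration>,
    }

    impl SimpleHistogram {
        fn observe(&mut self, sample: Duration) {
            self.samples.push(sample);
        }
    }

    /// Guard that, on drop, records elapsed time since each request's
    /// receive timestamp: one sample per request in the batch.
    struct BatchTimer<'a> {
        histogram: &'a mut SimpleHistogram,
        received_at: Vec<Instant>,
    }

    impl Drop for BatchTimer<'_> {
        fn drop(&mut self) {
            let completed_at = Instant::now();
            for received_at in &self.received_at {
                self.histogram
                    .observe(completed_at.duration_since(*received_at));
            }
        }
    }

    fn main() {
        let mut histogram = SimpleHistogram::default();

        // Each request records its own receive time as it is read off the wire.
        let batch: Vec<(u32, Instant)> = (0..3).map(|blkno| (blkno, Instant::now())).collect();

        {
            // Start the timer from the per-request receive times, then run the batch.
            let _timer = BatchTimer {
                histogram: &mut histogram,
                received_at: batch.iter().map(|(_, received_at)| *received_at).collect(),
            };
            std::thread::sleep(Duration::from_millis(5)); // stand-in for batched execution
        } // _timer dropped here: one sample per request, measured from receipt

        assert_eq!(histogram.samples.len(), batch.len());
        for sample in &histogram.samples {
            println!("request latency: {sample:?}");
        }
    }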