diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index b4e20cb8b9..fe019cc678 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1239,12 +1239,20 @@ pub(crate) struct SmgrOpTimerInner {
 #[derive(Debug)]
 enum SmgrOpTimerState {
     Received {
+        // In the future, we may want to track the full time the request spent
+        // inside pageserver process (time spent in kernel buffers can't be tracked).
+        // `received_at` would be used for that.
+        #[allow(dead_code)]
         received_at: Instant,
     },
-    ThrottleDoneExecutionStarting {
-        received_at: Instant,
+    ParsedRoutedThrottledNowBatching {
         throttle_started_at: Instant,
-        started_execution_at: Instant,
+        throttle_done_at: Instant,
+    },
+    BatchedNowExecuting {
+        throttle_started_at: Instant,
+        throttle_done_at: Instant,
+        execution_started_at: Instant,
     },
 }
 
@@ -1255,22 +1263,20 @@ pub(crate) struct SmgrOpFlushInProgress {
 }
 
 impl SmgrOpTimer {
-    pub(crate) fn observe_throttle_done_execution_starting(&mut self, throttle: &ThrottleResult) {
+    pub(crate) fn observe_throttle_done(&mut self, throttle: &ThrottleResult) {
         let inner = self.0.as_mut().expect("other public methods consume self");
         match (&mut inner.timings, throttle) {
-            (SmgrOpTimerState::Received { received_at }, throttle) => match throttle {
+            (SmgrOpTimerState::Received { received_at: _ }, throttle) => match throttle {
                 ThrottleResult::NotThrottled { start } => {
-                    inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
-                        received_at: *received_at,
+                    inner.timings = SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
                         throttle_started_at: *start,
-                        started_execution_at: *start,
+                        throttle_done_at: *start,
                     };
                 }
                 ThrottleResult::Throttled { start, end } => {
-                    inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
-                        received_at: *start,
+                    inner.timings = SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
                         throttle_started_at: *start,
-                        started_execution_at: *end,
+                        throttle_done_at: *end,
                     };
                 }
             },
@@ -1278,7 +1284,32 @@ impl SmgrOpTimer {
         }
     }
 
-    pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
+    pub(crate) fn observe_execution_start(&mut self, at: Instant) {
+        let inner = self.0.as_mut().expect("other public methods consume self");
+        match &mut inner.timings {
+            SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
+                throttle_started_at,
+                throttle_done_at,
+            } => {
+                inner.timings = SmgrOpTimerState::BatchedNowExecuting {
+                    throttle_started_at: *throttle_started_at,
+                    throttle_done_at: *throttle_done_at,
+                    execution_started_at: at,
+                };
+            }
+            x => panic!("called in unexpected state: {x:?}"),
+        }
+    }
+
+    pub(crate) fn observe_execution_end_flush_start(mut self) -> SmgrOpFlushInProgress {
+        assert!(
+            matches!(
+                self.0.as_ref().unwrap().timings,
+                SmgrOpTimerState::BatchedNowExecuting { .. }
+            ),
+            "called in unexpected state: {:?}",
+            self.0.as_ref().unwrap().timings,
+        );
         let (flush_start, inner) = self
             .smgr_op_end()
             .expect("this method consume self, and the only other caller is drop handler");
@@ -1300,41 +1331,53 @@ impl SmgrOpTimer {
 
         let now = Instant::now();
 
+        // TODO: use label for unfinished requests instead of Duration::ZERO.
+        // This is quite rare in practice, only during tenant/pageservers shutdown.
+        let throttle;
         let batch;
         let execution;
-        let throttle;
         match inner.timings {
-            SmgrOpTimerState::Received { received_at } => {
-                batch = (now - received_at).as_secs_f64();
-                // TODO: use label for dropped requests.
-                // This is quite rare in practice, only during tenant/pageservers shutdown.
+            SmgrOpTimerState::Received { received_at: _ } => {
                 throttle = Duration::ZERO;
-                execution = Duration::ZERO.as_secs_f64();
+                batch = Duration::ZERO;
+                execution = Duration::ZERO;
             }
-            SmgrOpTimerState::ThrottleDoneExecutionStarting {
-                received_at,
+            SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
                 throttle_started_at,
-                started_execution_at,
+                throttle_done_at,
            } => {
-                batch = (throttle_started_at - received_at).as_secs_f64();
-                throttle = started_execution_at - throttle_started_at;
-                execution = (now - started_execution_at).as_secs_f64();
+                throttle = throttle_done_at - throttle_started_at;
+                batch = Duration::ZERO;
+                execution = Duration::ZERO;
+            }
+            SmgrOpTimerState::BatchedNowExecuting {
+                throttle_started_at,
+                throttle_done_at,
+                execution_started_at,
+            } => {
+                throttle = throttle_done_at - throttle_started_at;
+                batch = execution_started_at - throttle_done_at;
+                execution = now - execution_started_at;
             }
         }
 
         // update time spent in batching
-        inner.global_batch_wait_time.observe(batch);
-        inner.per_timeline_batch_wait_time.observe(batch);
+        inner.global_batch_wait_time.observe(batch.as_secs_f64());
+        inner
+            .per_timeline_batch_wait_time
+            .observe(batch.as_secs_f64());
 
         // time spent in throttle metric is updated by throttle impl
         let _ = throttle;
 
         // update metrics for execution latency
-        inner.global_execution_latency_histo.observe(execution);
+        inner
+            .global_execution_latency_histo
+            .observe(execution.as_secs_f64());
         if let Some(per_timeline_execution_latency_histo) =
             &inner.per_timeline_execution_latency_histo
         {
-            per_timeline_execution_latency_histo.observe(execution);
+            per_timeline_execution_latency_histo.observe(execution.as_secs_f64());
         }
 
         Some((now, inner))
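Note (not part of the patch): the metrics.rs changes above turn the timer into a three-phase state machine (Received, ParsedRoutedThrottledNowBatching, BatchedNowExecuting), and smgr_op_end derives the throttle, batch, and execution durations from whichever state the request reached when it ended. A minimal standalone Rust sketch of that derivation, using simplified stand-in types; only the state and field names mirror the diff:

// Standalone sketch, not part of the patch: how the three observed durations
// fall out of the new state machine. `TimerState` is a simplified stand-in for
// SmgrOpTimerState; the real code lives behind SmgrOpTimer/SmgrOpTimerInner.
use std::time::{Duration, Instant};

#[allow(dead_code)] // not every variant is exercised in the demo below
enum TimerState {
    Received {
        received_at: Instant,
    },
    ParsedRoutedThrottledNowBatching {
        throttle_started_at: Instant,
        throttle_done_at: Instant,
    },
    BatchedNowExecuting {
        throttle_started_at: Instant,
        throttle_done_at: Instant,
        execution_started_at: Instant,
    },
}

/// Returns (throttle, batch, execution) durations for a request that ends in `state` at `now`.
fn durations(state: &TimerState, now: Instant) -> (Duration, Duration, Duration) {
    match *state {
        // Dropped before throttling finished: nothing meaningful to report yet.
        TimerState::Received { .. } => (Duration::ZERO, Duration::ZERO, Duration::ZERO),
        // Dropped while batching: only the throttle wait is known.
        TimerState::ParsedRoutedThrottledNowBatching {
            throttle_started_at,
            throttle_done_at,
        } => (
            throttle_done_at - throttle_started_at,
            Duration::ZERO,
            Duration::ZERO,
        ),
        // Executed: throttle wait, batch wait, and execution time are all known.
        TimerState::BatchedNowExecuting {
            throttle_started_at,
            throttle_done_at,
            execution_started_at,
        } => (
            throttle_done_at - throttle_started_at,
            execution_started_at - throttle_done_at,
            now - execution_started_at,
        ),
    }
}

fn main() {
    let t0 = Instant::now();
    let state = TimerState::BatchedNowExecuting {
        throttle_started_at: t0,
        throttle_done_at: t0 + Duration::from_millis(2),
        execution_started_at: t0 + Duration::from_millis(5),
    };
    let (throttle, batch, execution) = durations(&state, t0 + Duration::from_millis(9));
    println!("throttle={throttle:?} batch={batch:?} execution={execution:?}");
}

Requests that never reach execution (for example during tenant or pageserver shutdown) fall into the first two arms and report Duration::ZERO, matching the TODO in the hunk above.
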
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index a050509373..f72f7dab11 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -574,6 +574,25 @@ enum BatchedFeMessage {
     },
 }
 
+impl BatchedFeMessage {
+    fn observe_execution_start(&mut self, at: Instant) {
+        match self {
+            BatchedFeMessage::Exists { timer, .. }
+            | BatchedFeMessage::Nblocks { timer, .. }
+            | BatchedFeMessage::DbSize { timer, .. }
+            | BatchedFeMessage::GetSlruSegment { timer, .. } => {
+                timer.observe_execution_start(at);
+            }
+            BatchedFeMessage::GetPage { pages, .. } => {
+                for (_, _, timer) in pages {
+                    timer.observe_execution_start(at);
+                }
+            }
+            BatchedFeMessage::RespondError { .. } => {}
+        }
+    }
+}
+
 impl PageServerHandler {
     pub fn new(
         tenant_manager: Arc<TenantManager>,
@@ -673,7 +692,7 @@ impl PageServerHandler {
             res = shard.pagestream_throttle.throttle(1) => res,
             _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
         };
-        timer.observe_throttle_done_execution_starting(&throttled);
+        timer.observe_throttle_done(&throttled);
 
         Ok(timer)
     }
@@ -912,6 +931,13 @@ impl PageServerHandler {
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     {
+        let started_at = Instant::now();
+        let batch = {
+            let mut batch = batch;
+            batch.observe_execution_start(started_at);
+            batch
+        };
+
         // invoke handler function
         let (handler_results, span): (
            Vec>,
@@ -1071,8 +1097,7 @@ impl PageServerHandler {
            // The timer's underlying metric is used for a storage-internal latency SLO and
            // we don't want to include latency in it that we can't control.
            // And as pointed out above, in this case, we don't control the time that flush will take.
-            let flushing_timer =
-                timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
+            let flushing_timer = timer.map(|timer| timer.observe_execution_end_flush_start());
 
            // what we want to do
            let flush_fut = pgb_writer.flush();
@@ -1393,7 +1418,6 @@ impl PageServerHandler {
                    return Err(e);
                }
            };
-
            self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
                .await?;
        }
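Note (not part of the patch): taken together, the page_service.rs call sites drive each request timer through its full lifecycle: observe_throttle_done once the pagestream throttle resolves, observe_execution_start for every timer in the batch right before the handler runs, and observe_execution_end_flush_start when the response is handed off to the socket flush. A rough standalone sketch of that ordering, with a toy Timer standing in for SmgrOpTimer; only the method names mirror the diff:

// Standalone sketch, not part of the patch: the order in which page_service.rs
// now drives a request timer. `Timer` only logs the transitions.
use std::time::Instant;

struct Timer {
    received_at: Instant,
}

impl Timer {
    fn observe_throttle_done(&mut self) {
        println!("throttle done {:?} after receipt", self.received_at.elapsed());
    }
    fn observe_execution_start(&mut self, at: Instant) {
        println!("execution starts {:?} after receipt", at - self.received_at);
    }
    // Consumes the timer, like the real method that returns SmgrOpFlushInProgress.
    fn observe_execution_end_flush_start(self) {
        println!(
            "execution done, flush starts {:?} after receipt",
            self.received_at.elapsed()
        );
    }
}

fn main() {
    // 1. request received and parsed
    let mut timer = Timer {
        received_at: Instant::now(),
    };
    // 2. pagestream throttle finished (or was a no-op)
    timer.observe_throttle_done();
    // 3. the batch is picked up by pagesteam_handle_batched_message:
    //    every timer in the batch gets the same execution start timestamp
    timer.observe_execution_start(Instant::now());
    // 4. handler produced the response message; socket flush begins
    timer.observe_execution_end_flush_start();
}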