diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index c3bea1e680..6ab1178a7b 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1297,69 +1297,69 @@ enum SmgrOpTimerState { impl SmgrOpTimer { /// See [`SmgrOpTimerState`] for more context. pub(crate) fn observe_throttle_start(&mut self, at: Instant) { - // let Some(inner) = self.0.as_mut() else { - // return; - // }; - // let SmgrOpTimerState::Received { received_at: _ } = &mut inner.timings else { - // return; - // }; - // inner.throttling.count_accounted_start.inc(); - // inner.timings = SmgrOpTimerState::Throttling { - // throttle_started_at: at, - // }; + let Some(inner) = self.0.as_mut() else { + return; + }; + let SmgrOpTimerState::Received { received_at: _ } = &mut inner.timings else { + return; + }; + inner.throttling.count_accounted_start.inc(); + inner.timings = SmgrOpTimerState::Throttling { + throttle_started_at: at, + }; } /// See [`SmgrOpTimerState`] for more context. pub(crate) fn observe_throttle_done(&mut self, throttle: ThrottleResult) { - // let Some(inner) = self.0.as_mut() else { - // return; - // }; - // let SmgrOpTimerState::Throttling { - // throttle_started_at, - // } = &inner.timings - // else { - // return; - // }; - // inner.throttling.count_accounted_finish.inc(); - // match throttle { - // ThrottleResult::NotThrottled { end } => { - // inner.timings = SmgrOpTimerState::Batching { - // throttle_done_at: end, - // }; - // } - // ThrottleResult::Throttled { end } => { - // // update metrics - // inner.throttling.count_throttled.inc(); - // inner - // .throttling - // .wait_time - // .inc_by((end - *throttle_started_at).as_micros().try_into().unwrap()); - // // state transition - // inner.timings = SmgrOpTimerState::Batching { - // throttle_done_at: end, - // }; - // } - // } + let Some(inner) = self.0.as_mut() else { + return; + }; + let SmgrOpTimerState::Throttling { + throttle_started_at, + } = &inner.timings + else { + return; + }; + 
inner.throttling.count_accounted_finish.inc(); + match throttle { + ThrottleResult::NotThrottled { end } => { + inner.timings = SmgrOpTimerState::Batching { + throttle_done_at: end, + }; + } + ThrottleResult::Throttled { end } => { + // update metrics + inner.throttling.count_throttled.inc(); + inner + .throttling + .wait_time + .inc_by((end - *throttle_started_at).as_micros().try_into().unwrap()); + // state transition + inner.timings = SmgrOpTimerState::Batching { + throttle_done_at: end, + }; + } + } } /// See [`SmgrOpTimerState`] for more context. pub(crate) fn observe_execution_start(&mut self, at: Instant) { - // let Some(inner) = self.0.as_mut() else { - // return; - // }; - // let SmgrOpTimerState::Batching { throttle_done_at } = &inner.timings else { - // return; - // }; - // // update metrics - // let batch = at - *throttle_done_at; - // inner.global_batch_wait_time.observe(batch.as_secs_f64()); - // inner - // .per_timeline_batch_wait_time - // .observe(batch.as_secs_f64()); - // // state transition - // inner.timings = SmgrOpTimerState::Executing { - // execution_started_at: at, - // } + let Some(inner) = self.0.as_mut() else { + return; + }; + let SmgrOpTimerState::Batching { throttle_done_at } = &inner.timings else { + return; + }; + // update metrics + let batch = at - *throttle_done_at; + inner.global_batch_wait_time.observe(batch.as_secs_f64()); + inner + .per_timeline_batch_wait_time + .observe(batch.as_secs_f64()); + // state transition + inner.timings = SmgrOpTimerState::Executing { + execution_started_at: at, + } } /// For all but the first caller, this is a no-op. @@ -1370,33 +1370,33 @@ impl SmgrOpTimer { &mut self, at: Instant, ) -> Option { - // // NB: unlike the other observe_* methods, this one take()s. + // NB: unlike the other observe_* methods, this one take()s. #[allow(clippy::question_mark)] // maintain similar code pattern. 
let Some(mut inner) = self.0.take() else { return None; }; - // let SmgrOpTimerState::Executing { - // execution_started_at, - // } = &inner.timings - // else { - // return None; - // }; - // // update metrics - // let execution = at - *execution_started_at; - // inner - // .global_execution_latency_histo - // .observe(execution.as_secs_f64()); - // if let Some(per_timeline_execution_latency_histo) = - // &inner.per_timeline_execution_latency_histo - // { - // per_timeline_execution_latency_histo.observe(execution.as_secs_f64()); - // } + let SmgrOpTimerState::Executing { + execution_started_at, + } = &inner.timings + else { + return None; + }; + // update metrics + let execution = at - *execution_started_at; + inner + .global_execution_latency_histo + .observe(execution.as_secs_f64()); + if let Some(per_timeline_execution_latency_histo) = + &inner.per_timeline_execution_latency_histo + { + per_timeline_execution_latency_histo.observe(execution.as_secs_f64()); + } - // // state transition - // inner.timings = SmgrOpTimerState::Flushing; + // state transition + inner.timings = SmgrOpTimerState::Flushing; - // // return the flush in progress object which - // // will do the remaining metrics updates + // return the flush in progress object which + // will do the remaining metrics updates let SmgrOpTimerInner { global_flush_in_progress_micros, per_timeline_flush_in_progress_micros, @@ -1426,19 +1426,19 @@ pub(crate) struct SmgrOpFlushInProgress { impl Drop for SmgrOpTimer { fn drop(&mut self) { - // // In case of early drop, update any of the remaining metrics with - // // observations so that (started,finished) counter pairs balance out - // // and all counters on the latency path have the the same number of - // // observations. - // // It's technically lying and it would be better if each metric had - // // a separate label or similar for cancelled requests. 
- // // But we don't have that right now and counter pairs balancing - // // out is useful when using the metrics in panels and whatnot. - // let now = Instant::now(); - // self.observe_throttle_start(now); - // self.observe_throttle_done(ThrottleResult::NotThrottled { end: now }); - // self.observe_execution_start(now); - // self.observe_execution_end_flush_start(now); + // In case of early drop, update any of the remaining metrics with + // observations so that (started,finished) counter pairs balance out + // and all counters on the latency path have the same number of + // observations. + // It's technically lying and it would be better if each metric had + // a separate label or similar for cancelled requests. + // But we don't have that right now and counter pairs balancing + // out is useful when using the metrics in panels and whatnot. + let now = Instant::now(); + self.observe_throttle_start(now); + self.observe_throttle_done(ThrottleResult::NotThrottled { end: now }); + self.observe_execution_start(now); + self.observe_execution_end_flush_start(now); } } @@ -1447,35 +1447,34 @@ impl SmgrOpFlushInProgress { where Fut: std::future::Future, { - fut.await - // let mut fut = std::pin::pin!(fut); + let mut fut = std::pin::pin!(fut); - // // Whenever observe_guard gets called, or dropped, - // // it adds the time elapsed since its last call to metrics. - // // Last call is tracked in `now`. - // let mut observe_guard = scopeguard::guard( - // || { - // let now = Instant::now(); - // let elapsed = now - self.flush_started_at; - // self.global_micros - // .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); - // self.per_timeline_micros - // .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); - // self.flush_started_at = now; - // }, - // |mut observe| { - // observe(); - // }, - // ); + // Whenever observe_guard gets called, or dropped, + // it adds the time elapsed since its last call to metrics. + // Last call is tracked in `self.flush_started_at`.
+ let mut observe_guard = scopeguard::guard( + || { + let now = Instant::now(); + let elapsed = now - self.flush_started_at; + self.global_micros + .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); + self.per_timeline_micros + .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); + self.flush_started_at = now; + }, + |mut observe| { + observe(); + }, + ); - // loop { - // match tokio::time::timeout(Duration::from_secs(10), &mut fut).await { - // Ok(v) => return v, - // Err(_timeout) => { - // (*observe_guard)(); - // } - // } - // } + loop { + match tokio::time::timeout(Duration::from_secs(10), &mut fut).await { + Ok(v) => return v, + Err(_timeout) => { + (*observe_guard)(); + } + } + } } }