experiment: clone the metrics but don't measure or update the metrics

Christian Schwarz
2025-02-07 11:12:01 +01:00
parent 98dd19ef53
commit 949fbc15a0


@@ -1297,69 +1297,69 @@ enum SmgrOpTimerState {
impl SmgrOpTimer {
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_throttle_start(&mut self, at: Instant) {
let Some(inner) = self.0.as_mut() else {
return;
};
let SmgrOpTimerState::Received { received_at: _ } = &mut inner.timings else {
return;
};
inner.throttling.count_accounted_start.inc();
inner.timings = SmgrOpTimerState::Throttling {
throttle_started_at: at,
};
// let Some(inner) = self.0.as_mut() else {
// return;
// };
// let SmgrOpTimerState::Received { received_at: _ } = &mut inner.timings else {
// return;
// };
// inner.throttling.count_accounted_start.inc();
// inner.timings = SmgrOpTimerState::Throttling {
// throttle_started_at: at,
// };
}
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_throttle_done(&mut self, throttle: ThrottleResult) {
let Some(inner) = self.0.as_mut() else {
return;
};
let SmgrOpTimerState::Throttling {
throttle_started_at,
} = &inner.timings
else {
return;
};
inner.throttling.count_accounted_finish.inc();
match throttle {
ThrottleResult::NotThrottled { end } => {
inner.timings = SmgrOpTimerState::Batching {
throttle_done_at: end,
};
}
ThrottleResult::Throttled { end } => {
// update metrics
inner.throttling.count_throttled.inc();
inner
.throttling
.wait_time
.inc_by((end - *throttle_started_at).as_micros().try_into().unwrap());
// state transition
inner.timings = SmgrOpTimerState::Batching {
throttle_done_at: end,
};
}
}
// let Some(inner) = self.0.as_mut() else {
// return;
// };
// let SmgrOpTimerState::Throttling {
// throttle_started_at,
// } = &inner.timings
// else {
// return;
// };
// inner.throttling.count_accounted_finish.inc();
// match throttle {
// ThrottleResult::NotThrottled { end } => {
// inner.timings = SmgrOpTimerState::Batching {
// throttle_done_at: end,
// };
// }
// ThrottleResult::Throttled { end } => {
// // update metrics
// inner.throttling.count_throttled.inc();
// inner
// .throttling
// .wait_time
// .inc_by((end - *throttle_started_at).as_micros().try_into().unwrap());
// // state transition
// inner.timings = SmgrOpTimerState::Batching {
// throttle_done_at: end,
// };
// }
// }
}
/// See [`SmgrOpTimerState`] for more context.
pub(crate) fn observe_execution_start(&mut self, at: Instant) {
let Some(inner) = self.0.as_mut() else {
return;
};
let SmgrOpTimerState::Batching { throttle_done_at } = &inner.timings else {
return;
};
// update metrics
let batch = at - *throttle_done_at;
inner.global_batch_wait_time.observe(batch.as_secs_f64());
inner
.per_timeline_batch_wait_time
.observe(batch.as_secs_f64());
// state transition
inner.timings = SmgrOpTimerState::Executing {
execution_started_at: at,
}
// let Some(inner) = self.0.as_mut() else {
// return;
// };
// let SmgrOpTimerState::Batching { throttle_done_at } = &inner.timings else {
// return;
// };
// // update metrics
// let batch = at - *throttle_done_at;
// inner.global_batch_wait_time.observe(batch.as_secs_f64());
// inner
// .per_timeline_batch_wait_time
// .observe(batch.as_secs_f64());
// // state transition
// inner.timings = SmgrOpTimerState::Executing {
// execution_started_at: at,
// }
}
/// For all but the first caller, this is a no-op.
@@ -1370,33 +1370,33 @@ impl SmgrOpTimer {
&mut self,
at: Instant,
) -> Option<SmgrOpFlushInProgress> {
// NB: unlike the other observe_* methods, this one take()s.
// // NB: unlike the other observe_* methods, this one take()s.
#[allow(clippy::question_mark)] // maintain similar code pattern.
let Some(mut inner) = self.0.take() else {
return None;
};
let SmgrOpTimerState::Executing {
execution_started_at,
} = &inner.timings
else {
return None;
};
// update metrics
let execution = at - *execution_started_at;
inner
.global_execution_latency_histo
.observe(execution.as_secs_f64());
if let Some(per_timeline_execution_latency_histo) =
&inner.per_timeline_execution_latency_histo
{
per_timeline_execution_latency_histo.observe(execution.as_secs_f64());
}
// let SmgrOpTimerState::Executing {
// execution_started_at,
// } = &inner.timings
// else {
// return None;
// };
// // update metrics
// let execution = at - *execution_started_at;
// inner
// .global_execution_latency_histo
// .observe(execution.as_secs_f64());
// if let Some(per_timeline_execution_latency_histo) =
// &inner.per_timeline_execution_latency_histo
// {
// per_timeline_execution_latency_histo.observe(execution.as_secs_f64());
// }
// state transition
inner.timings = SmgrOpTimerState::Flushing;
// // state transition
// inner.timings = SmgrOpTimerState::Flushing;
// return the flush in progress object which
// will do the remaining metrics updates
// // return the flush in progress object which
// // will do the remaining metrics updates
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
@@ -1426,19 +1426,19 @@ pub(crate) struct SmgrOpFlushInProgress {
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
// In case of early drop, update any of the remaining metrics with
// observations so that (started,finished) counter pairs balance out
// and all counters on the latency path have the same number of
// observations.
// It's technically lying and it would be better if each metric had
// a separate label or similar for cancelled requests.
// But we don't have that right now and counter pairs balancing
// out is useful when using the metrics in panels and whatnot.
let now = Instant::now();
self.observe_throttle_start(now);
self.observe_throttle_done(ThrottleResult::NotThrottled { end: now });
self.observe_execution_start(now);
self.observe_execution_end_flush_start(now);
// // In case of early drop, update any of the remaining metrics with
// // observations so that (started,finished) counter pairs balance out
// // and all counters on the latency path have the same number of
// // observations.
// // It's technically lying and it would be better if each metric had
// // a separate label or similar for cancelled requests.
// // But we don't have that right now and counter pairs balancing
// // out is useful when using the metrics in panels and whatnot.
// let now = Instant::now();
// self.observe_throttle_start(now);
// self.observe_throttle_done(ThrottleResult::NotThrottled { end: now });
// self.observe_execution_start(now);
// self.observe_execution_end_flush_start(now);
}
}
@@ -1447,34 +1447,35 @@ impl SmgrOpFlushInProgress {
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
fut.await
// let mut fut = std::pin::pin!(fut);
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// The time of the last call is tracked in `self.flush_started_at`.
let mut observe_guard = scopeguard::guard(
|| {
let now = Instant::now();
let elapsed = now - self.flush_started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.flush_started_at = now;
},
|mut observe| {
observe();
},
);
// // Whenever observe_guard gets called, or dropped,
// // it adds the time elapsed since its last call to metrics.
// // The time of the last call is tracked in `self.flush_started_at`.
// let mut observe_guard = scopeguard::guard(
// || {
// let now = Instant::now();
// let elapsed = now - self.flush_started_at;
// self.global_micros
// .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
// self.per_timeline_micros
// .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
// self.flush_started_at = now;
// },
// |mut observe| {
// observe();
// },
// );
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
// loop {
// match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
// Ok(v) => return v,
// Err(_timeout) => {
// (*observe_guard)();
// }
// }
// }
}
}
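
The Drop impl above relies on each observe_* method checking the timer's current stage and doing nothing when the timer is not in the stage it expects, so the drop path can feed the timer through all remaining stages with zero-length observations and the (started, finished) counter pairs still balance out. A minimal sketch of that pattern, using hypothetical simplified names (OpTimer, Stage, Counters) rather than the actual SmgrOpTimer types:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Hypothetical stand-in for a (started, finished) prometheus counter pair.
#[derive(Default)]
struct Counters {
    started: AtomicU64,
    finished: AtomicU64,
    execution_micros: AtomicU64,
}

/// Two stages are enough to show the idea; the real timer walks
/// Received -> Throttling -> Batching -> Executing -> Flushing.
#[derive(Clone, Copy)]
enum Stage {
    Received,
    Executing { started_at: Instant },
    Done,
}

struct OpTimer {
    stage: Stage,
    counters: Arc<Counters>,
}

impl OpTimer {
    fn start(counters: Arc<Counters>) -> Self {
        counters.started.fetch_add(1, Ordering::Relaxed);
        Self {
            stage: Stage::Received,
            counters,
        }
    }

    /// Received -> Executing; a no-op in any other stage, so the drop
    /// path below may call it unconditionally.
    fn observe_execution_start(&mut self, at: Instant) {
        if let Stage::Received = self.stage {
            self.stage = Stage::Executing { started_at: at };
        }
    }

    /// Executing -> Done; records the elapsed time and the matching "finished".
    fn observe_execution_end(&mut self, at: Instant) {
        if let Stage::Executing { started_at } = self.stage {
            let micros = (at - started_at).as_micros() as u64;
            self.counters.execution_micros.fetch_add(micros, Ordering::Relaxed);
            self.counters.finished.fetch_add(1, Ordering::Relaxed);
            self.stage = Stage::Done;
        }
    }
}

impl Drop for OpTimer {
    fn drop(&mut self) {
        // Early drop: feed zero-length observations through the remaining
        // stages so started/finished stay balanced, as the comment in the
        // Drop impl above describes.
        let now = Instant::now();
        self.observe_execution_start(now);
        self.observe_execution_end(now);
    }
}

fn main() {
    let counters = Arc::new(Counters::default());

    // Normal path: both observations happen explicitly; Drop is then a no-op.
    let mut ok = OpTimer::start(Arc::clone(&counters));
    ok.observe_execution_start(Instant::now());
    ok.observe_execution_end(Instant::now());
    drop(ok);

    // Cancelled path: the operation is abandoned early; Drop balances the pair.
    let abandoned = OpTimer::start(Arc::clone(&counters));
    drop(abandoned);

    assert_eq!(
        counters.started.load(Ordering::Relaxed),
        counters.finished.load(Ordering::Relaxed)
    );
}

Because each transition guards on the current stage, an observer that runs twice, once explicitly and once from Drop, never double-counts.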
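
The SmgrOpFlushInProgress::measure body shown above combines scopeguard with tokio::time::timeout so that a long-running flush is folded into the in-progress counters roughly every ten seconds rather than only once it completes. A hedged sketch of the same technique, with a hypothetical free function and a plain AtomicU64 standing in for the real global/per-timeline counters; it assumes the tokio and scopeguard crates (which the code above already uses) and must be awaited inside a tokio runtime:

use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Await `fut`; while it is still pending, periodically fold the elapsed
/// wall-clock time into `in_progress_micros` so the wait is visible on
/// dashboards before the flush finishes.
async fn measure_flush<F, O>(
    in_progress_micros: &AtomicU64,
    flush_started_at: Instant,
    fut: F,
) -> O
where
    F: Future<Output = O>,
{
    let mut fut = pin!(fut);
    // Tracks when we last pushed an observation into the counter.
    let mut last_observed_at = flush_started_at;
    // The guard's drop arm runs the closure one final time on scope exit,
    // so the tail of the wait is recorded even on early return.
    let mut observe = scopeguard::guard(
        move || {
            let now = Instant::now();
            let elapsed = now - last_observed_at;
            in_progress_micros.fetch_add(elapsed.as_micros() as u64, Ordering::Relaxed);
            last_observed_at = now;
        },
        |mut observe| observe(),
    );
    loop {
        match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
            Ok(output) => return output,
            // Still pending after 10s: record the progress made so far.
            Err(_elapsed) => (*observe)(),
        }
    }
}

The drop arm of the guard records the final slice of elapsed time whichever way the await ends, so the counter catches up on completion or cancellation as well as on each timeout tick.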