Following up on the last commit: the observation points that we use to calculate the various latency metrics are different, so adjust for that.

Christian Schwarz
2025-01-10 17:12:45 +01:00
parent 8793e28ccb
commit 4d496a29c2
2 changed files with 99 additions and 32 deletions
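
For orientation before the diff: the previous code derived the batch and execution latencies from `received_at` and the throttle timestamps, so the metrics shared observation points. After this change each phase gets its own observation point: throttle is measured from throttle start to throttle end, batch wait from throttle end to execution start, and execution from execution start to completion (or drop). Below is a condensed, self-contained sketch of that model; the variant and field names are taken from the diff, while `OpTimerState`, `phase_durations`, and the literal durations in `main` are illustrative stand-ins rather than pageserver code.

use std::time::{Duration, Instant};

// Condensed model of the timer states introduced below. Variant and field
// names mirror SmgrOpTimerState in the diff; everything else is a sketch.
#[allow(dead_code)]
#[derive(Debug)]
enum OpTimerState {
    Received {
        received_at: Instant,
    },
    // parse/route/throttle are done; the request is waiting to be batched
    ParsedRoutedThrottledNowBatching {
        throttle_started_at: Instant,
        throttle_done_at: Instant,
    },
    // the batch was picked up; the handler is executing
    BatchedNowExecuting {
        throttle_started_at: Instant,
        throttle_done_at: Instant,
        execution_started_at: Instant,
    },
}

// Derive (throttle, batch wait, execution) durations from whatever state the
// operation reached; phases that were never entered report Duration::ZERO.
fn phase_durations(state: OpTimerState, now: Instant) -> (Duration, Duration, Duration) {
    match state {
        OpTimerState::Received { .. } => (Duration::ZERO, Duration::ZERO, Duration::ZERO),
        OpTimerState::ParsedRoutedThrottledNowBatching {
            throttle_started_at,
            throttle_done_at,
        } => (
            throttle_done_at - throttle_started_at,
            Duration::ZERO,
            Duration::ZERO,
        ),
        OpTimerState::BatchedNowExecuting {
            throttle_started_at,
            throttle_done_at,
            execution_started_at,
        } => (
            throttle_done_at - throttle_started_at,  // time spent throttled
            execution_started_at - throttle_done_at, // time spent waiting in a batch
            now - execution_started_at,              // time spent executing
        ),
    }
}

fn main() {
    let t0 = Instant::now();
    let state = OpTimerState::BatchedNowExecuting {
        throttle_started_at: t0,
        throttle_done_at: t0 + Duration::from_millis(2),
        execution_started_at: t0 + Duration::from_millis(5),
    };
    let (throttle, batch, execution) = phase_durations(state, t0 + Duration::from_millis(9));
    println!("throttle={throttle:?} batch={batch:?} execution={execution:?}");
}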

@@ -1239,12 +1239,20 @@ pub(crate) struct SmgrOpTimerInner {
 #[derive(Debug)]
 enum SmgrOpTimerState {
     Received {
+        // In the future, we may want to track the full time the request spent
+        // inside pageserver process (time spent in kernel buffers can't be tracked).
+        // `received_at` would be used for that.
+        #[allow(dead_code)]
         received_at: Instant,
     },
-    ThrottleDoneExecutionStarting {
-        received_at: Instant,
+    ParsedRoutedThrottledNowBatching {
         throttle_started_at: Instant,
-        started_execution_at: Instant,
+        throttle_done_at: Instant,
+    },
+    BatchedNowExecuting {
+        throttle_started_at: Instant,
+        throttle_done_at: Instant,
+        execution_started_at: Instant,
     },
 }
@@ -1255,22 +1263,20 @@ pub(crate) struct SmgrOpFlushInProgress {
 }

 impl SmgrOpTimer {
-    pub(crate) fn observe_throttle_done_execution_starting(&mut self, throttle: &ThrottleResult) {
+    pub(crate) fn observe_throttle_done(&mut self, throttle: &ThrottleResult) {
         let inner = self.0.as_mut().expect("other public methods consume self");
         match (&mut inner.timings, throttle) {
-            (SmgrOpTimerState::Received { received_at }, throttle) => match throttle {
+            (SmgrOpTimerState::Received { received_at: _ }, throttle) => match throttle {
                 ThrottleResult::NotThrottled { start } => {
-                    inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
-                        received_at: *received_at,
+                    inner.timings = SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
                         throttle_started_at: *start,
-                        started_execution_at: *start,
+                        throttle_done_at: *start,
                     };
                 }
                 ThrottleResult::Throttled { start, end } => {
-                    inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting {
-                        received_at: *start,
+                    inner.timings = SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
                         throttle_started_at: *start,
-                        started_execution_at: *end,
+                        throttle_done_at: *end,
                     };
                 }
             },
@@ -1278,7 +1284,32 @@ impl SmgrOpTimer {
         }
     }

-    pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
+    pub(crate) fn observe_execution_start(&mut self, at: Instant) {
+        let inner = self.0.as_mut().expect("other public methods consume self");
+        match &mut inner.timings {
+            SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
+                throttle_started_at,
+                throttle_done_at,
+            } => {
+                inner.timings = SmgrOpTimerState::BatchedNowExecuting {
+                    throttle_started_at: *throttle_started_at,
+                    throttle_done_at: *throttle_done_at,
+                    execution_started_at: at,
+                };
+            }
+            x => panic!("called in unexpected state: {x:?}"),
+        }
+    }
+
+    pub(crate) fn observe_execution_end_flush_start(mut self) -> SmgrOpFlushInProgress {
+        assert!(
+            matches!(
+                self.0.as_ref().unwrap().timings,
+                SmgrOpTimerState::BatchedNowExecuting { .. }
+            ),
+            "called in unexpected state: {:?}",
+            self.0.as_ref().unwrap().timings,
+        );
         let (flush_start, inner) = self
             .smgr_op_end()
             .expect("this method consume self, and the only other caller is drop handler");
@@ -1300,41 +1331,53 @@ impl SmgrOpTimer {
         let now = Instant::now();

-        // TODO: use label for unfinished requests instead of Duration::ZERO.
-        // This is quite rare in practice, only during tenant/pageservers shutdown.
+        let throttle;
         let batch;
         let execution;
-        let throttle;
         match inner.timings {
-            SmgrOpTimerState::Received { received_at } => {
-                batch = (now - received_at).as_secs_f64();
+            // TODO: use label for dropped requests.
+            // This is quite rare in practice, only during tenant/pageservers shutdown.
+            SmgrOpTimerState::Received { received_at: _ } => {
                 throttle = Duration::ZERO;
-                execution = Duration::ZERO.as_secs_f64();
+                batch = Duration::ZERO;
+                execution = Duration::ZERO;
             }
-            SmgrOpTimerState::ThrottleDoneExecutionStarting {
-                received_at,
+            SmgrOpTimerState::ParsedRoutedThrottledNowBatching {
                 throttle_started_at,
-                started_execution_at,
+                throttle_done_at,
             } => {
-                batch = (throttle_started_at - received_at).as_secs_f64();
-                throttle = started_execution_at - throttle_started_at;
-                execution = (now - started_execution_at).as_secs_f64();
+                throttle = throttle_done_at - throttle_started_at;
+                batch = Duration::ZERO;
+                execution = Duration::ZERO;
             }
+            SmgrOpTimerState::BatchedNowExecuting {
+                throttle_started_at,
+                throttle_done_at,
+                execution_started_at,
+            } => {
+                throttle = throttle_done_at - throttle_started_at;
+                batch = execution_started_at - throttle_done_at;
+                execution = now - execution_started_at;
+            }
         }

         // update time spent in batching
-        inner.global_batch_wait_time.observe(batch);
-        inner.per_timeline_batch_wait_time.observe(batch);
+        inner.global_batch_wait_time.observe(batch.as_secs_f64());
+        inner
+            .per_timeline_batch_wait_time
+            .observe(batch.as_secs_f64());

         // time spent in throttle metric is updated by throttle impl
         let _ = throttle;

         // update metrics for execution latency
-        inner.global_execution_latency_histo.observe(execution);
+        inner
+            .global_execution_latency_histo
+            .observe(execution.as_secs_f64());
         if let Some(per_timeline_execution_latency_histo) =
             &inner.per_timeline_execution_latency_histo
         {
-            per_timeline_execution_latency_histo.observe(execution);
+            per_timeline_execution_latency_histo.observe(execution.as_secs_f64());
         }

         Some((now, inner))
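
The second changed file wires these observation points into the page service path: the handler calls observe_throttle_done once throttling resolves, observe_execution_start on every timer in a batch when that batch begins executing, and observe_execution_end_flush_start when the response is handed to the flush. Below is a minimal sketch of that call-order contract, assuming stand-in OpTimer, State, and ThrottleOutcome types rather than the real SmgrOpTimer/ThrottleResult API.

use std::time::Instant;

// Stand-in for ThrottleResult: either not throttled, or throttled from
// `start` to `end`.
#[allow(dead_code)]
enum ThrottleOutcome {
    NotThrottled { start: Instant },
    Throttled { start: Instant, end: Instant },
}

#[allow(dead_code)]
#[derive(Debug)]
enum State {
    Received,
    Batching {
        throttle_started_at: Instant,
        throttle_done_at: Instant,
    },
    Executing {
        execution_started_at: Instant,
    },
}

struct OpTimer {
    state: State,
}

impl OpTimer {
    fn new() -> Self {
        OpTimer {
            state: State::Received,
        }
    }

    // 1. Called right after the throttle future resolves.
    fn observe_throttle_done(&mut self, outcome: &ThrottleOutcome) {
        let (start, done) = match *outcome {
            ThrottleOutcome::NotThrottled { start } => (start, start),
            ThrottleOutcome::Throttled { start, end } => (start, end),
        };
        match self.state {
            State::Received => {}
            ref x => panic!("called in unexpected state: {x:?}"),
        }
        self.state = State::Batching {
            throttle_started_at: start,
            throttle_done_at: done,
        };
    }

    // 2. Called when the batch containing this request starts executing.
    fn observe_execution_start(&mut self, at: Instant) {
        match self.state {
            State::Batching { .. } => {}
            ref x => panic!("called in unexpected state: {x:?}"),
        }
        self.state = State::Executing {
            execution_started_at: at,
        };
    }

    // 3. Called when the handler finished and flushing of the response begins;
    //    the real method records execution latency and returns a flush timer.
    fn observe_execution_end_flush_start(self) {
        assert!(matches!(self.state, State::Executing { .. }));
    }
}

fn main() {
    let mut timer = OpTimer::new();
    timer.observe_throttle_done(&ThrottleOutcome::NotThrottled {
        start: Instant::now(),
    });
    timer.observe_execution_start(Instant::now());
    timer.observe_execution_end_flush_start();
}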

@@ -574,6 +574,25 @@ enum BatchedFeMessage {
     },
 }

+impl BatchedFeMessage {
+    fn observe_execution_start(&mut self, at: Instant) {
+        match self {
+            BatchedFeMessage::Exists { timer, .. }
+            | BatchedFeMessage::Nblocks { timer, .. }
+            | BatchedFeMessage::DbSize { timer, .. }
+            | BatchedFeMessage::GetSlruSegment { timer, .. } => {
+                timer.observe_execution_start(at);
+            }
+            BatchedFeMessage::GetPage { pages, .. } => {
+                for (_, _, timer) in pages {
+                    timer.observe_execution_start(at);
+                }
+            }
+            BatchedFeMessage::RespondError { .. } => {}
+        }
+    }
+}
+
 impl PageServerHandler {
     pub fn new(
         tenant_manager: Arc<TenantManager>,
@@ -673,7 +692,7 @@ impl PageServerHandler {
             res = shard.pagestream_throttle.throttle(1) => res,
             _ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
         };
-        timer.observe_throttle_done_execution_starting(&throttled);
+        timer.observe_throttle_done(&throttled);

         Ok(timer)
     }
@@ -912,6 +931,13 @@ impl PageServerHandler {
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     {
+        let started_at = Instant::now();
+        let batch = {
+            let mut batch = batch;
+            batch.observe_execution_start(started_at);
+            batch
+        };
+
         // invoke handler function
         let (handler_results, span): (
             Vec<Result<(PagestreamBeMessage, SmgrOpTimer), PageStreamError>>,
@@ -1071,8 +1097,7 @@ impl PageServerHandler {
         // The timer's underlying metric is used for a storage-internal latency SLO and
         // we don't want to include latency in it that we can't control.
         // And as pointed out above, in this case, we don't control the time that flush will take.
-        let flushing_timer =
-            timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
+        let flushing_timer = timer.map(|timer| timer.observe_execution_end_flush_start());

         // what we want to do
         let flush_fut = pgb_writer.flush();
@@ -1393,7 +1418,6 @@ impl PageServerHandler {
                     return Err(e);
                 }
             };
             self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
                 .await?;
         }