From dd722fdaf6870e15cfbbe171a90a3e9e7764a605 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 7 Feb 2025 03:14:05 +0100 Subject: [PATCH] bring back flush in progress recording --- pageserver/src/metrics.rs | 44 +++++++++++++++++++++++++++++++++- pageserver/src/page_service.rs | 15 ++++++++++-- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 58959f0e7a..0cb06b2cea 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1244,10 +1244,11 @@ impl SmgrOpTimer { inner.throttled += *throttle; } - pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) { + pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> Instant { let (flush_start, inner) = self .smgr_op_end() .expect("this method consume self, and the only other caller is drop handler"); + flush_start } /// Returns `None`` if this method has already been called, `Some` otherwise. @@ -1324,6 +1325,47 @@ pub(crate) struct SmgrQueryTimePerTimeline { per_timeline_flush_in_progress_micros: IntCounter, } +impl SmgrQueryTimePerTimeline { + pub(crate) async fn record_flush_in_progress( + &self, + start_at: Instant, + mut fut: Fut, + ) -> O + where + Fut: std::future::Future, + { + let mut fut = std::pin::pin!(fut); + + // Whenever observe_guard gets called, or dropped, + // it adds the time elapsed since its last call to metrics. + // Last call is tracked in `now`. + let mut base = start_at; + let mut observe_guard = scopeguard::guard( + || { + let now = Instant::now(); + let elapsed = now - base; + self.global_flush_in_progress_micros + .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); + self.per_timeline_flush_in_progress_micros + .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); + base = now; + }, + |mut observe| { + observe(); + }, + ); + + loop { + match tokio::time::timeout(Duration::from_secs(10), &mut fut).await { + Ok(v) => return v, + Err(_timeout) => { + (*observe_guard)(); + } + } + } + } +} + static SMGR_QUERY_STARTED_GLOBAL: Lazy = Lazy::new(|| { register_int_counter_vec!( // it's a counter, but, name is prepared to extend it to a histogram of queue depth diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b4161ca6b5..af1bd88a55 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1077,11 +1077,22 @@ impl PageServerHandler { // The timer's underlying metric is used for a storage-internal latency SLO and // we don't want to include latency in it that we can't control. // And as pointed out above, in this case, we don't control the time that flush will take. - let flushing_timer = - timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing()); + let start_flushing_at = match timer { + Some(timer) => timer.observe_smgr_op_completion_and_start_flushing(), + None => Instant::now(), + }; // what we want to do let flush_fut = pgb_writer.flush(); + let flush_fut = if let Some(handle) = &shard { + futures::future::Either::Left( + handle + .query_metrics + .record_flush_in_progress(start_flushing_at, flush_fut), + ) + } else { + futures::future::Either::Right(flush_fut) + }; // do it while respecting cancellation let _: () = async move { tokio::select! {