diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 6ab1178a7b..719284a4d2 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1253,9 +1253,6 @@ pub(crate) struct SmgrOpTimerInner {
     global_batch_wait_time: Histogram,
     per_timeline_batch_wait_time: Histogram,
 
-    global_flush_in_progress_micros: IntCounter,
-    per_timeline_flush_in_progress_micros: IntCounter,
-
     throttling: Arc,
 
     timings: SmgrOpTimerState,
@@ -1366,20 +1363,18 @@ impl SmgrOpTimer {
     /// The first callers receives Some, subsequent ones None.
     ///
     /// See [`SmgrOpTimerState`] for more context.
-    pub(crate) fn observe_execution_end_flush_start(
-        &mut self,
-        at: Instant,
-    ) -> Option<SmgrOpFlushInProgress> {
+    pub(crate) fn observe_execution_end_flush_start(&mut self, at: Instant) {
         // NB: unlike the other observe_* methods, this one take()s.
         #[allow(clippy::question_mark)] // maintain similar code pattern.
         let Some(mut inner) = self.0.take() else {
-            return None;
+            // NB: this take() isn't needed anymore, maybe we can simplify
+            return;
         };
         let SmgrOpTimerState::Executing {
             execution_started_at,
         } = &inner.timings
         else {
-            return None;
+            return;
         };
         // update metrics
         let execution = at - *execution_started_at;
@@ -1394,36 +1389,9 @@ impl SmgrOpTimer {
 
         // state transition
        inner.timings = SmgrOpTimerState::Flushing;
-
-        // return the flush in progress object which
-        // will do the remaining metrics updates
-        let SmgrOpTimerInner {
-            global_flush_in_progress_micros,
-            per_timeline_flush_in_progress_micros,
-            ..
-        } = inner;
-        Some(SmgrOpFlushInProgress {
-            flush_started_at: at,
-            global_micros: global_flush_in_progress_micros,
-            per_timeline_micros: per_timeline_flush_in_progress_micros,
-        })
     }
 }
 
-/// The last stage of request processing is serializing and flushing the request
-/// into the TCP connection. We want to make slow flushes observable
-/// _while they are occuring_, so this struct provides a wrapper method [`Self::measure`]
-/// to periodically bump the metric.
-///
-/// If in the future we decide that we're not interested in live updates, we can
-/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
-/// and remove this struct from the code base.
-pub(crate) struct SmgrOpFlushInProgress {
-    flush_started_at: Instant,
-    global_micros: IntCounter,
-    per_timeline_micros: IntCounter,
-}
-
 impl Drop for SmgrOpTimer {
     fn drop(&mut self) {
         // In case of early drop, update any of the remaining metrics with
@@ -1442,42 +1410,6 @@ impl Drop for SmgrOpTimer {
     }
 }
 
-impl SmgrOpFlushInProgress {
-    pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
-    where
-        Fut: std::future::Future<Output = O>,
-    {
-        let mut fut = std::pin::pin!(fut);
-
-        // Whenever observe_guard gets called, or dropped,
-        // it adds the time elapsed since its last call to metrics.
-        // Last call is tracked in `now`.
-        let mut observe_guard = scopeguard::guard(
-            || {
-                let now = Instant::now();
-                let elapsed = now - self.flush_started_at;
-                self.global_micros
-                    .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
-                self.per_timeline_micros
-                    .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
-                self.flush_started_at = now;
-            },
-            |mut observe| {
-                observe();
-            },
-        );
-
-        loop {
-            match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
-                Ok(v) => return v,
-                Err(_timeout) => {
-                    (*observe_guard)();
-                }
-            }
-        }
-    }
-}
-
 #[derive(
     Debug,
     Clone,
@@ -1513,6 +1445,56 @@ pub(crate) struct SmgrQueryTimePerTimeline {
     throttling: Arc,
 }
 
+impl SmgrQueryTimePerTimeline {
+    pub(crate) async fn record_flush_in_progress<Fut, O>(
+        shard: &crate::tenant::timeline::handle::WeakHandle<
+            crate::page_service::TenantManagerTypes,
+        >,
+        start_at: Instant,
+        mut fut: Fut,
+    ) -> O
+    where
+        Fut: std::future::Future<Output = O>,
+    {
+        let mut fut = std::pin::pin!(fut);
+
+        // Whenever observe_guard gets called, or dropped,
+        // it adds the time elapsed since its last call to metrics.
+        // Last call is tracked in `base`.
+        let mut base = start_at;
+        let mut observe_guard = scopeguard::guard(
+            || {
+                let Ok(upgraded) = shard.upgrade() else {
+                    return;
+                };
+                let now = Instant::now();
+                let elapsed = now - base;
+                upgraded
+                    .query_metrics
+                    .global_flush_in_progress_micros
+                    .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
+                upgraded
+                    .query_metrics
+                    .per_timeline_flush_in_progress_micros
+                    .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
+                base = now;
+            },
+            |mut observe| {
+                observe();
+            },
+        );
+
+        loop {
+            match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
+                Ok(v) => return v,
+                Err(_timeout) => {
+                    (*observe_guard)();
+                }
+            }
+        }
+    }
+}
+
 static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
         // it's a counter, but, name is prepared to extend it to a histogram of queue depth
@@ -1797,10 +1779,6 @@ impl SmgrQueryTimePerTimeline {
         SmgrOpTimer(Some(SmgrOpTimerInner {
             global_execution_latency_histo: self.global_latency[op as usize].clone(),
             per_timeline_execution_latency_histo: per_timeline_latency_histo,
-            global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
-            per_timeline_flush_in_progress_micros: self
-                .per_timeline_flush_in_progress_micros
-                .clone(),
             global_batch_wait_time: self.global_batch_wait_time.clone(),
             per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(),
             throttling: self.throttling.clone(),
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 679cf3b2d5..a0721a9882 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1063,9 +1063,10 @@ impl PageServerHandler {
         };
 
         // invoke handler function
-        let (handler_results, span): (
+        let (handler_results, span, shard): (
            Vec>,
             _,
+            _,
         ) = match batch {
             BatchedFeMessage::Exists {
                 span,
@@ -1082,6 +1083,7 @@ impl PageServerHandler {
                        .map(|msg| (msg, timer))
                        .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
                     span,
+                    Some(shard),
                 )
             }
             BatchedFeMessage::Nblocks {
@@ -1099,6 +1101,7 @@ impl PageServerHandler {
                        .map(|msg| (msg, timer))
                        .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
                     span,
+                    Some(shard),
                 )
             }
             BatchedFeMessage::GetPage {
@@ -1126,6 +1129,7 @@ impl PageServerHandler {
                        res
                     },
                     span,
+                    Some(shard),
                 )
             }
             BatchedFeMessage::DbSize {
@@ -1143,6 +1147,7 @@ impl PageServerHandler {
                        .map(|msg| (msg, timer))
                        .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
                     span,
+                    Some(shard),
                 )
             }
             BatchedFeMessage::GetSlruSegment {
@@ -1160,6 +1165,7 @@ impl PageServerHandler {
                        .map(|msg| (msg, timer))
                        .map_err(|err| BatchedPageStreamError { err, req: req.hdr })],
                     span,
+                    Some(shard),
                 )
             }
             #[cfg(feature = "testing")]
@@ -1181,12 +1187,13 @@ impl PageServerHandler {
                        res
                     },
                     span,
+                    Some(shard),
                 )
             }
             BatchedFeMessage::RespondError { span, error } => {
                 // We've already decided to respond with an error, so we don't need to
                 // call the handler.
-                (vec![Err(error)], span)
+                (vec![Err(error)], span, None)
             }
         };
 
@@ -1194,7 +1201,7 @@ impl PageServerHandler {
         // Some handler errors cause exit from pagestream protocol.
         // Other handler errors are sent back as an error message and we stay in pagestream protocol.
         for handler_result in handler_results {
-            let (response_msg, timer) = match handler_result {
+            let (response_msg, mut timer) = match handler_result {
                 Err(e) => match &e.err {
                     PageStreamError::Shutdown => {
                         // If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1250,20 +1257,24 @@ impl PageServerHandler {
             // The timer's underlying metric is used for a storage-internal latency SLO and
             // we don't want to include latency in it that we can't control.
             // And as pointed out above, in this case, we don't control the time that flush will take.
-            let flushing_timer = timer.map(|mut timer| {
-                timer
-                    .observe_execution_end_flush_start(Instant::now())
-                    .expect("we are the first caller")
-            });
+            let start_flushing_at = Instant::now();
+            if let Some(timer) = &mut timer {
+                timer.observe_execution_end_flush_start(start_flushing_at);
+            }
 
             // what we want to do
             let flush_fut = pgb_writer.flush();
-            // metric for how long flushing takes
-            let flush_fut = match flushing_timer {
-                Some(flushing_timer) => {
-                    futures::future::Either::Left(flushing_timer.measure(flush_fut))
-                }
-                None => futures::future::Either::Right(flush_fut),
+            let flush_fut = if let Some(shard) = &shard {
+                // don't hold upgraded handle while flushing!
+                futures::future::Either::Left(
+                    metrics::SmgrQueryTimePerTimeline::record_flush_in_progress(
+                        shard,
+                        start_flushing_at,
+                        flush_fut,
+                    ),
+                )
+            } else {
+                futures::future::Either::Right(flush_fut)
             };
             // do it while respecting cancellation
             let _: () = async move {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index b6a349a209..5d9d3ea98c 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -326,7 +326,7 @@ pub struct Timeline {
 
     // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
     // in `crate::page_service` writes these metrics.
-    pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
+    pub(crate) query_metrics: Arc<crate::metrics::SmgrQueryTimePerTimeline>,
 
     directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
 
@@ -2517,11 +2517,11 @@ impl Timeline {
 
             metrics,
 
-            query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
+            query_metrics: Arc::new(crate::metrics::SmgrQueryTimePerTimeline::new(
                 &tenant_shard_id,
                 &timeline_id,
                 resources.pagestream_throttle_metrics,
-            ),
+            )),
 
             directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
 
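
For readers of the patch, here is a minimal, self-contained sketch of the polling pattern that `record_flush_in_progress` (and the removed `SmgrOpFlushInProgress::measure`) relies on: the flush future is raced against a periodic timeout, and a `scopeguard` closure adds the elapsed interval to a counter on every tick and once more on drop, so a slow flush becomes visible in metrics while it is still in progress. This is not the patched code itself; names such as `measure_in_progress` and the `AtomicU64` standing in for the prometheus `IntCounter` are illustrative, the 10-second tick is shortened so the example finishes quickly, and it assumes the `tokio` (with `macros`, `rt-multi-thread`, `time` features) and `scopeguard` crates.

```rust
use std::pin::pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Drive `fut` to completion while periodically adding elapsed wall-clock time
/// to `micros`, so a slow "flush" shows up in the counter *while* it runs.
async fn measure_in_progress<Fut, O>(micros: Arc<AtomicU64>, tick: Duration, fut: Fut) -> O
where
    Fut: std::future::Future<Output = O>,
{
    let mut fut = pin!(fut);
    // Timestamp of the last observation; each observation accounts for the
    // interval since the previous one.
    let mut base = Instant::now();

    // The guard's closure runs on every explicit call and once more on drop,
    // which accounts for the final partial interval when `fut` completes.
    let mut observe_guard = scopeguard::guard(
        || {
            let now = Instant::now();
            let elapsed = now - base;
            micros.fetch_add(u64::try_from(elapsed.as_micros()).unwrap(), Ordering::Relaxed);
            base = now;
        },
        |mut observe| observe(),
    );

    loop {
        match tokio::time::timeout(tick, &mut fut).await {
            // Done: the guard drops here and records the tail interval.
            Ok(v) => return v,
            // Still pending: bump the counter and keep polling.
            Err(_elapsed) => (*observe_guard)(),
        }
    }
}

#[tokio::main]
async fn main() {
    let micros = Arc::new(AtomicU64::new(0));
    // A 250ms sleep stands in for a slow TCP flush.
    let slow_flush = tokio::time::sleep(Duration::from_millis(250));
    measure_in_progress(micros.clone(), Duration::from_millis(100), slow_flush).await;
    println!("flush in progress: {}us", micros.load(Ordering::Relaxed));
}
```

The drop-time observation is what captures the interval between the last tick and the moment the future completes; in the patch the counters are additionally reached through the timeline `WeakHandle` on every tick, so the flush path never keeps an upgraded handle alive while it waits on the socket.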