diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 62bf9acf01..96ee157856 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1223,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
     });
 }
 
-pub(crate) struct SmgrOpTimer {
+pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
+pub(crate) struct SmgrOpTimerInner {
     global_latency_histo: Histogram,
 
     // Optional because not all op types are tracked per-timeline
     per_timeline_latency_histo: Option<Histogram>,
 
+    global_flush_in_progress_micros: IntCounter,
+    per_timeline_flush_in_progress_micros: IntCounter,
+
     start: Instant,
     throttled: Duration,
     op: SmgrQueryType,
 }
 
+pub(crate) struct SmgrOpFlushInProgress {
+    base: Instant,
+    global_micros: IntCounter,
+    per_timeline_micros: IntCounter,
+}
+
 impl SmgrOpTimer {
     pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
         let Some(throttle) = throttle else {
             return;
         };
-        self.throttled += *throttle;
+        let inner = self.0.as_mut().expect("other public methods consume self");
+        inner.throttled += *throttle;
     }
-}
 
-impl Drop for SmgrOpTimer {
-    fn drop(&mut self) {
-        let elapsed = self.start.elapsed();
+    pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
+        let (flush_start, inner) = self
+            .smgr_op_end()
+            .expect("this method consumes self, and the only other caller is the drop handler");
+        let SmgrOpTimerInner {
+            global_flush_in_progress_micros,
+            per_timeline_flush_in_progress_micros,
+            ..
+        } = inner;
+        SmgrOpFlushInProgress {
+            base: flush_start,
+            global_micros: global_flush_in_progress_micros,
+            per_timeline_micros: per_timeline_flush_in_progress_micros,
+        }
+    }
 
-        let elapsed = match elapsed.checked_sub(self.throttled) {
+    /// Returns `None` if this method has already been called, `Some` otherwise.
+    fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
+        let inner = self.0.take()?;
+
+        let now = Instant::now();
+        let elapsed = now - inner.start;
+
+        let elapsed = match elapsed.checked_sub(inner.throttled) {
             Some(elapsed) => elapsed,
             None => {
                 use utils::rate_limit::RateLimit;
@@ -1258,9 +1287,9 @@ impl Drop for SmgrOpTimer {
                     })))
                 });
                 let mut guard = LOGGED.lock().unwrap();
-                let rate_limit = &mut guard[self.op];
+                let rate_limit = &mut guard[inner.op];
                 rate_limit.call(|| {
-                    warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
+                    warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
                 });
                 elapsed // un-throttled time, more info than just saturating to 0
             }
@@ -1268,10 +1297,54 @@ impl Drop for SmgrOpTimer {
 
         let elapsed = elapsed.as_secs_f64();
 
-        self.global_latency_histo.observe(elapsed);
-        if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
+        inner.global_latency_histo.observe(elapsed);
+        if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
             per_timeline_getpage_histo.observe(elapsed);
         }
+
+        Some((now, inner))
+    }
+}
+
+impl Drop for SmgrOpTimer {
+    fn drop(&mut self) {
+        self.smgr_op_end();
+    }
+}
+
+impl SmgrOpFlushInProgress {
+    pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
+    where
+        Fut: std::future::Future<Output = O>,
+    {
+        let mut fut = std::pin::pin!(fut);
+
+        // Whenever observe_guard gets called, or dropped,
+        // it adds the time elapsed since its last call to metrics.
+        // Last call is tracked in `self.base`.
+        let mut observe_guard = scopeguard::guard(
+            || {
+                let now = Instant::now();
+                let elapsed = now - self.base;
+                self.global_micros
+                    .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
+                self.per_timeline_micros
+                    .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
+                self.base = now;
+            },
+            |mut observe| {
+                observe();
+            },
+        );
+
+        loop {
+            match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
+                Ok(v) => return v,
+                Err(_timeout) => {
+                    (*observe_guard)();
+                }
+            }
+        }
     }
 }
 
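// Editorial sketch, not part of the patch above: a minimal, self-contained reproduction of the
// pattern the hunks above introduce. The timer's guts move into an `Option<Inner>` so that the
// explicit completion path and the `Drop` impl can share one `end()` helper; `Option::take`
// guarantees the latency observation happens at most once, whichever path runs first. All names
// here (`Timer`, `TimerInner`, `finish`, `end`) are illustrative, not the pageserver's.

use std::time::{Duration, Instant};

struct TimerInner {
    start: Instant,
    observe: fn(Duration),
}

struct Timer(Option<TimerInner>);

impl Timer {
    fn start(observe: fn(Duration)) -> Self {
        Timer(Some(TimerInner {
            start: Instant::now(),
            observe,
        }))
    }

    /// Explicit completion path, analogous to `observe_smgr_op_completion_and_start_flushing`.
    fn finish(mut self) -> Duration {
        self.end()
            .expect("this method consumes self; Drop runs afterwards and then finds None")
    }

    /// Shared by `finish` and `Drop`; returns `None` if the observation already happened.
    fn end(&mut self) -> Option<Duration> {
        let inner = self.0.take()?;
        let elapsed = inner.start.elapsed();
        (inner.observe)(elapsed);
        Some(elapsed)
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        // No-op if `finish` already ran.
        let _ = self.end();
    }
}

fn main() {
    let timer = Timer::start(|elapsed| println!("observed {elapsed:?}"));
    // Exactly one observation is recorded, via `finish` here or via `Drop` otherwise.
    let _ = timer.finish();
}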
@@ -1302,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
     per_timeline_getpage_latency: Histogram,
     global_batch_size: Histogram,
     per_timeline_batch_size: Histogram,
+    global_flush_in_progress_micros: IntCounter,
+    per_timeline_flush_in_progress_micros: IntCounter,
 }
 
 static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1464,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
         .set(value.try_into().unwrap());
 }
 
+static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "pageserver_page_service_pagestream_flush_in_progress_micros",
+        "Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
+         If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
+         easily discoverable in monitoring. \
+         Hence, this is NOT a completion latency histogram.",
+        &["tenant_id", "shard_id", "timeline_id"],
+    )
+    .expect("failed to define a metric")
+});
+
+static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "pageserver_page_service_pagestream_flush_in_progress_micros_global",
+        "Like pageserver_page_service_pagestream_flush_in_progress_micros, but instance-wide.",
+    )
+    .expect("failed to define a metric")
+});
+
 impl SmgrQueryTimePerTimeline {
     pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
         let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1504,6 +1599,12 @@ impl SmgrQueryTimePerTimeline {
             .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
             .unwrap();
 
+        let global_flush_in_progress_micros =
+            PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
+        let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
+            .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
+            .unwrap();
+
         Self {
             global_started,
             global_latency,
@@ -1511,6 +1612,8 @@ impl SmgrQueryTimePerTimeline {
             per_timeline_getpage_started,
             global_batch_size,
             per_timeline_batch_size,
+            global_flush_in_progress_micros,
+            per_timeline_flush_in_progress_micros,
         }
     }
     pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
@@ -1523,13 +1626,17 @@ impl SmgrQueryTimePerTimeline {
             None
         };
 
-        SmgrOpTimer {
+        SmgrOpTimer(Some(SmgrOpTimerInner {
             global_latency_histo: self.global_latency[op as usize].clone(),
             per_timeline_latency_histo,
             start: started_at,
            op,
             throttled: Duration::ZERO,
-        }
+            global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
+            per_timeline_flush_in_progress_micros: self
+                .per_timeline_flush_in_progress_micros
+                .clone(),
+        }))
     }
 
     pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
@@ -2777,6 +2884,11 @@ impl TimelineMetrics {
             shard_id,
             timeline_id,
         ]);
+        let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
+            tenant_id,
+            shard_id,
+            timeline_id,
+        ]);
     }
 }
 
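// Editorial sketch, not part of the patch: the per-timeline counter lifecycle that the hunks
// above wire up, shown directly against the `prometheus` and `once_cell` crates. The pageserver
// reaches these through its own `metrics` wrapper, so treat the exact paths as assumptions;
// metric and label values here are placeholders, not the real ones.

use once_cell::sync::Lazy;
use prometheus::{register_int_counter_vec, IntCounter, IntCounterVec};

static EXAMPLE_FLUSH_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "example_pagestream_flush_in_progress_micros",
        "Microseconds spent flushing responses; bumped periodically so slow flushes show up early.",
        &["tenant_id", "shard_id", "timeline_id"]
    )
    .expect("failed to define a metric")
});

fn main() {
    // Timeline registration: resolve the labelled child once and keep the cheap clone around,
    // mirroring what `SmgrQueryTimePerTimeline::new` does above.
    let per_timeline: IntCounter = EXAMPLE_FLUSH_MICROS
        .get_metric_with_label_values(&["tenant-a", "0001", "timeline-1"])
        .unwrap();

    // Hot path: incrementing the clone is just an atomic add, no label lookup.
    per_timeline.inc_by(1_234);

    // Timeline shutdown: drop the labelled series so stale label sets do not linger in /metrics,
    // mirroring the `remove_label_values` call added to `TimelineMetrics`.
    let _ = EXAMPLE_FLUSH_MICROS.remove_label_values(&["tenant-a", "0001", "timeline-1"]);
}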
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 7026df9527..97d94bbe7f 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1017,10 +1017,8 @@ impl PageServerHandler {
         // Map handler result to protocol behavior.
         // Some handler errors cause exit from pagestream protocol.
         // Other handler errors are sent back as an error message and we stay in pagestream protocol.
-        let mut timers: smallvec::SmallVec<[_; 1]> =
-            smallvec::SmallVec::with_capacity(handler_results.len());
         for handler_result in handler_results {
-            let response_msg = match handler_result {
+            let (response_msg, timer) = match handler_result {
                 Err(e) => match &e {
                     PageStreamError::Shutdown => {
                         // If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1044,34 +1042,66 @@ impl PageServerHandler {
                         span.in_scope(|| {
                             error!("error reading relation or page version: {full:#}")
                         });
-                        PagestreamBeMessage::Error(PagestreamErrorResponse {
-                            message: e.to_string(),
-                        })
+                        (
+                            PagestreamBeMessage::Error(PagestreamErrorResponse {
+                                message: e.to_string(),
+                            }),
+                            None, // TODO: measure errors
+                        )
                     }
                 },
-                Ok((response_msg, timer)) => {
-                    // Extending the lifetime of the timers so observations on drop
-                    // include the flush time.
-                    timers.push(timer);
-                    response_msg
-                }
+                Ok((response_msg, timer)) => (response_msg, Some(timer)),
             };
 
+            //
             // marshal & transmit response message
+            //
+
             pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
-        }
-        tokio::select! {
-            biased;
-            _ = cancel.cancelled() => {
-                // We were requested to shut down.
-                info!("shutdown request received in page handler");
-                return Err(QueryError::Shutdown)
-            }
-            res = pgb_writer.flush() => {
-                res?;
+
+            // We purposefully don't count flush time into the timer.
+            //
+            // The reason is that current compute client will not perform protocol processing
+            // if the postgres backend process is doing things other than `->smgr_read()`.
+            // This is especially the case for prefetch.
+            //
+            // If the compute doesn't read from the connection, eventually TCP will backpressure
+            // all the way into our flush call below.
+            //
+            // The timer's underlying metric is used for a storage-internal latency SLO and
+            // we don't want to include latency in it that we can't control.
+            // And as pointed out above, in this case, we don't control the time that flush will take.
+            let flushing_timer =
+                timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
+
+            // what we want to do
+            let flush_fut = pgb_writer.flush();
+            // metric for how long flushing takes
+            let flush_fut = match flushing_timer {
+                Some(flushing_timer) => {
+                    futures::future::Either::Left(flushing_timer.measure(flush_fut))
+                }
+                None => futures::future::Either::Right(flush_fut),
+            };
+            // do it while respecting cancellation
+            let _: () = async move {
+                tokio::select! {
+                    biased;
+                    _ = cancel.cancelled() => {
+                        // We were requested to shut down.
+                        info!("shutdown request received in page handler");
+                        return Err(QueryError::Shutdown)
+                    }
+                    res = flush_fut => {
+                        res?;
+                    }
+                }
+                Ok(())
             }
+            // and log the info! line inside the request span
+            .instrument(span.clone())
+            .await?;
         }
-        drop(timers);
 
         Ok(())
     }
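// Editorial sketch, not part of the patch: the shape of the new flush path above, reduced to a
// standalone program. `futures::future::Either` keeps one concrete future type whether or not a
// timer is attached, and the `biased` select lets shutdown win over a stalled flush while the
// whole thing runs inside the request span. The helper names, the crates pulled in (tokio,
// futures, tokio-util, tracing) and their feature flags are assumptions for the sketch only.

use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{info, info_span, Instrument};

// Stand-in for `pgb_writer.flush()`.
async fn flush() -> Result<(), std::io::Error> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok(())
}

// Stand-in for `SmgrOpFlushInProgress::measure`: wraps the flush without changing its output.
async fn measured<F: std::future::Future>(fut: F) -> F::Output {
    fut.await
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let cancel = CancellationToken::new();
    let have_timer = true; // in the patch above, error responses carry no timer

    let flush_fut = flush();
    let flush_fut = if have_timer {
        futures::future::Either::Left(measured(flush_fut))
    } else {
        futures::future::Either::Right(flush_fut)
    };

    async {
        tokio::select! {
            biased;
            _ = cancel.cancelled() => {
                // Simplified: the real handler returns `Err(QueryError::Shutdown)` here.
                info!("shutdown request received in page handler");
                Ok(())
            }
            res = flush_fut => res,
        }
    }
    .instrument(info_span!("pagestream_request"))
    .await
}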
diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py
index 52ed7da36b..a591e088ef 100644
--- a/test_runner/fixtures/metrics.py
+++ b/test_runner/fixtures/metrics.py
@@ -176,6 +176,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
     counter("pageserver_tenant_throttling_wait_usecs_sum"),
     counter("pageserver_tenant_throttling_count"),
     counter("pageserver_timeline_wal_records_received"),
+    counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
     *histogram("pageserver_page_service_batch_size"),
     *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
     # "pageserver_directory_entries_count", -- only used if above a certain threshold