diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3b8612a3fa..983a3079e4 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1366,10 +1366,7 @@ impl SmgrOpTimer { /// The first callers receives Some, subsequent ones None. /// /// See [`SmgrOpTimerState`] for more context. - pub(crate) fn observe_execution_end_flush_start( - &mut self, - at: Instant, - ) -> Option { + pub(crate) fn observe_execution_end(&mut self, at: Instant) -> Option { // NB: unlike the other observe_* methods, this one take()s. #[allow(clippy::question_mark)] // maintain similar code pattern. let Some(mut inner) = self.0.take() else { @@ -1403,7 +1400,6 @@ impl SmgrOpTimer { .. } = inner; Some(SmgrOpFlushInProgress { - flush_started_at: at, global_micros: global_flush_in_progress_micros, per_timeline_micros: per_timeline_flush_in_progress_micros, }) @@ -1419,7 +1415,6 @@ impl SmgrOpTimer { /// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there, /// and remove this struct from the code base. pub(crate) struct SmgrOpFlushInProgress { - flush_started_at: Instant, global_micros: IntCounter, per_timeline_micros: IntCounter, } @@ -1438,12 +1433,13 @@ impl Drop for SmgrOpTimer { self.observe_throttle_start(now); self.observe_throttle_done(ThrottleResult::NotThrottled { end: now }); self.observe_execution_start(now); - self.observe_execution_end_flush_start(now); + let maybe_flush_timer = self.observe_execution_end(now); + drop(maybe_flush_timer); } } impl SmgrOpFlushInProgress { - pub(crate) async fn measure(mut self, mut fut: Fut) -> O + pub(crate) async fn measure(self, mut started_at: Instant, mut fut: Fut) -> O where Fut: std::future::Future, { @@ -1455,12 +1451,12 @@ impl SmgrOpFlushInProgress { let mut observe_guard = scopeguard::guard( || { let now = Instant::now(); - let elapsed = now - self.flush_started_at; + let elapsed = now - started_at; self.global_micros .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); self.per_timeline_micros .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); - self.flush_started_at = now; + started_at = now; }, |mut observe| { observe(); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 69f1f1c051..972dad34d4 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1074,7 +1074,7 @@ impl PageServerHandler { }; // invoke handler function - let (handler_results, span): ( + let (mut handler_results, span): ( Vec>, _, ) = match batch { @@ -1201,7 +1201,7 @@ impl PageServerHandler { } }; - // We purposefully don't count flush time into the smgr operaiton timer. + // We purposefully don't count flush time into the smgr operation timer. // // The reason is that current compute client will not perform protocol processing // if the postgres backend process is doing things other than `->smgr_read()`. @@ -1218,17 +1218,32 @@ impl PageServerHandler { // call, which (all unmeasured) adds syscall overhead but reduces time to first byte // and avoids building up a "giant" contiguous userspace buffer to hold the entire response. // TODO: vectored socket IO would be great, but pgb_writer doesn't support that. - // - // Since we're flushing multiple times in the loop, but only have access to the per-op - // timers inside the loop, we capture the flush start time here and reuse it to finish - // each op timer. - let flushing_start_time = Instant::now(); + let flush_timers = { + let flushing_start_time = Instant::now(); + let mut flush_timers = Vec::with_capacity(handler_results.len()); + for handler_result in &mut handler_results { + let flush_timer = match handler_result { + Ok((_, timer)) => Some( + timer + .observe_execution_end(flushing_start_time) + .expect("we are the first caller"), + ), + Err(_) => { + // TODO: measure errors + None + } + }; + flush_timers.push(flush_timer); + } + assert_eq!(flush_timers.len(), handler_results.len()); + flush_timers + }; // Map handler result to protocol behavior. // Some handler errors cause exit from pagestream protocol. // Other handler errors are sent back as an error message and we stay in pagestream protocol. - for handler_result in handler_results { - let (response_msg, timer) = match handler_result { + for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) { + let response_msg = match handler_result { Err(e) => match &e.err { PageStreamError::Shutdown => { // If we fail to fulfil a request during shutdown, which may be _because_ of @@ -1252,16 +1267,14 @@ impl PageServerHandler { span.in_scope(|| { error!("error reading relation or page version: {full:#}") }); - ( - PagestreamBeMessage::Error(PagestreamErrorResponse { - req: e.req, - message: e.err.to_string(), - }), - None, // TODO: measure errors - ) + + PagestreamBeMessage::Error(PagestreamErrorResponse { + req: e.req, + message: e.err.to_string(), + }) } }, - Ok((response_msg, timer)) => (response_msg, Some(timer)), + Ok((response_msg, _op_timer_already_observed)) => response_msg, }; // @@ -1272,18 +1285,12 @@ impl PageServerHandler { &response_msg.serialize(protocol_version), ))?; - let flushing_timer = timer.map(|mut timer| { - timer - .observe_execution_end_flush_start(flushing_start_time) - .expect("we are the first caller") - }); - // what we want to do let flush_fut = pgb_writer.flush(); // metric for how long flushing takes let flush_fut = match flushing_timer { Some(flushing_timer) => { - futures::future::Either::Left(flushing_timer.measure(flush_fut)) + futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut)) } None => futures::future::Either::Right(flush_fut), };