From b3a911ff8c2867e5526a3dda56ac95d3f78562df Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Sat, 8 Feb 2025 10:28:09 +0100
Subject: [PATCH 1/2] fix(page_service / batching): smgr op latency metrics
 includes the flush time of preceding requests (#10728)

Before this PR, if a batch contains N responses, the smgr op latency
reported for the i-th response would include the time we spent flushing
the preceding i-1 responses.

refs:
- fixup of https://github.com/neondatabase/neon/pull/10042
- fixes https://github.com/neondatabase/neon/issues/10674
---
 pageserver/src/page_service.rs | 37 ++++++++++++++++++++++------------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 24a350399d..0a2f851955 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1190,6 +1190,29 @@ impl PageServerHandler {
             }
         };
 
+        // We purposefully don't count flush time into the smgr operaiton timer.
+        //
+        // The reason is that current compute client will not perform protocol processing
+        // if the postgres backend process is doing things other than `->smgr_read()`.
+        // This is especially the case for prefetch.
+        //
+        // If the compute doesn't read from the connection, eventually TCP will backpressure
+        // all the way into our flush call below.
+        //
+        // The timer's underlying metric is used for a storage-internal latency SLO and
+        // we don't want to include latency in it that we can't control.
+        // And as pointed out above, in this case, we don't control the time that flush will take.
+        //
+        // We put each response in the batch onto the wire in a separate pgb_writer.flush()
+        // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
+        // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
+        // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
+        //
+        // Since we're flushing multiple times in the loop, but only have access to the per-op
+        // timers inside the loop, we capture the flush start time here and reuse it to finish
+        // each op timer.
+        let flushing_start_time = Instant::now();
+
         // Map handler result to protocol behavior.
         // Some handler errors cause exit from pagestream protocol.
         // Other handler errors are sent back as an error message and we stay in pagestream protocol.
@@ -1238,21 +1261,9 @@ impl PageServerHandler {
                 &response_msg.serialize(protocol_version),
             ))?;
 
-            // We purposefully don't count flush time into the timer.
-            //
-            // The reason is that current compute client will not perform protocol processing
-            // if the postgres backend process is doing things other than `->smgr_read()`.
-            // This is especially the case for prefetch.
-            //
-            // If the compute doesn't read from the connection, eventually TCP will backpressure
-            // all the way into our flush call below.
-            //
-            // The timer's underlying metric is used for a storage-internal latency SLO and
-            // we don't want to include latency in it that we can't control.
-            // And as pointed out above, in this case, we don't control the time that flush will take.
             let flushing_timer = timer.map(|mut timer| {
                 timer
-                    .observe_execution_end_flush_start(Instant::now())
+                    .observe_execution_end_flush_start(flushing_start_time)
                     .expect("we are the first caller")
             });
 

From d3d3bfc6d081cc8eeaa8ec9bca1b874d1fee24d4 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 11 Feb 2025 15:05:59 +0100
Subject: [PATCH 2/2] fix(page_service / batching): smgr op latency metric of
 dropped responses include flush time (#10756)

# Problem

Say we have a batch of 10 responses to send out. Then, even with #10728,
at the point where we've only called observe_execution_end_flush_start for
the first 3 responses, the remaining 7 response timers are still ticking.

When compute now closes the connection, the waiting flush fails with an
error and we `drop()` the remaining 7 responses' smgr op timers. The
`impl Drop for SmgrOpTimer` will observe an execution time that includes
the flush time.

In practice, this is suspected to produce the `+Inf` observations in the
smgr op latency histogram we've seen since the introduction of pipelining,
even after shipping #10728.

refs:
- fixup of https://github.com/neondatabase/neon/pull/10042
- fixup of https://github.com/neondatabase/neon/pull/10728
- fixes https://github.com/neondatabase/neon/issues/10754
---
 pageserver/src/metrics.rs      | 16 ++++------
 pageserver/src/page_service.rs | 55 +++++++++++++++++++---------------
 2 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 3b8612a3fa..983a3079e4 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1366,10 +1366,7 @@ impl SmgrOpTimer {
     /// The first callers receives Some, subsequent ones None.
     ///
     /// See [`SmgrOpTimerState`] for more context.
-    pub(crate) fn observe_execution_end_flush_start(
-        &mut self,
-        at: Instant,
-    ) -> Option<SmgrOpFlushInProgress> {
+    pub(crate) fn observe_execution_end(&mut self, at: Instant) -> Option<SmgrOpFlushInProgress> {
         // NB: unlike the other observe_* methods, this one take()s.
         #[allow(clippy::question_mark)] // maintain similar code pattern.
         let Some(mut inner) = self.0.take() else {
@@ -1403,7 +1400,6 @@ impl SmgrOpTimer {
             ..
         } = inner;
         Some(SmgrOpFlushInProgress {
-            flush_started_at: at,
             global_micros: global_flush_in_progress_micros,
             per_timeline_micros: per_timeline_flush_in_progress_micros,
         })
@@ -1419,7 +1415,6 @@ impl SmgrOpTimer {
 /// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there,
 /// and remove this struct from the code base.
 pub(crate) struct SmgrOpFlushInProgress {
-    flush_started_at: Instant,
     global_micros: IntCounter,
     per_timeline_micros: IntCounter,
 }
@@ -1438,12 +1433,13 @@ impl Drop for SmgrOpTimer {
         self.observe_throttle_start(now);
         self.observe_throttle_done(ThrottleResult::NotThrottled { end: now });
         self.observe_execution_start(now);
-        self.observe_execution_end_flush_start(now);
+        let maybe_flush_timer = self.observe_execution_end(now);
+        drop(maybe_flush_timer);
     }
 }
 
 impl SmgrOpFlushInProgress {
-    pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
+    pub(crate) async fn measure<Fut, O>(self, mut started_at: Instant, mut fut: Fut) -> O
     where
         Fut: std::future::Future<Output = O>,
     {
@@ -1455,12 +1451,12 @@ impl SmgrOpFlushInProgress {
         let mut observe_guard = scopeguard::guard(
             || {
                 let now = Instant::now();
-                let elapsed = now - self.flush_started_at;
+                let elapsed = now - started_at;
                 self.global_micros
                     .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
                 self.per_timeline_micros
                     .inc_by(u64::try_from(elapsed.as_micros()).unwrap());
-                self.flush_started_at = now;
+                started_at = now;
             },
             |mut observe| {
                 observe();

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 0a2f851955..310145e9ce 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1063,7 +1063,7 @@ impl PageServerHandler {
         };
 
         // invoke handler function
-        let (handler_results, span): (
+        let (mut handler_results, span): (
             Vec>,
             _,
         ) = match batch {
@@ -1190,7 +1190,7 @@ impl PageServerHandler {
             }
         };
 
-        // We purposefully don't count flush time into the smgr operaiton timer.
+        // We purposefully don't count flush time into the smgr operation timer.
         //
         // The reason is that current compute client will not perform protocol processing
         // if the postgres backend process is doing things other than `->smgr_read()`.
@@ -1207,17 +1207,32 @@ impl PageServerHandler {
         // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
        // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
         // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
-        //
-        // Since we're flushing multiple times in the loop, but only have access to the per-op
-        // timers inside the loop, we capture the flush start time here and reuse it to finish
-        // each op timer.
-        let flushing_start_time = Instant::now();
+        let flush_timers = {
+            let flushing_start_time = Instant::now();
+            let mut flush_timers = Vec::with_capacity(handler_results.len());
+            for handler_result in &mut handler_results {
+                let flush_timer = match handler_result {
+                    Ok((_, timer)) => Some(
+                        timer
+                            .observe_execution_end(flushing_start_time)
+                            .expect("we are the first caller"),
+                    ),
+                    Err(_) => {
+                        // TODO: measure errors
+                        None
+                    }
+                };
+                flush_timers.push(flush_timer);
+            }
+            assert_eq!(flush_timers.len(), handler_results.len());
+            flush_timers
+        };
 
         // Map handler result to protocol behavior.
         // Some handler errors cause exit from pagestream protocol.
         // Other handler errors are sent back as an error message and we stay in pagestream protocol.
-        for handler_result in handler_results {
-            let (response_msg, timer) = match handler_result {
+        for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
+            let response_msg = match handler_result {
                 Err(e) => match &e.err {
                     PageStreamError::Shutdown => {
                         // If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1241,16 +1256,14 @@ impl PageServerHandler {
                         span.in_scope(|| {
                             error!("error reading relation or page version: {full:#}")
                         });
-                        (
-                            PagestreamBeMessage::Error(PagestreamErrorResponse {
-                                req: e.req,
-                                message: e.err.to_string(),
-                            }),
-                            None, // TODO: measure errors
-                        )
+
+                        PagestreamBeMessage::Error(PagestreamErrorResponse {
+                            req: e.req,
+                            message: e.err.to_string(),
+                        })
                     }
                 },
-                Ok((response_msg, timer)) => (response_msg, Some(timer)),
+                Ok((response_msg, _op_timer_already_observed)) => response_msg,
             };
 
             //
@@ -1261,18 +1274,12 @@ impl PageServerHandler {
                 &response_msg.serialize(protocol_version),
             ))?;
 
-            let flushing_timer = timer.map(|mut timer| {
-                timer
-                    .observe_execution_end_flush_start(flushing_start_time)
-                    .expect("we are the first caller")
-            });
-
             // what we want to do
             let flush_fut = pgb_writer.flush();
             // metric for how long flushing takes
             let flush_fut = match flushing_timer {
                 Some(flushing_timer) => {
-                    futures::future::Either::Left(flushing_timer.measure(flush_fut))
+                    futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
                 }
                 None => futures::future::Either::Right(flush_fut),
             };
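
The pattern the two patches converge on can be illustrated outside the pageserver code: every response's execution phase is closed out exactly once against a single pre-flush instant, the returned flush measurer accounts for flush time separately, and a timer that is dropped before its response was ever flushed observes no flush time at all. Below is a minimal, self-contained sketch of that idea; `OpTimer` and `FlushMeasurer` are hypothetical stand-ins, not the actual `SmgrOpTimer`/`SmgrOpFlushInProgress` types (which are async and record into Prometheus histograms and counters rather than printing durations).

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for SmgrOpTimer: measures execution time only.
struct OpTimer {
    started_at: Instant,
    /// Consumed by the first observe_execution_end() call; None afterwards.
    open: Option<()>,
}

/// Hypothetical stand-in for SmgrOpFlushInProgress: accounts flush time
/// separately from execution time.
struct FlushMeasurer;

impl FlushMeasurer {
    /// Run `work` (the flush) and attribute its wall-clock time to flush
    /// accounting, never to the op latency observation.
    fn measure<T>(self, started_at: Instant, work: impl FnOnce() -> T) -> (T, Duration) {
        let out = work();
        (out, started_at.elapsed())
    }
}

impl OpTimer {
    fn start(now: Instant) -> Self {
        OpTimer { started_at: now, open: Some(()) }
    }

    /// First caller gets Some(FlushMeasurer); later callers (including Drop) get None.
    fn observe_execution_end(&mut self, at: Instant) -> Option<FlushMeasurer> {
        self.open.take()?;
        let execution = at.duration_since(self.started_at);
        // Real code: record `execution` into the op latency histogram here.
        println!("execution took {execution:?}");
        Some(FlushMeasurer)
    }
}

impl Drop for OpTimer {
    fn drop(&mut self) {
        // A timer dropped before its response was flushed closes out execution
        // at drop time and discards the flush measurer, so flush time of other
        // responses cannot leak into this op's latency.
        let _ = self.observe_execution_end(Instant::now());
    }
}

fn main() {
    // One batch: all execution ends at the same pre-flush instant,
    // then each response is flushed and measured separately.
    let mut timers = vec![OpTimer::start(Instant::now()), OpTimer::start(Instant::now())];
    let flushing_start = Instant::now();
    let flush_measurers: Vec<_> = timers
        .iter_mut()
        .map(|t| t.observe_execution_end(flushing_start))
        .collect();
    for measurer in flush_measurers.into_iter().flatten() {
        let ((), flushed_for) = measurer.measure(Instant::now(), || { /* flush one response */ });
        println!("flush took {flushed_for:?}");
    }
    // Dropping `timers` here observes nothing extra: execution was already closed out.
}
```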