diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3c4f7ca2fd..c412311afc 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::num::NonZeroUsize; -use std::os::fd::RawFd; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -1354,95 +1353,9 @@ impl SmgrOpTimer { pub(crate) fn observe_execution_start(&mut self, _at: Instant) { } - /// For all but the first caller, this is a no-op. - /// The first callers receives Some, subsequent ones None. - /// - /// See [`SmgrOpTimerState`] for more context. - pub(crate) fn observe_execution_end(&mut self, _at: Instant) -> Option { - None - } } -/// The last stage of request processing is serializing and flushing the request -/// into the TCP connection. We want to make slow flushes observable -/// _while they are occuring_, so this struct provides a wrapper method [`Self::measure`] -/// to periodically bump the metric. -/// -/// If in the future we decide that we're not interested in live updates, we can -/// add another `observe_*` method to [`SmgrOpTimer`], follow the existing pattern there, -/// and remove this struct from the code base. -pub(crate) struct SmgrOpFlushInProgress { -} -impl SmgrOpFlushInProgress { - /// The caller must guarantee that `socket_fd`` outlives this function. - pub(crate) async fn measure( - self, - started_at: Instant, - mut fut: Fut, - socket_fd: RawFd, - ) -> O - where - Fut: std::future::Future, - { - let mut fut = std::pin::pin!(fut); - - let mut logged = false; - let mut last_counter_increment_at = started_at; - let mut observe_guard = scopeguard::guard( - |is_timeout| { - let now = Instant::now(); - - // Increment counter - { - last_counter_increment_at = now; - } - - // Log something on every timeout, and on completion but only if we hit a timeout. - if is_timeout || logged { - logged = true; - let elapsed_total = now - started_at; - let msg = if is_timeout { - "slow flush ongoing" - } else { - "slow flush completed or cancelled" - }; - - let (inq, outq) = { - // SAFETY: caller guarantees that `socket_fd` outlives this function. - #[cfg(target_os = "linux")] - unsafe { - ( - utils::linux_socket_ioctl::inq(socket_fd).unwrap_or(-2), - utils::linux_socket_ioctl::outq(socket_fd).unwrap_or(-2), - ) - } - #[cfg(not(target_os = "linux"))] - { - _ = socket_fd; // appease unused lint on macOS - (-1, -1) - } - }; - - let elapsed_total_secs = format!("{:.6}", elapsed_total.as_secs_f64()); - tracing::info!(elapsed_total_secs, inq, outq, msg); - } - }, - |mut observe| { - observe(false); - }, - ); - - loop { - match tokio::time::timeout(Duration::from_secs(10), &mut fut).await { - Ok(v) => return v, - Err(_timeout) => { - (*observe_guard)(true); - } - } - } - } -} #[derive( Debug, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index bbef0df35c..2d2ac137b0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1281,7 +1281,7 @@ impl PageServerHandler { // Dispatch the batch to the appropriate request handler. let log_slow_name = batch.as_static_str(); - let (mut handler_results, span) = { + let (handler_results, span) = { // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and // won't fit on the stack. let mut boxpinned = @@ -1381,7 +1381,7 @@ impl PageServerHandler { failpoint_support::sleep_millis_async!("before-pagestream-msg-flush", cancel); // what we want to do - let socket_fd = pgb_writer.socket_fd; + let flush_fut = pgb_writer.flush(); // metric for how long flushing takes // let flush_fut = match flushing_timer {