diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index ff6af3566c..59396db4ff 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -375,6 +375,48 @@ async fn timed_after_cancellation( } } +async fn log_if_slow( + name: &str, + warn_after: std::time::Duration, + fut: Fut, +) -> ::Output { + let started = std::time::Instant::now(); + + let mut fut = std::pin::pin!(fut); + + match tokio::time::timeout(warn_after, &mut fut).await { + Ok(ret) => ret, + Err(_) => { + tracing::info!( + what = name, + elapsed_ms = started.elapsed().as_millis(), + "slow future" + ); + + let res = fut.await; + + tracing::info!( + what = name, + elapsed_ms = started.elapsed().as_millis(), + "slow future completed" + ); + + res + } + } +} + +pub(crate) trait LogIfSlowFutureExt: std::future::Future { + async fn log_if_slow(self, name: &'static str, warn_after: std::time::Duration) -> Self::Output + where + Self: Sized, + { + log_if_slow(name, warn_after, self).await + } +} + +impl LogIfSlowFutureExt for Fut where Fut: std::future::Future {} + #[cfg(test)] mod timed_tests { use super::timed; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d00ec11a76..40bab69ff4 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1,6 +1,7 @@ //! The Page Service listens for client connections and serves their GetPage@LSN //! requests. +use crate::LogIfSlowFutureExt; use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; @@ -1360,9 +1361,10 @@ impl PageServerHandler { &ctx, request_span.clone(), ) + .log_if_slow("pagestream_read_message", Duration::from_secs(10)) .await; let Some(read_res) = read_res.transpose() else { - debug!("client-initiated shutdown"); + info!("client-initiated shutdown"); break; }; exit |= read_res.is_err(); @@ -1370,6 +1372,7 @@ impl PageServerHandler { .send(read_res, |batch, res| { Self::pagestream_do_batch(max_batch_size, batch, res) }) + .log_if_slow("batch_tx.send", Duration::from_secs(10)) .await; exit |= could_send.is_err(); } @@ -1386,11 +1389,14 @@ impl PageServerHandler { async move { let _cancel_batcher = cancel_batcher.drop_guard(); loop { - let maybe_batch = batch_rx.recv().await; + let maybe_batch = batch_rx + .recv() + .log_if_slow("batch_rx.recv", Duration::from_secs(10)) + .await; let batch = match maybe_batch { Ok(batch) => batch, Err(spsc_fold::RecvError::SenderGone) => { - debug!("upstream gone"); + info!("upstream gone"); return Ok(()); } }; @@ -1402,8 +1408,13 @@ impl PageServerHandler { }; batch .throttle_and_record_start_processing(&self.cancel) + .log_if_slow( + "throttle_and_record_start_processing", + Duration::from_secs(10), + ) .await?; self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) + .log_if_slow("pagesteam_handle_batched_message", Duration::from_secs(10)) .await?; } }