page_service: add slow-future-logging to the pipelined impl to debug #10309

This commit is contained in:
Christian Schwarz
2025-01-08 21:29:22 +01:00
parent fcfff72454
commit 321846d1e4
2 changed files with 56 additions and 3 deletions

View File

@@ -375,6 +375,48 @@ async fn timed_after_cancellation<Fut: std::future::Future>(
}
}
async fn log_if_slow<Fut: std::future::Future>(
name: &str,
warn_after: std::time::Duration,
fut: Fut,
) -> <Fut as std::future::Future>::Output {
let started = std::time::Instant::now();
let mut fut = std::pin::pin!(fut);
match tokio::time::timeout(warn_after, &mut fut).await {
Ok(ret) => ret,
Err(_) => {
tracing::info!(
what = name,
elapsed_ms = started.elapsed().as_millis(),
"slow future"
);
let res = fut.await;
tracing::info!(
what = name,
elapsed_ms = started.elapsed().as_millis(),
"slow future completed"
);
res
}
}
}
pub(crate) trait LogIfSlowFutureExt: std::future::Future {
async fn log_if_slow(self, name: &'static str, warn_after: std::time::Duration) -> Self::Output
where
Self: Sized,
{
log_if_slow(name, warn_after, self).await
}
}
impl<Fut> LogIfSlowFutureExt for Fut where Fut: std::future::Future {}
#[cfg(test)]
mod timed_tests {
use super::timed;

View File

@@ -1,6 +1,7 @@
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.
use crate::LogIfSlowFutureExt;
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
@@ -1360,9 +1361,10 @@ impl PageServerHandler {
&ctx,
request_span.clone(),
)
.log_if_slow("pagestream_read_message", Duration::from_secs(10))
.await;
let Some(read_res) = read_res.transpose() else {
debug!("client-initiated shutdown");
info!("client-initiated shutdown");
break;
};
exit |= read_res.is_err();
@@ -1370,6 +1372,7 @@ impl PageServerHandler {
.send(read_res, |batch, res| {
Self::pagestream_do_batch(max_batch_size, batch, res)
})
.log_if_slow("batch_tx.send", Duration::from_secs(10))
.await;
exit |= could_send.is_err();
}
@@ -1386,11 +1389,14 @@ impl PageServerHandler {
async move {
let _cancel_batcher = cancel_batcher.drop_guard();
loop {
let maybe_batch = batch_rx.recv().await;
let maybe_batch = batch_rx
.recv()
.log_if_slow("batch_rx.recv", Duration::from_secs(10))
.await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(spsc_fold::RecvError::SenderGone) => {
debug!("upstream gone");
info!("upstream gone");
return Ok(());
}
};
@@ -1402,8 +1408,13 @@ impl PageServerHandler {
};
batch
.throttle_and_record_start_processing(&self.cancel)
.log_if_slow(
"throttle_and_record_start_processing",
Duration::from_secs(10),
)
.await?;
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.log_if_slow("pagesteam_handle_batched_message", Duration::from_secs(10))
.await?;
}
}