Compare commits

...

3 Commits

3 changed files with 66 additions and 4 deletions

View File

@@ -375,6 +375,48 @@ async fn timed_after_cancellation<Fut: std::future::Future>(
}
}
/// Awaits `fut` to completion, emitting log events when it takes longer than `warn_after`.
///
/// The future is pinned on the stack and first polled under a `tokio::time::timeout` of
/// `warn_after`. If it resolves in time, its output is returned and nothing is logged.
/// Otherwise a "slow future" event is emitted, the *same* future keeps being awaited
/// (the timeout only held a mutable reference to it), and a "slow future completed"
/// event is emitted once it resolves.
///
/// Both events carry `what = name` and `elapsed_ms`, measured from the start of this call.
/// Note that, despite the parameter name `warn_after`, the events are emitted at info level.
async fn log_if_slow<Fut: std::future::Future>(
    name: &str,
    warn_after: std::time::Duration,
    fut: Fut,
) -> <Fut as std::future::Future>::Output {
    let begin = std::time::Instant::now();
    let mut pinned = std::pin::pin!(fut);

    // Fast path: the future resolved before the threshold, so stay silent.
    if let Ok(output) = tokio::time::timeout(warn_after, &mut pinned).await {
        return output;
    }

    // Threshold exceeded: report it, then keep driving the same future to completion.
    tracing::info!(
        what = name,
        elapsed_ms = begin.elapsed().as_millis(),
        "slow future"
    );
    let output = pinned.await;
    tracing::info!(
        what = name,
        elapsed_ms = begin.elapsed().as_millis(),
        "slow future completed"
    );
    output
}
/// Extension trait providing a `.log_if_slow(name, warn_after)` combinator on futures,
/// so call sites can chain it directly before `.await`.
pub(crate) trait LogIfSlowFutureExt: std::future::Future {
    /// Consumes `self` and awaits it via the free function `log_if_slow`, passing
    /// `name` and `warn_after` through unchanged; returns the future's own output.
    async fn log_if_slow(self, name: &'static str, warn_after: std::time::Duration) -> Self::Output
    where
        Self: Sized,
    {
        // A plain path call resolves to the module-level function, not to this method.
        let wrapped = log_if_slow(name, warn_after, self);
        wrapped.await
    }
}
// Blanket implementation: every type implementing `Future` gets the trait's default method.
impl<F: std::future::Future> LogIfSlowFutureExt for F {}
#[cfg(test)]
mod timed_tests {
use super::timed;

View File

@@ -1,6 +1,7 @@
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.
use crate::LogIfSlowFutureExt;
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
@@ -931,6 +932,7 @@ impl PageServerHandler {
vec![self
.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_get_rel_exists_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
@@ -947,6 +949,7 @@ impl PageServerHandler {
vec![self
.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_get_nblocks_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
@@ -971,6 +974,10 @@ impl PageServerHandler {
ctx,
)
.instrument(span.clone())
.log_if_slow(
"handle_get_page_at_lsn_request_batched",
Duration::from_secs(10),
)
.await;
assert_eq!(res.len(), npages);
res
@@ -989,6 +996,7 @@ impl PageServerHandler {
vec![self
.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_db_size_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
@@ -1005,6 +1013,7 @@ impl PageServerHandler {
vec![self
.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_get_slru_segment_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
@@ -1103,6 +1112,7 @@ impl PageServerHandler {
}
// and log the info! line inside the request span
.instrument(span.clone())
.log_if_slow("flush_fut", Duration::from_secs(10))
.await?;
}
Ok(())
@@ -1360,9 +1370,10 @@ impl PageServerHandler {
&ctx,
request_span.clone(),
)
.log_if_slow("pagestream_read_message", Duration::from_secs(10))
.await;
let Some(read_res) = read_res.transpose() else {
debug!("client-initiated shutdown");
info!("client-initiated shutdown");
break;
};
exit |= read_res.is_err();
@@ -1370,6 +1381,7 @@ impl PageServerHandler {
.send(read_res, |batch, res| {
Self::pagestream_do_batch(max_batch_size, batch, res)
})
.log_if_slow("batch_tx.send", Duration::from_secs(10))
.await;
exit |= could_send.is_err();
}
@@ -1386,11 +1398,14 @@ impl PageServerHandler {
async move {
let _cancel_batcher = cancel_batcher.drop_guard();
loop {
let maybe_batch = batch_rx.recv().await;
let maybe_batch = batch_rx
.recv()
.log_if_slow("batch_rx.recv", Duration::from_secs(10))
.await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(spsc_fold::RecvError::SenderGone) => {
debug!("upstream gone");
info!("upstream gone");
return Ok(());
}
};
@@ -1402,8 +1417,13 @@ impl PageServerHandler {
};
batch
.throttle_and_record_start_processing(&self.cancel)
.log_if_slow(
"throttle_and_record_start_processing",
Duration::from_secs(10),
)
.await?;
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.log_if_slow("pagesteam_handle_batched_message", Duration::from_secs(10))
.await?;
}
}

View File

@@ -149,7 +149,7 @@ impl FromStr for TokioRuntimeMode {
// Lazily-initialized `NonZeroUsize`, read from the
// `NEON_PAGESERVER_TOKIO_THREAD_STACK_SIZE` environment variable with a
// hard-coded fallback when `env::var` yields no value.
// NOTE(review): two consecutive `unwrap_or_else` fallbacks (4 MiB, then 8 MiB)
// appear below; this looks like the old and new side of a diff hunk, and only
// one of them is expected to remain — confirm against the real file.
static TOKIO_THREAD_STACK_SIZE: Lazy<NonZeroUsize> = Lazy::new(|| {
env::var("NEON_PAGESERVER_TOKIO_THREAD_STACK_SIZE")
// the default 2MiB are insufficient, especially in debug mode
.unwrap_or_else(|| NonZeroUsize::new(4 * 1024 * 1024).unwrap())
.unwrap_or_else(|| NonZeroUsize::new(8 * 1024 * 1024).unwrap())
});
static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {