diff --git a/Cargo.lock b/Cargo.lock index 12232eaece..f62026696e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7616,6 +7616,7 @@ dependencies = [ "once_cell", "pin-project-lite", "postgres_connection", + "pprof", "pq_proto", "rand 0.8.5", "regex", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 62e0f4cfba..5020d82adf 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -27,7 +27,7 @@ humantime.workspace = true fail.workspace = true futures = { workspace = true } jsonwebtoken.workspace = true -nix = {workspace = true, features = [ "ioctl" ] } +nix = { workspace = true, features = ["ioctl"] } once_cell.workspace = true pin-project-lite.workspace = true regex.workspace = true @@ -61,6 +61,7 @@ bytes.workspace = true criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true +pprof.workspace = true serde_assert.workspace = true tokio = { workspace = true, features = ["test-util"] } diff --git a/libs/utils/benches/README.md b/libs/utils/benches/README.md new file mode 100644 index 0000000000..e23ec268c2 --- /dev/null +++ b/libs/utils/benches/README.md @@ -0,0 +1,26 @@ +## Utils Benchmarks + +To run benchmarks: + +```sh +# All benchmarks. +cargo bench --package utils + +# Specific file. +cargo bench --package utils --bench benchmarks + +# Specific benchmark. +cargo bench --package utils --bench benchmarks warn_slow/enabled=true + +# List available benchmarks. +cargo bench --package utils --benches -- --list + +# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds. +# Output in target/criterion/*/profile/flamegraph.svg. +cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10 +``` + +Additional charts and statistics are available in `target/criterion/report/index.html`. + +Benchmarks are automatically compared against the previous run. To compare against other runs, see +`--baseline` and `--save-baseline`. \ No newline at end of file diff --git a/libs/utils/benches/benchmarks.rs b/libs/utils/benches/benchmarks.rs index 44eb36387c..cff3792f3a 100644 --- a/libs/utils/benches/benchmarks.rs +++ b/libs/utils/benches/benchmarks.rs @@ -1,5 +1,18 @@ -use criterion::{criterion_group, criterion_main, Criterion}; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use pprof::criterion::{Output, PProfProfiler}; use utils::id; +use utils::logging::warn_slow; + +// Register benchmarks with Criterion. +criterion_group!( + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_id_stringify, + bench_warn_slow, +); +criterion_main!(benches); pub fn bench_id_stringify(c: &mut Criterion) { // Can only use public methods. @@ -16,5 +29,31 @@ pub fn bench_id_stringify(c: &mut Criterion) { }); } -criterion_group!(benches, bench_id_stringify); -criterion_main!(benches); +pub fn bench_warn_slow(c: &mut Criterion) { + for enabled in [false, true] { + c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| { + run_bench(b, enabled).unwrap() + }); + } + + // The actual benchmark. + fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> { + const THRESHOLD: Duration = Duration::from_secs(1); + + // Use a multi-threaded runtime to avoid thread parking overhead when yielding. + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + // Test both with and without warn_slow, since we're essentially measuring Tokio scheduling + // performance too. Use a simple noop future that yields once, to avoid any scheduler fast + // paths for a ready future. + if enabled { + b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now()))); + } else { + b.iter(|| runtime.block_on(tokio::task::yield_now())); + } + + Ok(()) + } +} diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 4a6069294d..95c69ac8ba 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,9 +1,13 @@ +use std::future::Future; use std::str::FromStr; +use std::time::Duration; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; +use tokio::time::Instant; +use tracing::warn; /// Logs a critical error, similarly to `tracing::error!`. This will: /// @@ -318,6 +322,41 @@ impl std::fmt::Debug for SecretString { } } +/// Logs a periodic warning if a future is slow to complete. +/// +/// This is performance-sensitive as it's used on the GetPage read path. +#[inline] +pub async fn warn_slow(name: &str, threshold: Duration, f: impl Future) -> O { + // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and + // won't fit on the stack. + let mut f = Box::pin(f); + + let started = Instant::now(); + let mut attempt = 1; + + loop { + // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common + // case where the timeout doesn't fire. + let deadline = started + attempt * threshold; + if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await { + // NB: we check if we exceeded the threshold even if the timeout never fired, because + // scheduling or execution delays may cause the future to succeed even if it exceeds the + // timeout. This costs an extra unconditional clock reading, but seems worth it to avoid + // false negatives. + let elapsed = started.elapsed(); + if elapsed >= threshold { + warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64()); + } + return output; + } + + let elapsed = started.elapsed().as_secs_f64(); + warn!("slow {name} still running after {elapsed:.3}s",); + + attempt += 1; + } +} + #[cfg(test)] mod tests { use metrics::{core::Opts, IntCounterVec}; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0c8da6f2a8..7285697040 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -34,11 +34,13 @@ use std::str::FromStr; use std::sync::Arc; use std::time::SystemTime; use std::time::{Duration, Instant}; +use strum_macros::IntoStaticStr; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::logging::warn_slow; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; use utils::{ @@ -81,6 +83,9 @@ use std::os::fd::AsRawFd; /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`]. const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); +/// Threshold at which to log a warning about slow GetPage requests. +const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30); + /////////////////////////////////////////////////////////////////////////////// pub struct Listener { @@ -594,6 +599,7 @@ struct BatchedTestRequest { /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum, /// so that we don't keep the [`Timeline::gate`] open while the batch /// is being built up inside the [`spsc_fold`] (pagestream pipelining). +#[derive(IntoStaticStr)] enum BatchedFeMessage { Exists { span: Span, @@ -638,6 +644,10 @@ enum BatchedFeMessage { } impl BatchedFeMessage { + fn as_static_str(&self) -> &'static str { + self.into() + } + fn observe_execution_start(&mut self, at: Instant) { match self { BatchedFeMessage::Exists { timer, .. } @@ -1463,17 +1473,20 @@ impl PageServerHandler { } }; - let err = self - .pagesteam_handle_batched_message( + let result = warn_slow( + msg.as_static_str(), + WARN_SLOW_GETPAGE_THRESHOLD, + self.pagesteam_handle_batched_message( pgb_writer, msg, io_concurrency.clone(), &cancel, protocol_version, ctx, - ) - .await; - match err { + ), + ) + .await; + match result { Ok(()) => {} Err(e) => break e, } @@ -1636,13 +1649,17 @@ impl PageServerHandler { return Err(e); } }; - self.pagesteam_handle_batched_message( - pgb_writer, - batch, - io_concurrency.clone(), - &cancel, - protocol_version, - &ctx, + warn_slow( + batch.as_static_str(), + WARN_SLOW_GETPAGE_THRESHOLD, + self.pagesteam_handle_batched_message( + pgb_writer, + batch, + io_concurrency.clone(), + &cancel, + protocol_version, + &ctx, + ), ) .await?; }