pageserver: tweak slow GetPage logging

This commit is contained in:
Erik Grinaker
2025-02-24 19:18:00 +01:00
parent 9c0fefee25
commit 6c6e5bfc2b
4 changed files with 168 additions and 154 deletions

View File

@@ -10,14 +10,14 @@ cargo bench --package utils
cargo bench --package utils --bench benchmarks
# Specific benchmark.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true
cargo bench --package utils --bench benchmarks log_slow/enabled=true
# List available benchmarks.
cargo bench --package utils --benches -- --list
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10
cargo bench --package utils --bench benchmarks log_slow/enabled=true --profile-time 10
```
Additional charts and statistics are available in `target/criterion/report/index.html`.

View File

@@ -3,14 +3,14 @@ use std::time::Duration;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::warn_slow;
use utils::logging::log_slow;
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_id_stringify,
bench_warn_slow,
bench_log_slow,
);
criterion_main!(benches);
@@ -29,9 +29,9 @@ pub fn bench_id_stringify(c: &mut Criterion) {
});
}
pub fn bench_warn_slow(c: &mut Criterion) {
pub fn bench_log_slow(c: &mut Criterion) {
for enabled in [false, true] {
c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| {
c.bench_function(&format!("log_slow/enabled={enabled}"), |b| {
run_bench(b, enabled).unwrap()
});
}
@@ -45,11 +45,11 @@ pub fn bench_warn_slow(c: &mut Criterion) {
.enable_all()
.build()?;
// Test both with and without warn_slow, since we're essentially measuring Tokio scheduling
// Test both with and without log_slow, since we're essentially measuring Tokio scheduling
// performance too. Use a simple noop future that yields once, to avoid any scheduler fast
// paths for a ready future.
if enabled {
b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now())));
b.iter(|| runtime.block_on(log_slow("ready", THRESHOLD, tokio::task::yield_now())));
} else {
b.iter(|| runtime.block_on(tokio::task::yield_now()));
}

View File

@@ -7,7 +7,7 @@ use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::warn;
use tracing::info;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -322,11 +322,13 @@ impl std::fmt::Debug for SecretString {
}
}
/// Logs a periodic warning if a future is slow to complete.
/// Logs a periodic message if a future is slow to complete.
///
/// This is performance-sensitive as it's used on the GetPage read path.
///
/// TODO: consider upgrading this to a warning, but currently it fires too often.
#[inline]
pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut f = Box::pin(f);
@@ -345,13 +347,13 @@ pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output
// false negatives.
let elapsed = started.elapsed();
if elapsed >= threshold {
warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
info!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
}
return output;
}
let elapsed = started.elapsed().as_secs_f64();
warn!("slow {name} still running after {elapsed:.3}s",);
info!("slow {name} still running after {elapsed:.3}s",);
attempt += 1;
}

View File

@@ -40,7 +40,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::logging::warn_slow;
use utils::logging::log_slow;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{
@@ -83,8 +83,8 @@ use std::os::fd::AsRawFd;
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log a warning about slow GetPage requests.
const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
/// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
///////////////////////////////////////////////////////////////////////////////
@@ -1086,11 +1086,145 @@ impl PageServerHandler {
batch
};
// invoke handler function
let (mut handler_results, span): (
// Dispatch the batch to the appropriate request handler.
let (mut handler_results, span) = log_slow(
batch.as_static_str(),
LOG_SLOW_GETPAGE_THRESHOLD,
self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx),
)
.await?;
// We purposefully don't count flush time into the smgr operation timer.
//
// The reason is that the current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
//
// We put each response in the batch onto the wire in a separate pgb_writer.flush()
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
let flush_timers = {
let flushing_start_time = Instant::now();
let mut flush_timers = Vec::with_capacity(handler_results.len());
for handler_result in &mut handler_results {
let flush_timer = match handler_result {
Ok((_, timer)) => Some(
timer
.observe_execution_end(flushing_start_time)
.expect("we are the first caller"),
),
Err(_) => {
// TODO: measure errors
None
}
};
flush_timers.push(flush_timer);
}
assert_eq!(flush_timers.len(), handler_results.len());
flush_timers
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
let response_msg = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print all the details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e.err);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
})
}
},
Ok((response_msg, _op_timer_already_observed)) => response_msg,
};
//
// marshal & transmit response message
//
pgb_writer.write_message_noflush(&BeMessage::CopyData(
&response_msg.serialize(protocol_version),
))?;
// what we want to do
let socket_fd = pgb_writer.socket_fd;
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
Instant::now(),
flush_fut,
socket_fd,
)),
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
.await?;
}
Ok(())
}
/// Helper which dispatches a batched message to the appropriate handler.
/// Returns a vec of results, along with the extracted trace span.
async fn pagestream_dispatch_batched_message(
&mut self,
batch: BatchedFeMessage,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Result<
(
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
_,
) = match batch {
Span,
),
QueryError,
> {
Ok(match batch {
BatchedFeMessage::Exists {
span,
timer,
@@ -1212,122 +1346,7 @@ impl PageServerHandler {
// call the handler.
(vec![Err(error)], span)
}
};
// We purposefully don't count flush time into the smgr operation timer.
//
// The reason is that the current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
//
// We put each response in the batch onto the wire in a separate pgb_writer.flush()
// call, which (all unmeasured) adds syscall overhead but reduces time to first byte
// and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
// TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
let flush_timers = {
let flushing_start_time = Instant::now();
let mut flush_timers = Vec::with_capacity(handler_results.len());
for handler_result in &mut handler_results {
let flush_timer = match handler_result {
Ok((_, timer)) => Some(
timer
.observe_execution_end(flushing_start_time)
.expect("we are the first caller"),
),
Err(_) => {
// TODO: measure errors
None
}
};
flush_timers.push(flush_timer);
}
assert_eq!(flush_timers.len(), handler_results.len());
flush_timers
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
let response_msg = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print all the details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e.err);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
})
}
},
Ok((response_msg, _op_timer_already_observed)) => response_msg,
};
//
// marshal & transmit response message
//
pgb_writer.write_message_noflush(&BeMessage::CopyData(
&response_msg.serialize(protocol_version),
))?;
// what we want to do
let socket_fd = pgb_writer.socket_fd;
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
Instant::now(),
flush_fut,
socket_fd,
)),
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
.await?;
}
Ok(())
})
}
/// Pagestream sub-protocol handler.
@@ -1473,19 +1492,16 @@ impl PageServerHandler {
}
};
let result = warn_slow(
msg.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
let result = self
.pagesteam_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
&cancel,
protocol_version,
ctx,
),
)
.await;
)
.await;
match result {
Ok(()) => {}
Err(e) => break e,
@@ -1649,17 +1665,13 @@ impl PageServerHandler {
return Err(e);
}
};
warn_slow(
batch.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
),
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
)
.await?;
}