From 6c6e5bfc2bb894099e2a73e3b923593a9b8cc65e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 24 Feb 2025 19:18:00 +0100 Subject: [PATCH] pageserver: tweak slow GetPage logging --- libs/utils/benches/README.md | 4 +- libs/utils/benches/benchmarks.rs | 12 +- libs/utils/src/logging.rs | 12 +- pageserver/src/page_service.rs | 294 ++++++++++++++++--------------- 4 files changed, 168 insertions(+), 154 deletions(-) diff --git a/libs/utils/benches/README.md b/libs/utils/benches/README.md index e23ec268c2..5afbe3cf2b 100644 --- a/libs/utils/benches/README.md +++ b/libs/utils/benches/README.md @@ -10,14 +10,14 @@ cargo bench --package utils cargo bench --package utils --bench benchmarks # Specific benchmark. -cargo bench --package utils --bench benchmarks warn_slow/enabled=true +cargo bench --package utils --bench benchmarks log_slow/enabled=true # List available benchmarks. cargo bench --package utils --benches -- --list # Generate flamegraph profiles using pprof-rs, profiling for 10 seconds. # Output in target/criterion/*/profile/flamegraph.svg. -cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10 +cargo bench --package utils --bench benchmarks log_slow/enabled=true --profile-time 10 ``` Additional charts and statistics are available in `target/criterion/report/index.html`. diff --git a/libs/utils/benches/benchmarks.rs b/libs/utils/benches/benchmarks.rs index cff3792f3a..348e27ac47 100644 --- a/libs/utils/benches/benchmarks.rs +++ b/libs/utils/benches/benchmarks.rs @@ -3,14 +3,14 @@ use std::time::Duration; use criterion::{criterion_group, criterion_main, Bencher, Criterion}; use pprof::criterion::{Output, PProfProfiler}; use utils::id; -use utils::logging::warn_slow; +use utils::logging::log_slow; // Register benchmarks with Criterion. criterion_group!( name = benches; config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); targets = bench_id_stringify, - bench_warn_slow, + bench_log_slow, ); criterion_main!(benches); @@ -29,9 +29,9 @@ pub fn bench_id_stringify(c: &mut Criterion) { }); } -pub fn bench_warn_slow(c: &mut Criterion) { +pub fn bench_log_slow(c: &mut Criterion) { for enabled in [false, true] { - c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| { + c.bench_function(&format!("log_slow/enabled={enabled}"), |b| { run_bench(b, enabled).unwrap() }); } @@ -45,11 +45,11 @@ pub fn bench_warn_slow(c: &mut Criterion) { .enable_all() .build()?; - // Test both with and without warn_slow, since we're essentially measuring Tokio scheduling + // Test both with and without log_slow, since we're essentially measuring Tokio scheduling // performance too. Use a simple noop future that yields once, to avoid any scheduler fast // paths for a ready future. if enabled { - b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now()))); + b.iter(|| runtime.block_on(log_slow("ready", THRESHOLD, tokio::task::yield_now()))); } else { b.iter(|| runtime.block_on(tokio::task::yield_now())); } diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 95c69ac8ba..2c36942f43 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -7,7 +7,7 @@ use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; use tokio::time::Instant; -use tracing::warn; +use tracing::info; /// Logs a critical error, similarly to `tracing::error!`. 
This will:
 ///
@@ -322,11 +322,13 @@ impl std::fmt::Debug for SecretString {
     }
 }
 
-/// Logs a periodic warning if a future is slow to complete.
+/// Logs a periodic message if a future is slow to complete.
 ///
 /// This is performance-sensitive as it's used on the GetPage read path.
+///
+/// TODO: consider upgrading this to a warning, but currently it fires too often.
 #[inline]
-pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
+pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
     // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
     // won't fit on the stack.
     let mut f = Box::pin(f);
@@ -345,13 +347,13 @@ pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O
             if elapsed >= threshold {
-                warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
+                info!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
             }
             return output;
         }
 
         let elapsed = started.elapsed().as_secs_f64();
-        warn!("slow {name} still running after {elapsed:.3}s",);
+        info!("slow {name} still running after {elapsed:.3}s",);
 
         attempt += 1;
     }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 7285697040..837bac474d 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -40,7 +40,7 @@ use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
-use utils::logging::warn_slow;
+use utils::logging::log_slow;
 use utils::sync::gate::{Gate, GateGuard};
 use utils::sync::spsc_fold;
 use utils::{
@@ -83,8 +83,8 @@ use std::os::fd::AsRawFd;
 /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
 const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
 
-/// Threshold at which to log a warning about slow GetPage requests.
-const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
+/// Threshold at which to log slow GetPage requests.
+const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
 
 ///////////////////////////////////////////////////////////////////////////////
 
@@ -1086,11 +1086,145 @@ impl PageServerHandler {
             batch
         };
 
-        // invoke handler function
-        let (mut handler_results, span): (
+        // Dispatch the batch to the appropriate request handler.
+        let (mut handler_results, span) = log_slow(
+            batch.as_static_str(),
+            LOG_SLOW_GETPAGE_THRESHOLD,
+            self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx),
+        )
+        .await?;
+
+        // We purposefully don't count flush time into the smgr operation timer.
+        //
+        // The reason is that current compute client will not perform protocol processing
+        // if the postgres backend process is doing things other than `->smgr_read()`.
+        // This is especially the case for prefetch.
+        //
+        // If the compute doesn't read from the connection, eventually TCP will backpressure
+        // all the way into our flush call below.
+        //
+        // The timer's underlying metric is used for a storage-internal latency SLO and
+        // we don't want to include latency in it that we can't control.
+        // And as pointed out above, in this case, we don't control the time that flush will take.
+        //
+        // We put each response in the batch onto the wire in a separate pgb_writer.flush()
+        // call, which (all unmeasured) adds syscall overhead but reduces time to first byte
+        // and avoids building up a "giant" contiguous userspace buffer to hold the entire response.
+        // TODO: vectored socket IO would be great, but pgb_writer doesn't support that.
+        let flush_timers = {
+            let flushing_start_time = Instant::now();
+            let mut flush_timers = Vec::with_capacity(handler_results.len());
+            for handler_result in &mut handler_results {
+                let flush_timer = match handler_result {
+                    Ok((_, timer)) => Some(
+                        timer
+                            .observe_execution_end(flushing_start_time)
+                            .expect("we are the first caller"),
+                    ),
+                    Err(_) => {
+                        // TODO: measure errors
+                        None
+                    }
+                };
+                flush_timers.push(flush_timer);
+            }
+            assert_eq!(flush_timers.len(), handler_results.len());
+            flush_timers
+        };
+
+        // Map handler result to protocol behavior.
+        // Some handler errors cause exit from pagestream protocol.
+        // Other handler errors are sent back as an error message and we stay in pagestream protocol.
+        for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
+            let response_msg = match handler_result {
+                Err(e) => match &e.err {
+                    PageStreamError::Shutdown => {
+                        // If we fail to fulfil a request during shutdown, which may be _because_ of
+                        // shutdown, then do not send the error to the client. Instead just drop the
+                        // connection.
+                        span.in_scope(|| info!("dropping connection due to shutdown"));
+                        return Err(QueryError::Shutdown);
+                    }
+                    PageStreamError::Reconnect(reason) => {
+                        span.in_scope(|| info!("handler requested reconnect: {reason}"));
+                        return Err(QueryError::Reconnect);
+                    }
+                    PageStreamError::Read(_)
+                    | PageStreamError::LsnTimeout(_)
+                    | PageStreamError::NotFound(_)
+                    | PageStreamError::BadRequest(_) => {
+                        // print all the details to the log with {:#}, but for the client the
+                        // error message is enough. Do not log if shutting down, as the anyhow::Error
+                        // here includes cancellation which is not an error.
+                        let full = utils::error::report_compact_sources(&e.err);
+                        span.in_scope(|| {
+                            error!("error reading relation or page version: {full:#}")
+                        });
+
+                        PagestreamBeMessage::Error(PagestreamErrorResponse {
+                            req: e.req,
+                            message: e.err.to_string(),
+                        })
+                    }
+                },
+                Ok((response_msg, _op_timer_already_observed)) => response_msg,
+            };
+
+            //
+            // marshal & transmit response message
+            //
+
+            pgb_writer.write_message_noflush(&BeMessage::CopyData(
+                &response_msg.serialize(protocol_version),
+            ))?;
+
+            // what we want to do
+            let socket_fd = pgb_writer.socket_fd;
+            let flush_fut = pgb_writer.flush();
+            // metric for how long flushing takes
+            let flush_fut = match flushing_timer {
+                Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
+                    Instant::now(),
+                    flush_fut,
+                    socket_fd,
+                )),
+                None => futures::future::Either::Right(flush_fut),
+            };
+            // do it while respecting cancellation
+            let _: () = async move {
+                tokio::select! {
+                    biased;
+                    _ = cancel.cancelled() => {
+                        // We were requested to shut down.
+                        info!("shutdown request received in page handler");
+                        return Err(QueryError::Shutdown)
+                    }
+                    res = flush_fut => {
+                        res?;
+                    }
+                }
+                Ok(())
+            }
+            .await?;
+        }
+        Ok(())
+    }
+
+    /// Helper which dispatches a batched message to the appropriate handler.
+    /// Returns a vec of results, along with the extracted trace span.
+    async fn pagestream_dispatch_batched_message(
+        &mut self,
+        batch: BatchedFeMessage,
+        io_concurrency: IoConcurrency,
+        ctx: &RequestContext,
+    ) -> Result<
+        (
             Vec>,
-            _,
-        ) = match batch {
+            Span,
+        ),
+        QueryError,
+    > {
+        Ok(match batch {
             BatchedFeMessage::Exists {
                 span,
                 timer,
@@ -1212,122 +1346,7 @@ impl PageServerHandler {
                 // call the handler.
(vec![Err(error)], span) } - }; - - // We purposefully don't count flush time into the smgr operation timer. - // - // The reason is that current compute client will not perform protocol processing - // if the postgres backend process is doing things other than `->smgr_read()`. - // This is especially the case for prefetch. - // - // If the compute doesn't read from the connection, eventually TCP will backpressure - // all the way into our flush call below. - // - // The timer's underlying metric is used for a storage-internal latency SLO and - // we don't want to include latency in it that we can't control. - // And as pointed out above, in this case, we don't control the time that flush will take. - // - // We put each response in the batch onto the wire in a separate pgb_writer.flush() - // call, which (all unmeasured) adds syscall overhead but reduces time to first byte - // and avoids building up a "giant" contiguous userspace buffer to hold the entire response. - // TODO: vectored socket IO would be great, but pgb_writer doesn't support that. - let flush_timers = { - let flushing_start_time = Instant::now(); - let mut flush_timers = Vec::with_capacity(handler_results.len()); - for handler_result in &mut handler_results { - let flush_timer = match handler_result { - Ok((_, timer)) => Some( - timer - .observe_execution_end(flushing_start_time) - .expect("we are the first caller"), - ), - Err(_) => { - // TODO: measure errors - None - } - }; - flush_timers.push(flush_timer); - } - assert_eq!(flush_timers.len(), handler_results.len()); - flush_timers - }; - - // Map handler result to protocol behavior. - // Some handler errors cause exit from pagestream protocol. - // Other handler errors are sent back as an error message and we stay in pagestream protocol. - for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) { - let response_msg = match handler_result { - Err(e) => match &e.err { - PageStreamError::Shutdown => { - // If we fail to fulfil a request during shutdown, which may be _because_ of - // shutdown, then do not send the error to the client. Instead just drop the - // connection. - span.in_scope(|| info!("dropping connection due to shutdown")); - return Err(QueryError::Shutdown); - } - PageStreamError::Reconnect(reason) => { - span.in_scope(|| info!("handler requested reconnect: {reason}")); - return Err(QueryError::Reconnect); - } - PageStreamError::Read(_) - | PageStreamError::LsnTimeout(_) - | PageStreamError::NotFound(_) - | PageStreamError::BadRequest(_) => { - // print the all details to the log with {:#}, but for the client the - // error message is enough. Do not log if shutting down, as the anyhow::Error - // here includes cancellation which is not an error. 
- let full = utils::error::report_compact_sources(&e.err); - span.in_scope(|| { - error!("error reading relation or page version: {full:#}") - }); - - PagestreamBeMessage::Error(PagestreamErrorResponse { - req: e.req, - message: e.err.to_string(), - }) - } - }, - Ok((response_msg, _op_timer_already_observed)) => response_msg, - }; - - // - // marshal & transmit response message - // - - pgb_writer.write_message_noflush(&BeMessage::CopyData( - &response_msg.serialize(protocol_version), - ))?; - - // what we want to do - let socket_fd = pgb_writer.socket_fd; - let flush_fut = pgb_writer.flush(); - // metric for how long flushing takes - let flush_fut = match flushing_timer { - Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure( - Instant::now(), - flush_fut, - socket_fd, - )), - None => futures::future::Either::Right(flush_fut), - }; - // do it while respecting cancellation - let _: () = async move { - tokio::select! { - biased; - _ = cancel.cancelled() => { - // We were requested to shut down. - info!("shutdown request received in page handler"); - return Err(QueryError::Shutdown) - } - res = flush_fut => { - res?; - } - } - Ok(()) - } - .await?; - } - Ok(()) + }) } /// Pagestream sub-protocol handler. @@ -1473,19 +1492,16 @@ impl PageServerHandler { } }; - let result = warn_slow( - msg.as_static_str(), - WARN_SLOW_GETPAGE_THRESHOLD, - self.pagesteam_handle_batched_message( + let result = self + .pagesteam_handle_batched_message( pgb_writer, msg, io_concurrency.clone(), &cancel, protocol_version, ctx, - ), - ) - .await; + ) + .await; match result { Ok(()) => {} Err(e) => break e, @@ -1649,17 +1665,13 @@ impl PageServerHandler { return Err(e); } }; - warn_slow( - batch.as_static_str(), - WARN_SLOW_GETPAGE_THRESHOLD, - self.pagesteam_handle_batched_message( - pgb_writer, - batch, - io_concurrency.clone(), - &cancel, - protocol_version, - &ctx, - ), + self.pagesteam_handle_batched_message( + pgb_writer, + batch, + io_concurrency.clone(), + &cancel, + protocol_version, + &ctx, ) .await?; }
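
Reviewer note: below is a minimal, self-contained sketch of the call-site pattern this patch standardizes on. It assumes the patch is applied and that the snippet lives inside the workspace so `utils::logging::log_slow` is importable, with `tokio` (rt + macros) and `tracing-subscriber` available; the 100ms threshold, the `demo_sleep` name, and the sleep standing in for a slow GetPage future are illustrative only, not the production `LOG_SLOW_GETPAGE_THRESHOLD`.

```rust
use std::time::Duration;

use tokio::time::sleep;
use utils::logging::log_slow; // renamed from `warn_slow` in this patch

#[tokio::main]
async fn main() {
    // Print INFO-level tracing events to stderr so the periodic "slow ..." messages are visible.
    tracing_subscriber::fmt::init();

    // Stand-in for a slow future. In the pageserver the wrapped future is the
    // dispatch step (`pagestream_dispatch_batched_message`), not the flush.
    let slow_op = sleep(Duration::from_millis(350));

    // With a 100ms threshold this logs "slow demo_sleep still running after ...s"
    // periodically while the future is pending, then "slow demo_sleep completed
    // after ...s" when it finishes, at INFO level after this change.
    log_slow("demo_sleep", Duration::from_millis(100), slow_op).await;
}
```

Note that the wrapper intentionally covers only the dispatch/execute step; flush time is excluded from the slow-request accounting for the reasons given in the comment block added to page_service.rs.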