diff --git a/libs/utils/benches/benchmarks.rs b/libs/utils/benches/benchmarks.rs
index 12c620ec87..35f3baaed1 100644
--- a/libs/utils/benches/benchmarks.rs
+++ b/libs/utils/benches/benchmarks.rs
@@ -49,7 +49,13 @@ pub fn bench_log_slow(c: &mut Criterion) {
     // performance too. Use a simple noop future that yields once, to avoid any scheduler fast
     // paths for a ready future.
     if enabled {
-        b.iter(|| runtime.block_on(log_slow("ready", THRESHOLD, tokio::task::yield_now())));
+        b.iter(|| {
+            runtime.block_on(log_slow(
+                "ready",
+                THRESHOLD,
+                std::pin::pin!(tokio::task::yield_now()),
+            ))
+        });
     } else {
         b.iter(|| runtime.block_on(tokio::task::yield_now()));
     }
diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs
index f37f05692a..0ac8201795 100644
--- a/libs/utils/src/logging.rs
+++ b/libs/utils/src/logging.rs
@@ -331,37 +331,90 @@ impl std::fmt::Debug for SecretString {
 ///
 /// TODO: consider upgrading this to a warning, but currently it fires too often.
 #[inline]
-pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
-    // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
-    // won't fit on the stack.
-    let mut f = Box::pin(f);
+pub async fn log_slow<F, O>(name: &str, threshold: Duration, f: std::pin::Pin<&mut F>) -> O
+where
+    F: Future<Output = O>,
+{
+    monitor_slow_future(
+        threshold,
+        threshold, // period = threshold
+        f,
+        |MonitorSlowFutureCallback {
+             ready,
+             is_slow,
+             elapsed_total,
+             elapsed_since_last_callback: _,
+         }| {
+            if !is_slow {
+                return;
+            }
+            if ready {
+                info!(
+                    "slow {name} completed after {:.3}s",
+                    elapsed_total.as_secs_f64()
+                );
+            } else {
+                info!(
+                    "slow {name} still running after {:.3}s",
+                    elapsed_total.as_secs_f64()
+                );
+            }
+        },
+    )
+    .await
+}
+
+/// Poll future `fut` to completion, invoking callback `cb` at the given `threshold` and every
+/// `period` afterwards, and also unconditionally when the future completes.
+#[inline]
+pub async fn monitor_slow_future<F, O>(
+    threshold: Duration,
+    period: Duration,
+    mut fut: std::pin::Pin<&mut F>,
+    mut cb: impl FnMut(MonitorSlowFutureCallback),
+) -> O
+where
+    F: Future<Output = O>,
+{
     let started = Instant::now();
     let mut attempt = 1;
-
+    let mut last_cb = started;
     loop {
         // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
         // case where the timeout doesn't fire.
-        let deadline = started + attempt * threshold;
-        if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await {
-            // NB: we check if we exceeded the threshold even if the timeout never fired, because
-            // scheduling or execution delays may cause the future to succeed even if it exceeds the
-            // timeout. This costs an extra unconditional clock reading, but seems worth it to avoid
-            // false negatives.
-            let elapsed = started.elapsed();
-            if elapsed >= threshold {
-                info!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
-            }
+        let deadline = started + threshold + (attempt - 1) * period;
+        // TODO: still call the callback if the future panics? Copy how we do it for the page_service flush_in_progress counter.
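+        // NB: the first deadline fires `threshold` after the start; each subsequent iteration
+        // pushes it out by another `period`, so a still-pending future triggers one callback per
+        // `period` after the initial `threshold`.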
+        let res = tokio::time::timeout_at(deadline, &mut fut).await;
+        let now = Instant::now();
+        let elapsed_total = now - started;
+        cb(MonitorSlowFutureCallback {
+            ready: res.is_ok(),
+            is_slow: elapsed_total >= threshold,
+            elapsed_total,
+            elapsed_since_last_callback: now - last_cb,
+        });
+        last_cb = now;
+        if let Ok(output) = res {
             return output;
         }
-
-        let elapsed = started.elapsed().as_secs_f64();
-        info!("slow {name} still running after {elapsed:.3}s",);
         attempt += 1;
     }
 }
 
+/// See [`monitor_slow_future`].
+pub struct MonitorSlowFutureCallback {
+    /// Whether the future completed. If true, there will be no more callbacks.
+    pub ready: bool,
+    /// Whether the future is taking `>=` the specified threshold duration to complete.
+    /// Monotonic: if true in one callback invocation, true in all subsequent ones.
+    pub is_slow: bool,
+    /// The time elapsed since [`monitor_slow_future`] was first polled.
+    pub elapsed_total: Duration,
+    /// The time elapsed since the last callback invocation.
+    /// For the initial callback invocation, the time elapsed since [`monitor_slow_future`] was first polled.
+    pub elapsed_since_last_callback: Duration,
+}
+
 #[cfg(test)]
 mod tests {
     use metrics::IntCounterVec;
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index fd90ef8cd7..f7afaae068 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -465,12 +465,40 @@ pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
 pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "pageserver_wait_lsn_seconds",
-        "Time spent waiting for WAL to arrive",
+        "Time spent waiting for WAL to arrive. Updated on completion of the wait_lsn operation.",
         CRITICAL_OP_BUCKETS.into(),
     )
     .expect("failed to define a metric")
 });
 
+pub(crate) static WAIT_LSN_START_FINISH_COUNTERPAIR: Lazy<IntCounterPairVec> = Lazy::new(|| {
+    register_int_counter_pair_vec!(
+        "pageserver_wait_lsn_started_count",
+        "Number of wait_lsn operations started.",
+        "pageserver_wait_lsn_finished_count",
+        "Number of wait_lsn operations finished.",
+        &["tenant_id", "shard_id", "timeline_id"],
+    )
+    .expect("failed to define a metric")
+});
+
+pub(crate) static WAIT_LSN_IN_PROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "pageserver_wait_lsn_in_progress_micros",
+        "Time spent waiting for WAL to arrive, by timeline_id. Updated periodically while waiting.",
+        &["tenant_id", "shard_id", "timeline_id"],
+    )
+    .expect("failed to define a metric")
+});
+
+pub(crate) static WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "pageserver_wait_lsn_in_progress_micros_global",
+        "Time spent waiting for WAL to arrive, globally. Updated periodically while waiting."
+    )
+    .expect("failed to define a metric")
+});
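+
+// NB: `pageserver_wait_lsn_started_count - pageserver_wait_lsn_finished_count` gives the number
+// of wait_lsn operations currently in progress. The `*_in_progress_micros` counters advance while
+// a wait is still running, whereas `pageserver_wait_lsn_seconds` above is only observed on
+// completion.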
+
 static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
     register_gauge_vec!(
         "pageserver_flush_wait_upload_seconds",
@@ -2830,7 +2858,6 @@ impl StorageTimeMetrics {
     }
 }
 
-#[derive(Debug)]
 pub(crate) struct TimelineMetrics {
     tenant_id: String,
     shard_id: String,
@@ -2863,6 +2890,8 @@ pub(crate) struct TimelineMetrics {
     pub valid_lsn_lease_count_gauge: UIntGauge,
     pub wal_records_received: IntCounter,
     pub storage_io_size: StorageIoSizeMetrics,
+    pub wait_lsn_in_progress_micros: GlobalAndPerTenantIntCounter,
+    pub wait_lsn_start_finish_counterpair: IntCounterPair,
     shutdown: std::sync::atomic::AtomicBool,
 }
 
@@ -3000,6 +3029,17 @@ impl TimelineMetrics {
         let storage_io_size = StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id);
 
+        let wait_lsn_in_progress_micros = GlobalAndPerTenantIntCounter {
+            global: WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS.clone(),
+            per_tenant: WAIT_LSN_IN_PROGRESS_MICROS
+                .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
+                .unwrap(),
+        };
+
+        let wait_lsn_start_finish_counterpair = WAIT_LSN_START_FINISH_COUNTERPAIR
+            .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
+            .unwrap();
+
         TimelineMetrics {
             tenant_id,
             shard_id,
@@ -3032,6 +3072,8 @@
             storage_io_size,
             valid_lsn_lease_count_gauge,
             wal_records_received,
+            wait_lsn_in_progress_micros,
+            wait_lsn_start_finish_counterpair,
             shutdown: std::sync::atomic::AtomicBool::default(),
         }
     }
@@ -3224,6 +3266,15 @@
             let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
         }
 
+        let _ =
+            WAIT_LSN_IN_PROGRESS_MICROS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
+
+        {
+            let mut res = [Ok(()), Ok(())];
+            WAIT_LSN_START_FINISH_COUNTERPAIR
+                .remove_label_values(&mut res, &[tenant_id, shard_id, timeline_id]);
+        }
+
         let _ = SMGR_QUERY_STARTED_PER_TENANT_TIMELINE.remove_label_values(&[
             SmgrQueryType::GetPageAtLsn.into(),
             tenant_id,
@@ -3836,27 +3887,29 @@ pub mod tokio_epoll_uring {
     });
 }
 
+pub(crate) struct GlobalAndPerTenantIntCounter {
+    global: IntCounter,
+    per_tenant: IntCounter,
+}
+
+impl GlobalAndPerTenantIntCounter {
+    #[inline(always)]
+    pub(crate) fn inc(&self) {
+        self.inc_by(1)
+    }
+    #[inline(always)]
+    pub(crate) fn inc_by(&self, n: u64) {
+        self.global.inc_by(n);
+        self.per_tenant.inc_by(n);
+    }
+}
+
 pub(crate) mod tenant_throttling {
-    use metrics::{IntCounter, register_int_counter_vec};
+    use metrics::register_int_counter_vec;
     use once_cell::sync::Lazy;
     use utils::shard::TenantShardId;
 
-    pub(crate) struct GlobalAndPerTenantIntCounter {
-        global: IntCounter,
-        per_tenant: IntCounter,
-    }
-
-    impl GlobalAndPerTenantIntCounter {
-        #[inline(always)]
-        pub(crate) fn inc(&self) {
-            self.inc_by(1)
-        }
-        #[inline(always)]
-        pub(crate) fn inc_by(&self, n: u64) {
-            self.global.inc_by(n);
-            self.per_tenant.inc_by(n);
-        }
-    }
+    use super::GlobalAndPerTenantIntCounter;
 
     pub(crate) struct Metrics {
         pub(super) count_accounted_start: GlobalAndPerTenantIntCounter,
@@ -4102,6 +4155,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {
         &CIRCUIT_BREAKERS_BROKEN,
         &CIRCUIT_BREAKERS_UNBROKEN,
         &PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL,
+        &WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS,
     ]
     .into_iter()
     .for_each(|c| {
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index f2d2ab05ad..94571cbaaa 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -1106,12 +1106,19 @@ impl PageServerHandler {
         };
 
         // Dispatch the batch to the appropriate request handler.
-        let (mut handler_results, span) = log_slow(
-            batch.as_static_str(),
-            LOG_SLOW_GETPAGE_THRESHOLD,
-            self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx),
-        )
-        .await?;
+        let log_slow_name = batch.as_static_str();
+        let (mut handler_results, span) = {
+            // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
+            // won't fit on the stack.
+            let mut boxpinned =
+                Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
+            log_slow(
+                log_slow_name,
+                LOG_SLOW_GETPAGE_THRESHOLD,
+                boxpinned.as_mut(),
+            )
+            .await?
+        };
 
         // We purposefully don't count flush time into the smgr operation timer.
         //
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 61542409f7..6cca8cc407 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -67,6 +67,7 @@ use tracing::*;
 use utils::generation::Generation;
 use utils::guard_arc_swap::GuardArcSwap;
 use utils::id::TimelineId;
+use utils::logging::{MonitorSlowFutureCallback, monitor_slow_future};
 use utils::lsn::{AtomicLsn, Lsn, RecordLsn};
 use utils::postgres_client::PostgresClientProtocol;
 use utils::rate_limit::RateLimit;
@@ -439,6 +440,8 @@ pub struct Timeline {
     heatmap_layers_downloader: Mutex<Option<HeatmapLayersDownloader>>,
 
     pub(crate) rel_size_v2_status: ArcSwapOption<RelSizeMigration>,
+
+    wait_lsn_log_slow: tokio::sync::Semaphore,
 }
 
 pub(crate) enum PreviousHeatmap {
@@ -1479,17 +1482,67 @@ impl Timeline {
             WaitLsnTimeout::Default => self.conf.wait_lsn_timeout,
         };
 
-        let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
+        let timer = crate::metrics::WAIT_LSN_TIME.start_timer();
+        let start_finish_counterpair_guard = self.metrics.wait_lsn_start_finish_counterpair.guard();
 
-        match self.last_record_lsn.wait_for_timeout(lsn, timeout).await {
+        let wait_for_timeout = self.last_record_lsn.wait_for_timeout(lsn, timeout);
+        let wait_for_timeout = std::pin::pin!(wait_for_timeout);
+        // Use a threshold of 1s because even 1 second of wait for ingest is very much abnormal.
+        let log_slow_threshold = Duration::from_secs(1);
+        // Use a period of 10s to avoid flooding the logs during an outage that affects all timelines.
+        let log_slow_period = Duration::from_secs(10);
+        let mut logging_permit = None;
+        let wait_for_timeout = monitor_slow_future(
+            log_slow_threshold,
+            log_slow_period,
+            wait_for_timeout,
+            |MonitorSlowFutureCallback {
+                 ready,
+                 is_slow,
+                 elapsed_total,
+                 elapsed_since_last_callback,
+             }| {
+                self.metrics
+                    .wait_lsn_in_progress_micros
+                    .inc_by(u64::try_from(elapsed_since_last_callback.as_micros()).unwrap());
+                if !is_slow {
+                    return;
+                }
+                // It's slow, see if we should log it.
+                // (We limit logging to at most one in-progress wait_lsn per timeline, to avoid
+                // excessive logging during an extended broker / networking outage that affects
+                // all timelines.)
+                if logging_permit.is_none() {
+                    logging_permit = self.wait_lsn_log_slow.try_acquire().ok();
+                }
+                if logging_permit.is_none() {
+                    return;
+                }
+                // We log it.
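+                // NB: monitor_slow_future also invokes this callback once when the future
+                // completes, which is what produces the final "completed" line for a slow wait.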
+                if ready {
+                    info!(
+                        "slow wait_lsn completed after {:.3}s",
+                        elapsed_total.as_secs_f64()
+                    );
+                } else {
+                    info!(
+                        "slow wait_lsn still running for {:.3}s",
+                        elapsed_total.as_secs_f64()
+                    );
+                }
+            },
+        );
+        let res = wait_for_timeout.await;
+        // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
+        drop(logging_permit);
+        drop(start_finish_counterpair_guard);
+        drop(timer);
+        match res {
             Ok(()) => Ok(()),
             Err(e) => {
                 use utils::seqwait::SeqWaitError::*;
                 match e {
                     Shutdown => Err(WaitLsnError::Shutdown),
                     Timeout => {
-                        // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
-                        drop(_timer);
                         let walreceiver_status = self.walreceiver_status();
                         Err(WaitLsnError::Timeout(format!(
                             "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
@@ -2821,6 +2874,8 @@ impl Timeline {
             heatmap_layers_downloader: Mutex::new(None),
 
             rel_size_v2_status: ArcSwapOption::from_pointee(rel_size_v2_status),
+
+            wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
         };
 
         result.repartition_threshold =
diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py
index 83a1a87611..54e6458ac6 100644
--- a/test_runner/fixtures/metrics.py
+++ b/test_runner/fixtures/metrics.py
@@ -175,6 +175,9 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
     counter("pageserver_tenant_throttling_count"),
     counter("pageserver_timeline_wal_records_received"),
     counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
+    counter("pageserver_wait_lsn_in_progress_micros"),
+    counter("pageserver_wait_lsn_started_count"),
+    counter("pageserver_wait_lsn_finished_count"),
     *histogram("pageserver_page_service_batch_size"),
     *histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"),
     *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,