diff --git a/Cargo.lock b/Cargo.lock index ad29fa4634..7fa5df29fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6272,7 +6272,7 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#08ccfa94ff5507727bf4d8d006666b5b192e04c6" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#cb2dcea2058034bc209e7917b01c5097712a3168" dependencies = [ "futures", "nix 0.26.4", @@ -6788,7 +6788,7 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#08ccfa94ff5507727bf4d8d006666b5b192e04c6" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#cb2dcea2058034bc209e7917b01c5097712a3168" dependencies = [ "bytes", "io-uring", diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index cd4526c089..64e56cb691 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -19,6 +19,7 @@ use once_cell::sync::Lazy; use prometheus::core::{ Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, }; +pub use prometheus::local::LocalHistogram; pub use prometheus::opts; pub use prometheus::register; pub use prometheus::Error; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 8f697558d6..1473729186 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3040,13 +3040,111 @@ impl>, O, E> Future for MeasuredRemoteOp { } pub mod tokio_epoll_uring { - use metrics::{register_int_counter, UIntGauge}; + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + }; + + use metrics::{register_histogram, register_int_counter, Histogram, LocalHistogram, UIntGauge}; use once_cell::sync::Lazy; + /// Shared storage for tokio-epoll-uring thread local metrics. 
+ pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy = + Lazy::new(|| { + let slots_submission_queue_depth = register_histogram!( + "pageserver_tokio_epoll_uring_slots_submission_queue_depth", + "The slots waiters queue depth of each tokio_epoll_uring system", + vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0], + ) + .expect("failed to define a metric"); + ThreadLocalMetricsStorage { + observers: Mutex::new(HashMap::new()), + slots_submission_queue_depth, + } + }); + + pub struct ThreadLocalMetricsStorage { + /// List of thread local metrics observers. + observers: Mutex>>, + /// A histogram shared between all thread local systems + /// for collecting slots submission queue depth. + slots_submission_queue_depth: Histogram, + } + + /// Each thread-local [`tokio_epoll_uring::System`] gets one of these as its + /// [`tokio_epoll_uring::metrics::PerSystemMetrics`] generic. + /// + /// The System makes observations into [`Self`] and periodically, the collector + /// comes along and flushes [`Self`] into the shared storage [`THREAD_LOCAL_METRICS_STORAGE`]. + /// + /// [`LocalHistogram`] is `!Sync`, so, we need to put it behind a [`Mutex`]. + /// But except for the periodic flush, the lock is uncontended so there's no waiting + /// for cache coherence protocol to get an exclusive cache line. + pub struct ThreadLocalMetrics { + /// Local observer of thread local tokio-epoll-uring system's slots waiters queue depth. + slots_submission_queue_depth: Mutex, + } + + impl ThreadLocalMetricsStorage { + /// Registers a new thread local system. Returns a thread local metrics observer. + pub fn register_system(&self, id: u64) -> Arc { + let per_system_metrics = Arc::new(ThreadLocalMetrics::new( + self.slots_submission_queue_depth.local(), + )); + let mut g = self.observers.lock().unwrap(); + g.insert(id, Arc::clone(&per_system_metrics)); + per_system_metrics + } + + /// Removes metrics observer for a thread local system. 
+ /// This should be called before dropping a thread local system. + pub fn remove_system(&self, id: u64) { + let mut g = self.observers.lock().unwrap(); + g.remove(&id); + } + + /// Flush all thread local metrics to the shared storage. + pub fn flush_thread_local_metrics(&self) { + let g = self.observers.lock().unwrap(); + g.values().for_each(|local| { + local.flush(); + }); + } + } + + impl ThreadLocalMetrics { + pub fn new(slots_submission_queue_depth: LocalHistogram) -> Self { + ThreadLocalMetrics { + slots_submission_queue_depth: Mutex::new(slots_submission_queue_depth), + } + } + + /// Flushes the thread local metrics to shared aggregator. + pub fn flush(&self) { + let Self { + slots_submission_queue_depth, + } = self; + slots_submission_queue_depth.lock().unwrap().flush(); + } + } + + impl tokio_epoll_uring::metrics::PerSystemMetrics for ThreadLocalMetrics { + fn observe_slots_submission_queue_depth(&self, queue_depth: u64) { + let Self { + slots_submission_queue_depth, + } = self; + slots_submission_queue_depth + .lock() + .unwrap() + .observe(queue_depth as f64); + } + } + pub struct Collector { descs: Vec, systems_created: UIntGauge, systems_destroyed: UIntGauge, + thread_local_metrics_storage: &'static ThreadLocalMetricsStorage, } impl metrics::core::Collector for Collector { @@ -3056,7 +3154,7 @@ pub mod tokio_epoll_uring { fn collect(&self) -> Vec { let mut mfs = Vec::with_capacity(Self::NMETRICS); - let tokio_epoll_uring::metrics::Metrics { + let tokio_epoll_uring::metrics::GlobalMetrics { systems_created, systems_destroyed, } = tokio_epoll_uring::metrics::global(); @@ -3064,12 +3162,21 @@ pub mod tokio_epoll_uring { mfs.extend(self.systems_created.collect()); self.systems_destroyed.set(systems_destroyed); mfs.extend(self.systems_destroyed.collect()); + + self.thread_local_metrics_storage + .flush_thread_local_metrics(); + + mfs.extend( + self.thread_local_metrics_storage + .slots_submission_queue_depth + .collect(), + ); mfs } } impl Collector { - 
const NMETRICS: usize = 2; + const NMETRICS: usize = 3; #[allow(clippy::new_without_default)] pub fn new() -> Self { @@ -3101,6 +3208,7 @@ pub mod tokio_epoll_uring { descs, systems_created, systems_destroyed, + thread_local_metrics_storage: &THREAD_LOCAL_METRICS_STORAGE, } } } @@ -3460,6 +3568,7 @@ pub fn preinitialize_metrics() { Lazy::force(&RECONSTRUCT_TIME); Lazy::force(&BASEBACKUP_QUERY_TIME); Lazy::force(&COMPUTE_COMMANDS_COUNTERS); + Lazy::force(&tokio_epoll_uring::THREAD_LOCAL_METRICS_STORAGE); tenant_throttling::preinitialize_global_metrics(); } diff --git a/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs b/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs index 6ea19d6b2d..c67215492f 100644 --- a/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs +++ b/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs @@ -16,18 +16,24 @@ use tokio_epoll_uring::{System, SystemHandle}; use crate::virtual_file::on_fatal_io_error; -use crate::metrics::tokio_epoll_uring as metrics; +use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE}; #[derive(Clone)] struct ThreadLocalState(Arc); struct ThreadLocalStateInner { - cell: tokio::sync::OnceCell, + cell: tokio::sync::OnceCell>, launch_attempts: AtomicU32, /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`] thread_local_state_id: u64, } +impl Drop for ThreadLocalStateInner { + fn drop(&mut self) { + THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id); + } +} + impl ThreadLocalState { pub fn new() -> Self { Self(Arc::new(ThreadLocalStateInner { @@ -71,7 +77,8 @@ pub async fn thread_local_system() -> Handle { &fake_cancel, ) .await; - let res = System::launch() + let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id); + let res = System::launch_with_metrics(per_system_metrics) // this might move us to another executor thread => loop outside the get_or_try_init, not 
inside it .await; match res { @@ -86,6 +93,7 @@ pub async fn thread_local_system() -> Handle { emit_launch_failure_process_stats(); }); metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc(); + metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id); Err(()) } // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere. @@ -115,7 +123,7 @@ fn emit_launch_failure_process_stats() { // number of threads // rss / system memory usage generally - let tokio_epoll_uring::metrics::Metrics { + let tokio_epoll_uring::metrics::GlobalMetrics { systems_created, systems_destroyed, } = tokio_epoll_uring::metrics::global(); @@ -182,7 +190,7 @@ fn emit_launch_failure_process_stats() { pub struct Handle(ThreadLocalState); impl std::ops::Deref for Handle { - type Target = SystemHandle; + type Target = SystemHandle; fn deref(&self) -> &Self::Target { self.0 diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index e056ea77d4..39c8f70a9c 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -150,6 +150,7 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = ( counter("pageserver_tenant_throttling_count_accounted_finish_global"), counter("pageserver_tenant_throttling_wait_usecs_sum_global"), counter("pageserver_tenant_throttling_count_global"), + *histogram("pageserver_tokio_epoll_uring_slots_submission_queue_depth"), ) PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (