From 6390b1e8dc22fa7435d6c816e363789f4891ef91 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 25 Apr 2025 21:29:24 +0200 Subject: [PATCH] hack: support attach-wq --- Cargo.lock | 2 - Cargo.toml | 2 +- pageserver/src/metrics.rs | 11 +++-- pageserver/src/virtual_file/io_engine.rs | 2 + .../io_engine/tokio_epoll_uring_ext.rs | 46 ++++++++++++++++--- 5 files changed, 51 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c464c62b8..06856f16c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7197,7 +7197,6 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497" dependencies = [ "futures", "nix 0.26.4", @@ -7808,7 +7807,6 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497" dependencies = [ "bytes", "io-uring", diff --git a/Cargo.toml b/Cargo.toml index 1c203af9e0..55b0bbe071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -187,7 +187,7 @@ thiserror = "1.0" tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] } tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] } tokio = { version = "1.43.1", features = ["macros"] } -tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } +tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" } tokio-io-timeout = "1.2.0" tokio-postgres-rustls = "0.12.0" tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index b16970c911..d5c5107a43 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3867,6 +3867,8 @@ pub mod tokio_epoll_uring { use metrics::{Histogram, LocalHistogram, UIntGauge, register_histogram, register_int_counter}; use once_cell::sync::Lazy; + use crate::virtual_file::io_engine::TokioEpollUringExtThreadLocalStateId; + /// Shared storage for tokio-epoll-uring thread local metrics. pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy = Lazy::new(|| { @@ -3886,7 +3888,7 @@ pub mod tokio_epoll_uring { pub struct ThreadLocalMetricsStorage { /// List of thread local metrics observers. - observers: Mutex>>, + observers: Mutex>>, /// A histogram shared between all thread local systems /// for collecting slots submission queue depth. slots_submission_queue_depth: Histogram, @@ -3908,7 +3910,10 @@ pub mod tokio_epoll_uring { impl ThreadLocalMetricsStorage { /// Registers a new thread local system. Returns a thread local metrics observer. - pub fn register_system(&self, id: u64) -> Arc { + pub fn register_system( + &self, + id: TokioEpollUringExtThreadLocalStateId, + ) -> Arc { let per_system_metrics = Arc::new(ThreadLocalMetrics::new( self.slots_submission_queue_depth.local(), )); @@ -3919,7 +3924,7 @@ pub mod tokio_epoll_uring { /// Removes metrics observer for a thread local system. /// This should be called before dropping a thread local system. - pub fn remove_system(&self, id: u64) { + pub fn remove_system(&self, id: TokioEpollUringExtThreadLocalStateId) { let mut g = self.observers.lock().unwrap(); g.remove(&id); } diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index dd04fb561a..8dc26f4121 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -11,6 +11,8 @@ #[cfg(target_os = "linux")] pub(super) mod tokio_epoll_uring_ext; +#[cfg(target_os = "linux")] +pub(crate) use tokio_epoll_uring_ext::ThreadLocalStateId as TokioEpollUringExtThreadLocalStateId; use tokio_epoll_uring::IoBuf; use tracing::Instrument; diff --git a/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs b/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs index ad17405b64..5be92eceec 100644 --- a/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs +++ b/pageserver/src/virtual_file/io_engine/tokio_epoll_uring_ext.rs @@ -5,8 +5,9 @@ //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series. //! See for more details. -use std::sync::Arc; +use std::collections::HashMap; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::{Arc, Weak}; use tokio_epoll_uring::{System, SystemHandle}; use tokio_util::sync::CancellationToken; @@ -22,13 +23,14 @@ struct ThreadLocalState(Arc); struct ThreadLocalStateInner { cell: tokio::sync::OnceCell>, launch_attempts: AtomicU32, - /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`] - thread_local_state_id: u64, + thread_local_state_id: ThreadLocalStateId, } impl Drop for ThreadLocalStateInner { fn drop(&mut self) { THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id); + let mut launched_systems = ALL_THREAD_LOCALS.lock().unwrap(); + launched_systems.remove(&self.thread_local_state_id); } } @@ -37,7 +39,7 @@ impl ThreadLocalState { Self(Arc::new(ThreadLocalStateInner { cell: tokio::sync::OnceCell::default(), launch_attempts: AtomicU32::new(0), - thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed), + thread_local_state_id: ThreadLocalStateId::new(), })) } @@ -46,7 +48,23 @@ impl ThreadLocalState { } } -static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ThreadLocalStateId(u64); +impl ThreadLocalStateId { + pub fn new() -> Self { + static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0); + Self(THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed)) + } +} +impl std::fmt::Display for ThreadLocalStateId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +static ALL_THREAD_LOCALS: std::sync::LazyLock< + std::sync::Mutex>>, +> = std::sync::LazyLock::new(|| Default::default()); thread_local! { static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new(); @@ -76,7 +94,23 @@ pub async fn thread_local_system() -> Handle { ) .await; let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id); - let res = System::launch_with_metrics(per_system_metrics) + let res = System::launch_with_metrics(per_system_metrics, + if utils::env::var("NEON_PAGESERVER_TOKIO_EPOLL_URING_SHARE_IO_WQ_POOL").unwrap_or(false) { + ALL_THREAD_LOCALS.lock().unwrap().values().filter_map(|other: &Weak| { + let Some(other) = other.upgrade() else { + return None; + }; + if other.thread_local_state_id == inner.thread_local_state_id { + return None; + } + let Some(system) = inner.cell.get() else { + return None; + }; + Some(system) + }).next() + } else { + None + },) // this might move us to another executor thread => loop outside the get_or_try_init, not inside it .await; match res {