hack: support attach-wq

This commit is contained in:
Christian Schwarz
2025-04-25 21:29:24 +02:00
parent 902d361107
commit 6390b1e8dc
5 changed files with 51 additions and 12 deletions

2
Cargo.lock generated
View File

@@ -7197,7 +7197,6 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"futures",
"nix 0.26.4",
@@ -7808,7 +7807,6 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"bytes",
"io-uring",

View File

@@ -187,7 +187,7 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}

View File

@@ -3867,6 +3867,8 @@ pub mod tokio_epoll_uring {
use metrics::{Histogram, LocalHistogram, UIntGauge, register_histogram, register_int_counter};
use once_cell::sync::Lazy;
use crate::virtual_file::io_engine::TokioEpollUringExtThreadLocalStateId;
/// Shared storage for tokio-epoll-uring thread local metrics.
pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy<ThreadLocalMetricsStorage> =
Lazy::new(|| {
@@ -3886,7 +3888,7 @@ pub mod tokio_epoll_uring {
pub struct ThreadLocalMetricsStorage {
/// List of thread local metrics observers.
observers: Mutex<HashMap<u64, Arc<ThreadLocalMetrics>>>,
observers: Mutex<HashMap<TokioEpollUringExtThreadLocalStateId, Arc<ThreadLocalMetrics>>>,
/// A histogram shared between all thread local systems
/// for collecting slots submission queue depth.
slots_submission_queue_depth: Histogram,
@@ -3908,7 +3910,10 @@ pub mod tokio_epoll_uring {
impl ThreadLocalMetricsStorage {
/// Registers a new thread local system. Returns a thread local metrics observer.
pub fn register_system(&self, id: u64) -> Arc<ThreadLocalMetrics> {
pub fn register_system(
&self,
id: TokioEpollUringExtThreadLocalStateId,
) -> Arc<ThreadLocalMetrics> {
let per_system_metrics = Arc::new(ThreadLocalMetrics::new(
self.slots_submission_queue_depth.local(),
));
@@ -3919,7 +3924,7 @@ pub mod tokio_epoll_uring {
/// Removes metrics observer for a thread local system.
/// This should be called before dropping a thread local system.
pub fn remove_system(&self, id: u64) {
pub fn remove_system(&self, id: TokioEpollUringExtThreadLocalStateId) {
let mut g = self.observers.lock().unwrap();
g.remove(&id);
}

View File

@@ -11,6 +11,8 @@
#[cfg(target_os = "linux")]
pub(super) mod tokio_epoll_uring_ext;
#[cfg(target_os = "linux")]
pub(crate) use tokio_epoll_uring_ext::ThreadLocalStateId as TokioEpollUringExtThreadLocalStateId;
use tokio_epoll_uring::IoBuf;
use tracing::Instrument;

View File

@@ -5,8 +5,9 @@
//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
//! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
use std::sync::Arc;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use tokio_epoll_uring::{System, SystemHandle};
use tokio_util::sync::CancellationToken;
@@ -22,13 +23,14 @@ struct ThreadLocalState(Arc<ThreadLocalStateInner>);
struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
launch_attempts: AtomicU32,
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
thread_local_state_id: u64,
thread_local_state_id: ThreadLocalStateId,
}
impl Drop for ThreadLocalStateInner {
fn drop(&mut self) {
THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
let mut launched_systems = ALL_THREAD_LOCALS.lock().unwrap();
launched_systems.remove(&self.thread_local_state_id);
}
}
@@ -37,7 +39,7 @@ impl ThreadLocalState {
Self(Arc::new(ThreadLocalStateInner {
cell: tokio::sync::OnceCell::default(),
launch_attempts: AtomicU32::new(0),
thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
thread_local_state_id: ThreadLocalStateId::new(),
}))
}
@@ -46,7 +48,23 @@ impl ThreadLocalState {
}
}
static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ThreadLocalStateId(u64);
impl ThreadLocalStateId {
pub fn new() -> Self {
static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
Self(THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed))
}
}
impl std::fmt::Display for ThreadLocalStateId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
static ALL_THREAD_LOCALS: std::sync::LazyLock<
std::sync::Mutex<HashMap<ThreadLocalStateId, Weak<ThreadLocalStateInner>>>,
> = std::sync::LazyLock::new(|| Default::default());
thread_local! {
static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
@@ -76,7 +94,23 @@ pub async fn thread_local_system() -> Handle {
)
.await;
let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
let res = System::launch_with_metrics(per_system_metrics)
let res = System::launch_with_metrics(per_system_metrics,
if utils::env::var("NEON_PAGESERVER_TOKIO_EPOLL_URING_SHARE_IO_WQ_POOL").unwrap_or(false) {
ALL_THREAD_LOCALS.lock().unwrap().values().filter_map(|other: &Weak<ThreadLocalStateInner>| {
let Some(other) = other.upgrade() else {
return None;
};
if other.thread_local_state_id == inner.thread_local_state_id {
return None;
}
let Some(system) = inner.cell.get() else {
return None;
};
Some(system)
}).next()
} else {
None
},)
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {