Compare commits

..

1 Commit

Author SHA1 Message Date
Christian Schwarz
6390b1e8dc hack: support attach-wq 2025-04-25 21:30:41 +02:00
9 changed files with 65 additions and 24 deletions

2
Cargo.lock generated
View File

@@ -7197,7 +7197,6 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"futures",
"nix 0.26.4",
@@ -7808,7 +7807,6 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#781989bb540a1408b0b93daa1e9d1fa452195497"
dependencies = [
"bytes",
"io-uring",

View File

@@ -187,7 +187,7 @@ thiserror = "1.0"
tikv-jemallocator = { version = "0.6", features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
tokio = { version = "1.43.1", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { path = "../tokio-epoll-uring/tokio-epoll-uring" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}

View File

@@ -3867,6 +3867,8 @@ pub mod tokio_epoll_uring {
use metrics::{Histogram, LocalHistogram, UIntGauge, register_histogram, register_int_counter};
use once_cell::sync::Lazy;
use crate::virtual_file::io_engine::TokioEpollUringExtThreadLocalStateId;
/// Shared storage for tokio-epoll-uring thread local metrics.
pub(crate) static THREAD_LOCAL_METRICS_STORAGE: Lazy<ThreadLocalMetricsStorage> =
Lazy::new(|| {
@@ -3886,7 +3888,7 @@ pub mod tokio_epoll_uring {
pub struct ThreadLocalMetricsStorage {
/// List of thread local metrics observers.
observers: Mutex<HashMap<u64, Arc<ThreadLocalMetrics>>>,
observers: Mutex<HashMap<TokioEpollUringExtThreadLocalStateId, Arc<ThreadLocalMetrics>>>,
/// A histogram shared between all thread local systems
/// for collecting slots submission queue depth.
slots_submission_queue_depth: Histogram,
@@ -3908,7 +3910,10 @@ pub mod tokio_epoll_uring {
impl ThreadLocalMetricsStorage {
/// Registers a new thread local system. Returns a thread local metrics observer.
pub fn register_system(&self, id: u64) -> Arc<ThreadLocalMetrics> {
pub fn register_system(
&self,
id: TokioEpollUringExtThreadLocalStateId,
) -> Arc<ThreadLocalMetrics> {
let per_system_metrics = Arc::new(ThreadLocalMetrics::new(
self.slots_submission_queue_depth.local(),
));
@@ -3919,7 +3924,7 @@ pub mod tokio_epoll_uring {
/// Removes metrics observer for a thread local system.
/// This should be called before dropping a thread local system.
pub fn remove_system(&self, id: u64) {
pub fn remove_system(&self, id: TokioEpollUringExtThreadLocalStateId) {
let mut g = self.observers.lock().unwrap();
g.remove(&id);
}

View File

@@ -11,6 +11,8 @@
#[cfg(target_os = "linux")]
pub(super) mod tokio_epoll_uring_ext;
#[cfg(target_os = "linux")]
pub(crate) use tokio_epoll_uring_ext::ThreadLocalStateId as TokioEpollUringExtThreadLocalStateId;
use tokio_epoll_uring::IoBuf;
use tracing::Instrument;

View File

@@ -5,8 +5,9 @@
//! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
//! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
use std::sync::Arc;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use tokio_epoll_uring::{System, SystemHandle};
use tokio_util::sync::CancellationToken;
@@ -22,13 +23,14 @@ struct ThreadLocalState(Arc<ThreadLocalStateInner>);
struct ThreadLocalStateInner {
cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
launch_attempts: AtomicU32,
/// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
thread_local_state_id: u64,
thread_local_state_id: ThreadLocalStateId,
}
impl Drop for ThreadLocalStateInner {
fn drop(&mut self) {
THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
let mut launched_systems = ALL_THREAD_LOCALS.lock().unwrap();
launched_systems.remove(&self.thread_local_state_id);
}
}
@@ -37,7 +39,7 @@ impl ThreadLocalState {
Self(Arc::new(ThreadLocalStateInner {
cell: tokio::sync::OnceCell::default(),
launch_attempts: AtomicU32::new(0),
thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
thread_local_state_id: ThreadLocalStateId::new(),
}))
}
@@ -46,7 +48,23 @@ impl ThreadLocalState {
}
}
static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ThreadLocalStateId(u64);
impl ThreadLocalStateId {
pub fn new() -> Self {
static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
Self(THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed))
}
}
impl std::fmt::Display for ThreadLocalStateId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
static ALL_THREAD_LOCALS: std::sync::LazyLock<
std::sync::Mutex<HashMap<ThreadLocalStateId, Weak<ThreadLocalStateInner>>>,
> = std::sync::LazyLock::new(|| Default::default());
thread_local! {
static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
@@ -76,7 +94,23 @@ pub async fn thread_local_system() -> Handle {
)
.await;
let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
let res = System::launch_with_metrics(per_system_metrics)
let res = System::launch_with_metrics(per_system_metrics,
if utils::env::var("NEON_PAGESERVER_TOKIO_EPOLL_URING_SHARE_IO_WQ_POOL").unwrap_or(false) {
ALL_THREAD_LOCALS.lock().unwrap().values().filter_map(|other: &Weak<ThreadLocalStateInner>| {
let Some(other) = other.upgrade() else {
return None;
};
if other.thread_local_state_id == inner.thread_local_state_id {
return None;
}
let Some(system) = inner.cell.get() else {
return None;
};
Some(system)
}).next()
} else {
None
},)
// this might move us to another executor thread => loop outside the get_or_try_init, not inside it
.await;
match res {

View File

@@ -435,7 +435,7 @@ class NeonEnvBuilder:
pageserver_config_override: str | Callable[[dict[str, Any]], None] | None = None,
num_safekeepers: int = 1,
num_pageservers: int = 1,
num_azs: int = 3,
num_azs: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster

View File

@@ -44,10 +44,6 @@ def test_sharding_smoke(
"""
shard_count = 4
# Single AZ so that a sharded tenant can spread across all pageservers
neon_env_builder.num_azs = 1
neon_env_builder.num_pageservers = shard_count
# 1MiB stripes: enable getting some meaningful data distribution without
@@ -527,13 +523,16 @@ def test_sharding_split_smoke(
# In preferred AZ & other AZ we will end up with one shard per pageserver
neon_env_builder.num_pageservers = split_shard_count * 2
def ps_conf_hook(ps_cfg):
# Two AZs
def assign_az(ps_cfg):
az = f"az-{(ps_cfg['id'] - 1) % 2}"
ps_cfg["availability_zone"] = az
# We will run more pageservers than tests usually do, so give them tiny page caches
# in case we're on a test node under memory pressure.
ps_cfg["page_cache_size"] = 128
neon_env_builder.num_azs = 2
neon_env_builder.pageserver_config_override = ps_conf_hook
neon_env_builder.pageserver_config_override = assign_az
# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given

View File

@@ -2212,7 +2212,6 @@ def test_graceful_cluster_restart(
tenants where we fill based on a target shard count.
"""
neon_env_builder.num_azs = num_azs
neon_env_builder.num_pageservers = 2
neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
env = neon_env_builder.init_configs()
@@ -2476,6 +2475,7 @@ def test_storage_controller_node_deletion(
Test that deleting a node works & properly reschedules everything that was on the node.
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.num_azs = 3
env = neon_env_builder.init_configs()
env.start()
@@ -3512,7 +3512,12 @@ def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_shard_preferred_azs(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_azs = 2
def assign_az(ps_cfg):
az = f"az-{ps_cfg['id'] % 2}"
log.info("Assigned AZ {az}")
ps_cfg["availability_zone"] = az
neon_env_builder.pageserver_config_override = assign_az
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
env.start()

View File

@@ -1186,8 +1186,6 @@ def test_sharded_tad_interleaved_after_partial_success(neon_env_builder: NeonEnv
"""
shard_count = 2
# Single AZ so that a sharded tenant can spread across all pageservers
neon_env_builder.num_azs = 1
neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)