mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
rename "single runtime" to "one runtime", allow configuring current_thread and multi_thread:$num_workers
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use std::{fmt::Display, str::FromStr};
|
||||
|
||||
pub fn var<V, E, D>(varname: &str, default: D) -> V
|
||||
pub fn var_or_else<V, E, D>(varname: &str, default: D) -> V
|
||||
where
|
||||
V: FromStr<Err = E>,
|
||||
E: Display,
|
||||
@@ -18,6 +18,24 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn var<V, E>(varname: &str) -> Option<V>
|
||||
where
|
||||
V: FromStr<Err = E>,
|
||||
E: Display,
|
||||
{
|
||||
match std::env::var(varname) {
|
||||
Ok(s) => Some(
|
||||
s.parse()
|
||||
.map_err(|e| format!("failed to parse env var {varname}: {e:#}"))
|
||||
.unwrap(),
|
||||
),
|
||||
Err(std::env::VarError::NotPresent) => None,
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {varname} is not unicode")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Bool(bool);
|
||||
|
||||
impl Bool {
|
||||
|
||||
@@ -33,7 +33,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::panic::AssertUnwindSafe;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@@ -99,58 +101,107 @@ use utils::id::TimelineId;
|
||||
// happen, but still.
|
||||
//
|
||||
|
||||
static USE_SINGLE_RUNTIME: Lazy<bool> = Lazy::new(|| {
|
||||
env_config::var("NEON_PAGESERVER_USE_SINGLE_RUNTIME", || {
|
||||
env_config::Bool::new(false)
|
||||
})
|
||||
.into()
|
||||
});
|
||||
|
||||
static SINGLE_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("pageserver worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create single runtime")
|
||||
});
|
||||
|
||||
macro_rules! single_runtime_or_multi_thread_enable_all {
|
||||
($varname:ident, $name:literal) => {
|
||||
pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| {
|
||||
if *USE_SINGLE_RUNTIME {
|
||||
SINGLE_RUNTIME.handle()
|
||||
} else {
|
||||
static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name($name)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect(std::concat!("Failed to create runtime ", $name))
|
||||
});
|
||||
RUNTIME.handle()
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
single_runtime_or_multi_thread_enable_all!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
|
||||
single_runtime_or_multi_thread_enable_all!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
single_runtime_or_multi_thread_enable_all!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
// if you change the number of worker threads please change the constant below
|
||||
single_runtime_or_multi_thread_enable_all!(BACKGROUND_RUNTIME, "background op worker");
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = *BACKGROUND_RUNTIME;
|
||||
pub(crate) static TOKIO_WORKER_THREADS: Lazy<NonZeroUsize> = Lazy::new(|| {
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
// this will be wrong if any of the runtimes gets their worker threads configured to something
|
||||
// else, but that has not been needed in a long time.
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
|
||||
NonZeroUsize::new(
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get())),
|
||||
)
|
||||
.expect("the max() ensures that this is not zero")
|
||||
});
|
||||
|
||||
enum TokioRuntimeMode {
|
||||
SingleThreaded,
|
||||
MultiThreaded { num_workers: NonZeroUsize },
|
||||
}
|
||||
|
||||
impl FromStr for TokioRuntimeMode {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"current_thread" => Ok(TokioRuntimeMode::SingleThreaded),
|
||||
"multi_thread:default" => Ok(TokioRuntimeMode::MultiThreaded {
|
||||
num_workers: *TOKIO_WORKER_THREADS,
|
||||
}),
|
||||
s => match s.strip_prefix("multi:") {
|
||||
Some(suffix) => {
|
||||
let num_workers = suffix.parse::<NonZeroUsize>().map_err(|e| {
|
||||
format!(
|
||||
"invalid number of multi-threaded runtime workers ({suffix:?}): {e}",
|
||||
)
|
||||
})?;
|
||||
Ok(TokioRuntimeMode::MultiThreaded { num_workers })
|
||||
}
|
||||
None => Err(format!("invalid runtime config: {}", s)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
|
||||
let thread_name = "pageserver worker";
|
||||
let Some(mode) = env_config::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else {
|
||||
// If the env var is not set, leave this static as None.
|
||||
// The single_
|
||||
return None;
|
||||
};
|
||||
Some(match mode {
|
||||
TokioRuntimeMode::SingleThreaded => tokio::runtime::Builder::new_current_thread()
|
||||
.thread_name(thread_name)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create single runtime"),
|
||||
TokioRuntimeMode::MultiThreaded { num_workers } => {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name(thread_name)
|
||||
.enable_all()
|
||||
.worker_threads(num_workers.get())
|
||||
.build()
|
||||
.expect("failed to create single runtime")
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
/// Declare a lazy static variable named `$varname` that will resolve
|
||||
/// to a tokio runtime handle. If the env var `NEON_PAGESERVER_USE_ONE_RUNTIME`
|
||||
/// is set, this will resolve to `ONE_RUNTIME`. Otherwise, the macro invocation
|
||||
/// declares a separate runtime and the lazy static variable `$varname`
|
||||
/// will resolve to that separate runtime.
|
||||
///
|
||||
/// The result is is that `$varname.spawn()` will use `ONE_RUNTIME` if
|
||||
/// `NEON_PAGESERVER_USE_ONE_RUNTIME` is set, and will use the separate runtime
|
||||
/// otherwise.
|
||||
macro_rules! pageserver_runtime {
|
||||
($varname:ident, $name:literal) => {
|
||||
pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| {
|
||||
if let Some(runtime) = &*ONE_RUNTIME {
|
||||
return runtime.handle();
|
||||
}
|
||||
static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name($name)
|
||||
.worker_threads(TOKIO_WORKER_THREADS.get())
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect(std::concat!("Failed to create runtime ", $name))
|
||||
});
|
||||
RUNTIME.handle()
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
|
||||
pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
// if you change the number of worker threads please change the constant below
|
||||
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use utils::{backoff, completion};
|
||||
|
||||
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
@@ -72,6 +72,7 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
|
||||
);
|
||||
|
||||
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
|
||||
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
|
||||
Ok(permit) => permit,
|
||||
Err(_closed) => unreachable!("we never close the semaphore"),
|
||||
|
||||
Reference in New Issue
Block a user