mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
pageserver: option to run with just one tokio runtime (#7331)
This PR is an off-by-default revision v2 of the (since-reverted) PR #6555 / commit `3220f830b7fbb785d6db8a93775f46314f10a99b`. See that PR for details on why running with a single runtime is desirable and why we should be ready. We reverted #6555 because it showed regressions in prodlike cloudbench, see the revert commit message `ad072de4209193fd21314cf7f03f14df4fa55eb1` for more context. This PR makes it an opt-in choice via an env var. The default is to use the 4 separate runtimes that we have today, there shouldn't be any performance change. I tested manually that the env var & added metric works. ``` # undefined env var => no change to before this PR, uses 4 runtimes ./target/debug/neon_local start # defining the env var enables one-runtime mode, value defines that one runtime's configuration NEON_PAGESERVER_USE_ONE_RUNTIME=current_thread ./target/debug/neon_local start NEON_PAGESERVER_USE_ONE_RUNTIME=multi_thread:1 ./target/debug/neon_local start NEON_PAGESERVER_USE_ONE_RUNTIME=multi_thread:2 ./target/debug/neon_local start NEON_PAGESERVER_USE_ONE_RUNTIME=multi_thread:default ./target/debug/neon_local start ``` I want to use this change to do more manualy testing and potentially testing in staging. Future Work ----------- Testing / deployment ergonomics would be better if this were a variable in `pageserver.toml`. It can be done, but, I don't need it right now, so let's stick with the env var.
This commit is contained in:
committed by
GitHub
parent
47b705cffe
commit
1081a4d246
@@ -86,7 +86,10 @@ where
|
||||
.stdout(process_log_file)
|
||||
.stderr(same_file_for_stderr)
|
||||
.args(args);
|
||||
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
|
||||
|
||||
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
|
||||
fill_rust_env_vars(background_command),
|
||||
));
|
||||
filled_cmd.envs(envs);
|
||||
|
||||
let pid_file_to_check = match &initial_pid_file {
|
||||
@@ -268,6 +271,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
cmd
|
||||
}
|
||||
|
||||
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
|
||||
for (var, val) in std::env::vars() {
|
||||
if var.starts_with("NEON_PAGESERVER_") {
|
||||
cmd = cmd.env(var, val);
|
||||
}
|
||||
}
|
||||
cmd
|
||||
}
|
||||
|
||||
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
|
||||
/// 1. Claims a pidfile with a fcntl lock on it and
|
||||
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)
|
||||
|
||||
21
libs/utils/src/env.rs
Normal file
21
libs/utils/src/env.rs
Normal file
@@ -0,0 +1,21 @@
|
||||
//! Wrapper around `std::env::var` for parsing environment variables.
|
||||
|
||||
use std::{fmt::Display, str::FromStr};
|
||||
|
||||
pub fn var<V, E>(varname: &str) -> Option<V>
|
||||
where
|
||||
V: FromStr<Err = E>,
|
||||
E: Display,
|
||||
{
|
||||
match std::env::var(varname) {
|
||||
Ok(s) => Some(
|
||||
s.parse()
|
||||
.map_err(|e| format!("failed to parse env var {varname}: {e:#}"))
|
||||
.unwrap(),
|
||||
),
|
||||
Err(std::env::VarError::NotPresent) => None,
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {varname} is not unicode")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -89,6 +89,8 @@ pub mod yielding_loop;
|
||||
|
||||
pub mod zstd;
|
||||
|
||||
pub mod env;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
@@ -2100,6 +2100,7 @@ pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
|
||||
use futures::Future;
|
||||
use pin_project_lite::pin_project;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
@@ -2669,6 +2670,26 @@ pub(crate) mod disk_usage_based_eviction {
|
||||
pub(crate) static METRICS: Lazy<Metrics> = Lazy::new(Metrics::default);
|
||||
}
|
||||
|
||||
static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tokio_executor_thread_configured_count",
|
||||
"Total number of configued tokio executor threads in the process.
|
||||
The `setup` label denotes whether we're running with multiple runtimes or a single runtime.",
|
||||
&["setup"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
|
||||
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
||||
let _guard = SERIALIZE.lock().unwrap();
|
||||
TOKIO_EXECUTOR_THREAD_COUNT.reset();
|
||||
TOKIO_EXECUTOR_THREAD_COUNT
|
||||
.get_metric_with_label_values(&[setup])
|
||||
.unwrap()
|
||||
.set(u64::try_from(num_threads.get()).unwrap());
|
||||
}
|
||||
|
||||
pub fn preinitialize_metrics() {
|
||||
// Python tests need these and on some we do alerting.
|
||||
//
|
||||
|
||||
@@ -33,13 +33,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::panic::AssertUnwindSafe;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::FutureExt;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::task_local;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -48,8 +49,11 @@ use tracing::{debug, error, info, warn};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use utils::env;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::metrics::set_tokio_runtime_setup;
|
||||
|
||||
//
|
||||
// There are four runtimes:
|
||||
//
|
||||
@@ -98,52 +102,119 @@ use utils::id::TimelineId;
|
||||
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
||||
// happen, but still.
|
||||
//
|
||||
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("compute request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create compute request runtime")
|
||||
});
|
||||
|
||||
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("mgmt request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create mgmt request runtime")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("walreceiver worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create walreceiver runtime")
|
||||
});
|
||||
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
// if you change the number of worker threads please change the constant below
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = BACKGROUND_RUNTIME.handle();
|
||||
pub(crate) static TOKIO_WORKER_THREADS: Lazy<NonZeroUsize> = Lazy::new(|| {
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
// this will be wrong if any of the runtimes gets their worker threads configured to something
|
||||
// else, but that has not been needed in a long time.
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
|
||||
NonZeroUsize::new(
|
||||
std::env::var("TOKIO_WORKER_THREADS")
|
||||
.map(|s| s.parse::<usize>().unwrap())
|
||||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get())),
|
||||
)
|
||||
.expect("the max() ensures that this is not zero")
|
||||
});
|
||||
|
||||
enum TokioRuntimeMode {
|
||||
SingleThreaded,
|
||||
MultiThreaded { num_workers: NonZeroUsize },
|
||||
}
|
||||
|
||||
impl FromStr for TokioRuntimeMode {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s {
|
||||
"current_thread" => Ok(TokioRuntimeMode::SingleThreaded),
|
||||
s => match s.strip_prefix("multi_thread:") {
|
||||
Some("default") => Ok(TokioRuntimeMode::MultiThreaded {
|
||||
num_workers: *TOKIO_WORKER_THREADS,
|
||||
}),
|
||||
Some(suffix) => {
|
||||
let num_workers = suffix.parse::<NonZeroUsize>().map_err(|e| {
|
||||
format!(
|
||||
"invalid number of multi-threaded runtime workers ({suffix:?}): {e}",
|
||||
)
|
||||
})?;
|
||||
Ok(TokioRuntimeMode::MultiThreaded { num_workers })
|
||||
}
|
||||
None => Err(format!("invalid runtime config: {s:?}")),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
|
||||
let thread_name = "pageserver-tokio";
|
||||
let Some(mode) = env::var("NEON_PAGESERVER_USE_ONE_RUNTIME") else {
|
||||
// If the env var is not set, leave this static as None.
|
||||
set_tokio_runtime_setup(
|
||||
"multiple-runtimes",
|
||||
NUM_MULTIPLE_RUNTIMES
|
||||
.checked_mul(*TOKIO_WORKER_THREADS)
|
||||
.unwrap(),
|
||||
);
|
||||
return None;
|
||||
};
|
||||
Some(match mode {
|
||||
TokioRuntimeMode::SingleThreaded => {
|
||||
set_tokio_runtime_setup("one-runtime-single-threaded", NonZeroUsize::new(1).unwrap());
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.thread_name(thread_name)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create one single runtime")
|
||||
}
|
||||
TokioRuntimeMode::MultiThreaded { num_workers } => {
|
||||
set_tokio_runtime_setup("one-runtime-multi-threaded", num_workers);
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name(thread_name)
|
||||
.enable_all()
|
||||
.worker_threads(num_workers.get())
|
||||
.build()
|
||||
.expect("failed to create one multi-threaded runtime")
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
/// Declare a lazy static variable named `$varname` that will resolve
|
||||
/// to a tokio runtime handle. If the env var `NEON_PAGESERVER_USE_ONE_RUNTIME`
|
||||
/// is set, this will resolve to `ONE_RUNTIME`. Otherwise, the macro invocation
|
||||
/// declares a separate runtime and the lazy static variable `$varname`
|
||||
/// will resolve to that separate runtime.
|
||||
///
|
||||
/// The result is is that `$varname.spawn()` will use `ONE_RUNTIME` if
|
||||
/// `NEON_PAGESERVER_USE_ONE_RUNTIME` is set, and will use the separate runtime
|
||||
/// otherwise.
|
||||
macro_rules! pageserver_runtime {
|
||||
($varname:ident, $name:literal) => {
|
||||
pub static $varname: Lazy<&'static tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
if let Some(runtime) = &*ONE_RUNTIME {
|
||||
return runtime;
|
||||
}
|
||||
static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name($name)
|
||||
.worker_threads(TOKIO_WORKER_THREADS.get())
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect(std::concat!("Failed to create runtime ", $name))
|
||||
});
|
||||
&*RUNTIME
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
pageserver_runtime!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
|
||||
pageserver_runtime!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
pageserver_runtime!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
pageserver_runtime!(BACKGROUND_RUNTIME, "background op worker");
|
||||
// Bump this number when adding a new pageserver_runtime!
|
||||
// SAFETY: it's obviously correct
|
||||
const NUM_MULTIPLE_RUNTIMES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(4) };
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use utils::{backoff, completion};
|
||||
|
||||
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let total_threads = task_mgr::TOKIO_WORKER_THREADS.get();
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
@@ -72,6 +72,7 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
|
||||
);
|
||||
|
||||
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
|
||||
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
|
||||
Ok(permit) => permit,
|
||||
Err(_closed) => unreachable!("we never close the semaphore"),
|
||||
|
||||
Reference in New Issue
Block a user