mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
pageserver: ability to use a single runtime
This PR allows running the pageserver with a single tokio runtime.
This commit is contained in:
@@ -26,8 +26,8 @@ pub mod auth;
|
||||
// utility functions and helper traits for unified unique id generation/serialization etc.
|
||||
pub mod id;
|
||||
|
||||
mod hex;
|
||||
pub mod env_config;
|
||||
mod hex;
|
||||
pub use hex::Hex;
|
||||
|
||||
// http endpoint utils
|
||||
|
||||
@@ -1,16 +1,10 @@
|
||||
use std::{
|
||||
io::BufWriter,
|
||||
str::FromStr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
use super::env_config;
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum LogFormat {
|
||||
|
||||
@@ -391,7 +391,7 @@ fn start_pageserver(
|
||||
conf,
|
||||
);
|
||||
if let Some(deletion_workers) = deletion_workers {
|
||||
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
|
||||
deletion_workers.spawn_with(*BACKGROUND_RUNTIME);
|
||||
}
|
||||
|
||||
// Up to this point no significant I/O has been done: this should have been fast. Record
|
||||
@@ -569,7 +569,7 @@ fn start_pageserver(
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
|
||||
task_mgr::spawn(
|
||||
MGMT_REQUEST_RUNTIME.handle(),
|
||||
*MGMT_REQUEST_RUNTIME,
|
||||
TaskKind::HttpEndpointListener,
|
||||
None,
|
||||
None,
|
||||
@@ -594,7 +594,7 @@ fn start_pageserver(
|
||||
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
|
||||
|
||||
task_mgr::spawn(
|
||||
crate::BACKGROUND_RUNTIME.handle(),
|
||||
*crate::BACKGROUND_RUNTIME,
|
||||
TaskKind::MetricsCollection,
|
||||
None,
|
||||
None,
|
||||
@@ -647,7 +647,7 @@ fn start_pageserver(
|
||||
DownloadBehavior::Error,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
COMPUTE_REQUEST_RUNTIME.handle(),
|
||||
*COMPUTE_REQUEST_RUNTIME,
|
||||
TaskKind::LibpqEndpointListener,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -64,7 +64,7 @@ pub async fn collect_metrics(
|
||||
let worker_ctx =
|
||||
ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::CalculateSyntheticSize,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -201,7 +201,7 @@ pub fn launch_disk_usage_global_eviction_task(
|
||||
info!("launching disk usage based eviction task");
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::DiskUsageEviction,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -39,7 +39,6 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures::FutureExt;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::task_local;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -48,6 +47,7 @@ use tracing::{debug, error, info, warn};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use utils::env_config;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
//
|
||||
@@ -98,42 +98,49 @@ use utils::id::TimelineId;
|
||||
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
||||
// happen, but still.
|
||||
//
|
||||
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("compute request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create compute request runtime")
|
||||
|
||||
static USE_SINGLE_RUNTIME: Lazy<bool> = Lazy::new(|| {
|
||||
env_config::var("NEON_PAGESERVER_USE_SINGLE_RUNTIME", || {
|
||||
env_config::Bool::new(false)
|
||||
})
|
||||
.into()
|
||||
});
|
||||
|
||||
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
static SINGLE_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("mgmt request worker")
|
||||
.thread_name("pageserver worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create mgmt request runtime")
|
||||
.expect("failed to create single runtime")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("walreceiver worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create walreceiver runtime")
|
||||
});
|
||||
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("background op worker")
|
||||
// if you change the number of worker threads please change the constant below
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create background op runtime")
|
||||
});
|
||||
macro_rules! single_runtime_or_multi_thread_enable_all {
|
||||
($varname:ident, $name:literal) => {
|
||||
pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| {
|
||||
if *USE_SINGLE_RUNTIME {
|
||||
SINGLE_RUNTIME.handle()
|
||||
} else {
|
||||
static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name($name)
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect(std::concat!("Failed to create runtime ", $name))
|
||||
});
|
||||
RUNTIME.handle()
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
single_runtime_or_multi_thread_enable_all!(COMPUTE_REQUEST_RUNTIME, "compute request worker");
|
||||
single_runtime_or_multi_thread_enable_all!(MGMT_REQUEST_RUNTIME, "mgmt request worker");
|
||||
single_runtime_or_multi_thread_enable_all!(WALRECEIVER_RUNTIME, "walreceiver worker");
|
||||
// if you change the number of worker threads please change the constant below
|
||||
single_runtime_or_multi_thread_enable_all!(BACKGROUND_RUNTIME, "background op worker");
|
||||
pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| {
|
||||
// force init and thus panics
|
||||
let _ = BACKGROUND_RUNTIME.handle();
|
||||
let _ = *BACKGROUND_RUNTIME;
|
||||
// replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly
|
||||
// tokio would had already panicked for parsing errors or NotUnicode
|
||||
//
|
||||
|
||||
@@ -485,7 +485,7 @@ impl DeleteTenantFlow {
|
||||
let tenant_shard_id = tenant.tenant_shard_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
|
||||
@@ -1849,7 +1849,7 @@ impl TenantManager {
|
||||
let task_tenant_id = None;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
TaskKind::MgmtRequest,
|
||||
task_tenant_id,
|
||||
None,
|
||||
|
||||
@@ -341,7 +341,7 @@ impl RemoteTimelineClient {
|
||||
// remote_timeline_client.rs tests rely on current-thread runtime
|
||||
tokio::runtime::Handle::current()
|
||||
} else {
|
||||
BACKGROUND_RUNTIME.handle().clone()
|
||||
BACKGROUND_RUNTIME.clone()
|
||||
},
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
|
||||
@@ -317,7 +317,7 @@ pub fn spawn_tasks(
|
||||
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::SecondaryDownloads,
|
||||
None,
|
||||
None,
|
||||
@@ -338,7 +338,7 @@ pub fn spawn_tasks(
|
||||
);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::SecondaryUploads,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -85,7 +85,7 @@ pub fn start_background_loops(
|
||||
) {
|
||||
let tenant_shard_id = tenant.tenant_shard_id;
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
@@ -109,7 +109,7 @@ pub fn start_background_loops(
|
||||
},
|
||||
);
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::GarbageCollector,
|
||||
Some(tenant_shard_id),
|
||||
None,
|
||||
|
||||
@@ -1962,7 +1962,7 @@ impl Timeline {
|
||||
initdb_optimization_count: 0,
|
||||
};
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
task_mgr::TaskKind::LayerFlushTask,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -2324,7 +2324,7 @@ impl Timeline {
|
||||
DownloadBehavior::Download,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
task_mgr::TaskKind::InitialLogicalSizeCalculation,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -2502,7 +2502,7 @@ impl Timeline {
|
||||
DownloadBehavior::Download,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -4484,7 +4484,7 @@ impl Timeline {
|
||||
|
||||
let self_clone = Arc::clone(&self);
|
||||
let task_id = task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
task_mgr::TaskKind::DownloadAllRemoteLayers,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
|
||||
@@ -383,7 +383,7 @@ impl DeleteTimelineFlow {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
*task_mgr::BACKGROUND_RUNTIME,
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_shard_id),
|
||||
Some(timeline_id),
|
||||
|
||||
@@ -57,7 +57,7 @@ impl Timeline {
|
||||
let self_clone = Arc::clone(self);
|
||||
let background_tasks_can_start = background_tasks_can_start.cloned();
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
*BACKGROUND_RUNTIME,
|
||||
TaskKind::Eviction,
|
||||
Some(self.tenant_shard_id),
|
||||
Some(self.timeline_id),
|
||||
|
||||
Reference in New Issue
Block a user