From dc03f7a44f6a4a1200d6f82200b538b3bf9db5fb Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Apr 2024 16:31:16 +0200 Subject: [PATCH] pageserver: ability to use a single runtime This PR allows running the pageserver with a single tokio runtime. --- libs/utils/src/lib.rs | 2 +- libs/utils/src/logging.rs | 8 +-- pageserver/src/bin/pageserver.rs | 8 +-- pageserver/src/consumption_metrics.rs | 2 +- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/task_mgr.rs | 61 +++++++++++-------- pageserver/src/tenant/delete.rs | 2 +- pageserver/src/tenant/mgr.rs | 2 +- .../src/tenant/remote_timeline_client.rs | 2 +- pageserver/src/tenant/secondary.rs | 4 +- pageserver/src/tenant/tasks.rs | 4 +- pageserver/src/tenant/timeline.rs | 8 +-- pageserver/src/tenant/timeline/delete.rs | 2 +- .../src/tenant/timeline/eviction_task.rs | 2 +- 14 files changed, 55 insertions(+), 54 deletions(-) diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index bd04eeea05..673c48e450 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -26,8 +26,8 @@ pub mod auth; // utility functions and helper traits for unified unique id generation/serialization etc. 
pub mod id; -mod hex; pub mod env_config; +mod hex; pub use hex::Hex; // http endpoint utils diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index d9531d0438..f7b73dc984 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,16 +1,10 @@ -use std::{ - io::BufWriter, - str::FromStr, - sync::{Arc, Mutex}, -}; +use std::str::FromStr; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, EnumVariantNames}; -use super::env_config; - #[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)] #[strum(serialize_all = "snake_case")] pub enum LogFormat { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index c80230d4d7..073655a598 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -391,7 +391,7 @@ fn start_pageserver( conf, ); if let Some(deletion_workers) = deletion_workers { - deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle()); + deletion_workers.spawn_with(*BACKGROUND_RUNTIME); } // Up to this point no significant I/O has been done: this should have been fast. 
Record @@ -569,7 +569,7 @@ fn start_pageserver( .with_graceful_shutdown(task_mgr::shutdown_watcher()); task_mgr::spawn( - MGMT_REQUEST_RUNTIME.handle(), + *MGMT_REQUEST_RUNTIME, TaskKind::HttpEndpointListener, None, None, @@ -594,7 +594,7 @@ fn start_pageserver( let local_disk_storage = conf.workdir.join("last_consumption_metrics.json"); task_mgr::spawn( - crate::BACKGROUND_RUNTIME.handle(), + *crate::BACKGROUND_RUNTIME, TaskKind::MetricsCollection, None, None, @@ -647,7 +647,7 @@ fn start_pageserver( DownloadBehavior::Error, ); task_mgr::spawn( - COMPUTE_REQUEST_RUNTIME.handle(), + *COMPUTE_REQUEST_RUNTIME, TaskKind::LibpqEndpointListener, None, None, diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index f5540e896f..a09da56ee4 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -64,7 +64,7 @@ pub async fn collect_metrics( let worker_ctx = ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::CalculateSyntheticSize, None, None, diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 6248424cee..e44ae21a36 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -201,7 +201,7 @@ pub fn launch_disk_usage_global_eviction_task( info!("launching disk usage based eviction task"); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::DiskUsageEviction, None, None, diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 0cc5611a12..864b186e63 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -39,7 +39,6 @@ use std::sync::{Arc, Mutex}; use futures::FutureExt; use pageserver_api::shard::TenantShardId; -use tokio::runtime::Runtime; use tokio::task::JoinHandle; use tokio::task_local; use 
tokio_util::sync::CancellationToken; @@ -48,6 +47,7 @@ use tracing::{debug, error, info, warn}; use once_cell::sync::Lazy; +use utils::env_config; use utils::id::TimelineId; // @@ -98,42 +98,49 @@ use utils::id::TimelineId; // other operations, if the upload tasks e.g. get blocked on locks. It shouldn't // happen, but still. // -pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("compute request worker") - .enable_all() - .build() - .expect("Failed to create compute request runtime") + +static USE_SINGLE_RUNTIME: Lazy<bool> = Lazy::new(|| { + env_config::var("NEON_PAGESERVER_USE_SINGLE_RUNTIME", || { + env_config::Bool::new(false) + }) + .into() }); -pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| { +static SINGLE_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() - .thread_name("mgmt request worker") + .thread_name("pageserver worker") .enable_all() .build() - .expect("Failed to create mgmt request runtime") + .expect("failed to create single runtime") }); -pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("walreceiver worker") - .enable_all() - .build() - .expect("Failed to create walreceiver runtime") -}); - -pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("background op worker") - // if you change the number of worker threads please change the constant below - .enable_all() - .build() - .expect("Failed to create background op runtime") -}); +macro_rules! 
single_runtime_or_multi_thread_enable_all { + ($varname:ident, $name:literal) => { + pub static $varname: Lazy<&'static tokio::runtime::Handle> = Lazy::new(|| { + if *USE_SINGLE_RUNTIME { + SINGLE_RUNTIME.handle() + } else { + static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name($name) + .enable_all() + .build() + .expect(std::concat!("Failed to create runtime ", $name)) + }); + RUNTIME.handle() + } + }); + }; +} +single_runtime_or_multi_thread_enable_all!(COMPUTE_REQUEST_RUNTIME, "compute request worker"); +single_runtime_or_multi_thread_enable_all!(MGMT_REQUEST_RUNTIME, "mgmt request worker"); +single_runtime_or_multi_thread_enable_all!(WALRECEIVER_RUNTIME, "walreceiver worker"); +// if you change the number of worker threads please change the constant below +single_runtime_or_multi_thread_enable_all!(BACKGROUND_RUNTIME, "background op worker"); pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(|| { // force init and thus panics - let _ = BACKGROUND_RUNTIME.handle(); + let _ = *BACKGROUND_RUNTIME; // replicates tokio-1.28.1::loom::sys::num_cpus which is not available publicly // tokio would had already panicked for parsing errors or NotUnicode // diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index d1881f3897..55e6704835 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -485,7 +485,7 @@ impl DeleteTenantFlow { let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, TaskKind::TimelineDeletionWorker, Some(tenant_shard_id), None, diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index b1b46d487b..76973efaa8 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1849,7 +1849,7 @@ impl TenantManager { let task_tenant_id = None; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + 
*task_mgr::BACKGROUND_RUNTIME, TaskKind::MgmtRequest, task_tenant_id, None, diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 9b1b5e7ed5..754395ed0c 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -341,7 +341,7 @@ impl RemoteTimelineClient { // remote_timeline_client.rs tests rely on current-thread runtime tokio::runtime::Handle::current() } else { - BACKGROUND_RUNTIME.handle().clone() + BACKGROUND_RUNTIME.clone() }, tenant_shard_id, timeline_id, diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 19f36c722e..f74ed8dbe5 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -317,7 +317,7 @@ pub fn spawn_tasks( tokio::sync::mpsc::channel::>(16); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::SecondaryDownloads, None, None, @@ -338,7 +338,7 @@ pub fn spawn_tasks( ); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::SecondaryUploads, None, None, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index e4f5f75132..eeb170b260 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -85,7 +85,7 @@ pub fn start_background_loops( ) { let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::Compaction, Some(tenant_shard_id), None, @@ -109,7 +109,7 @@ pub fn start_background_loops( }, ); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::GarbageCollector, Some(tenant_shard_id), None, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d3c8c5f66c..9c2cf666ac 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1962,7 +1962,7 @@ impl Timeline { initdb_optimization_count: 0, }; 
task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::LayerFlushTask, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -2324,7 +2324,7 @@ impl Timeline { DownloadBehavior::Download, ); task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::InitialLogicalSizeCalculation, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -2502,7 +2502,7 @@ impl Timeline { DownloadBehavior::Download, ); task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::OndemandLogicalSizeCalculation, Some(self.tenant_shard_id), Some(self.timeline_id), @@ -4484,7 +4484,7 @@ impl Timeline { let self_clone = Arc::clone(&self); let task_id = task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, task_mgr::TaskKind::DownloadAllRemoteLayers, Some(self.tenant_shard_id), Some(self.timeline_id), diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index af10c1c84b..f4fcbbdeda 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -383,7 +383,7 @@ impl DeleteTimelineFlow { let timeline_id = timeline.timeline_id; task_mgr::spawn( - task_mgr::BACKGROUND_RUNTIME.handle(), + *task_mgr::BACKGROUND_RUNTIME, TaskKind::TimelineDeletionWorker, Some(tenant_shard_id), Some(timeline_id), diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 522c5b57de..78b9dfff47 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -57,7 +57,7 @@ impl Timeline { let self_clone = Arc::clone(self); let background_tasks_can_start = background_tasks_can_start.cloned(); task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), + *BACKGROUND_RUNTIME, TaskKind::Eviction, Some(self.tenant_shard_id), Some(self.timeline_id),