diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index ca2325d3b3..b73eca49b5 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -23,18 +23,18 @@ use tracing::*; use utils::pid_file; use metrics::set_build_info_metric; +use safekeeper::broker; +use safekeeper::control_file; use safekeeper::defaults::{ DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PG_LISTEN_ADDR, }; +use safekeeper::http; +use safekeeper::remove_wal; +use safekeeper::wal_backup; +use safekeeper::wal_service; use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; -use safekeeper::{broker, WAL_SERVICE_RUNTIME}; -use safekeeper::{control_file, BROKER_RUNTIME}; -use safekeeper::{http, WAL_REMOVER_RUNTIME}; -use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME}; -use safekeeper::{wal_backup, HTTP_RUNTIME}; -use safekeeper::{wal_service, METRICS_SHIFTER_RUNTIME}; use storage_broker::DEFAULT_ENDPOINT; use utils::auth::JwtAuth; use utils::{ @@ -241,41 +241,36 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { FuturesUnordered::new(); let conf_ = conf.clone(); - let wal_service_handle = WAL_SERVICE_RUNTIME - .spawn(wal_service::task_main(conf_, pg_listener)) + let wal_service_handle = tokio::spawn(wal_service::task_main(conf_, pg_listener)) // wrap with task name for error reporting .map(|res| ("WAL service main".to_owned(), res)); tasks_handles.push(Box::pin(wal_service_handle)); let conf_ = conf.clone(); - let http_handle = HTTP_RUNTIME - .spawn(http::task_main(conf_, http_listener)) + let http_handle = tokio::spawn(http::task_main(conf_, http_listener)) .map(|res| ("HTTP service main".to_owned(), res)); tasks_handles.push(Box::pin(http_handle)); let conf_ = conf.clone(); - let broker_task_handle = BROKER_RUNTIME - .spawn(broker::task_main(conf_).instrument(info_span!("broker"))) - .map(|res| ("broker main".to_owned(), res)); + let broker_task_handle = + tokio::spawn(broker::task_main(conf_).instrument(info_span!("broker"))) + .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); let conf_ = conf.clone(); - let wal_remover_handle = WAL_REMOVER_RUNTIME - .spawn(remove_wal::task_main(conf_)) - .map(|res| ("WAL remover".to_owned(), res)); + let wal_remover_handle = + tokio::spawn(remove_wal::task_main(conf_)).map(|res| ("WAL remover".to_owned(), res)); tasks_handles.push(Box::pin(wal_remover_handle)); let conf_ = conf.clone(); - let wal_backup_handle = WAL_BACKUP_RUNTIME - .spawn(wal_backup::wal_backup_launcher_task_main( - conf_, - wal_backup_launcher_rx, - )) - .map(|res| ("WAL backup launcher".to_owned(), res)); + let wal_backup_handle = tokio::spawn(wal_backup::wal_backup_launcher_task_main( + conf_, + wal_backup_launcher_rx, + )) + .map(|res| ("WAL backup launcher".to_owned(), res)); tasks_handles.push(Box::pin(wal_backup_handle)); - let metrics_shifter_handle = METRICS_SHIFTER_RUNTIME - .spawn(safekeeper::metrics::metrics_shifter()) + let metrics_shifter_handle = tokio::spawn(safekeeper::metrics::metrics_shifter()) .map(|res| ("metrics shifter".to_owned(), res)); tasks_handles.push(Box::pin(metrics_shifter_handle)); diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 1c24f470bf..955fd55700 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,6 +1,4 @@ -use once_cell::sync::Lazy; use remote_storage::RemoteStorageConfig; -use tokio::runtime::Runtime; use std::path::PathBuf; use std::time::Duration; @@ -98,55 +96,3 @@ impl SafeKeeperConf { } } } - -// Tokio runtimes. -pub static WAL_SERVICE_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("WAL service worker") - .enable_all() - .build() - .expect("Failed to create WAL service runtime") -}); - -pub static HTTP_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("HTTP worker") - .enable_all() - .build() - .expect("Failed to create WAL service runtime") -}); - -pub static BROKER_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("broker worker") - .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense - .enable_all() - .build() - .expect("Failed to create broker runtime") -}); - -pub static WAL_REMOVER_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("WAL remover") - .worker_threads(1) - .enable_all() - .build() - .expect("Failed to create broker runtime") -}); - -pub static WAL_BACKUP_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("WAL backup worker") - .enable_all() - .build() - .expect("Failed to create WAL backup runtime") -}); - -pub static METRICS_SHIFTER_RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .thread_name("metric shifter") - .worker_threads(1) - .enable_all() - .build() - .expect("Failed to create broker runtime") -});