diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 80075b846e..a25f1883c2 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -41,7 +41,8 @@ fn test_load_datanode_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, + hb_rt_size: 1, }, component: DatanodeOptions { node_id: Some(42), @@ -100,7 +101,8 @@ fn test_load_frontend_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, + hb_rt_size: 1, }, component: FrontendOptions { default_timezone: Some("UTC".to_string()), @@ -148,7 +150,8 @@ fn test_load_metasrv_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, + hb_rt_size: 1, }, component: MetasrvOptions { selector: SelectorType::LeaseBased, @@ -181,7 +184,8 @@ fn test_load_standalone_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, + hb_rt_size: 1, }, component: StandaloneOptions { default_timezone: Some("UTC".to_string()), diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 6b21851e16..092ffc190b 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -25,7 +25,8 @@ use crate::{Builder, JoinHandle, Runtime}; const READ_WORKERS: usize = 8; const WRITE_WORKERS: usize = 8; -const BG_WORKERS: usize = 8; +const BG_WORKERS: usize = 4; +const HB_WORKERS: usize = 1; /// The options for the global runtimes. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -36,6 +37,9 @@ pub struct RuntimeOptions { pub write_rt_size: usize, /// The number of threads to execute the runtime for global background operations. pub bg_rt_size: usize, + /// The number of threads to execute the runtime for heartbeat between meta + /// and this node + pub hb_rt_size: usize, } impl Default for RuntimeOptions { @@ -44,7 +48,8 @@ impl Default for RuntimeOptions { Self { read_rt_size: cpus, write_rt_size: cpus, - bg_rt_size: cpus, + bg_rt_size: usize::max(cpus / 2, 1), + hb_rt_size: 1, } } } @@ -63,6 +68,7 @@ struct GlobalRuntimes { read_runtime: Runtime, write_runtime: Runtime, bg_runtime: Runtime, + hb_runtime: Runtime, } macro_rules! define_spawn { @@ -96,8 +102,14 @@ impl GlobalRuntimes { define_spawn!(read); define_spawn!(write); define_spawn!(bg); + define_spawn!(hb); - fn new(read: Option, write: Option, background: Option) -> Self { + fn new( + read: Option, + write: Option, + background: Option, + heartbeat: Option, + ) -> Self { Self { read_runtime: read .unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)), @@ -105,6 +117,8 @@ impl GlobalRuntimes { .unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)), bg_runtime: background .unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)), + hb_runtime: heartbeat + .unwrap_or_else(|| create_runtime("global-hb", "hb-worker", HB_WORKERS)), } } } @@ -114,6 +128,7 @@ struct ConfigRuntimes { read_runtime: Option, write_runtime: Option, bg_runtime: Option, + hb_runtime: Option, already_init: bool, } @@ -122,9 +137,10 @@ static GLOBAL_RUNTIMES: Lazy = Lazy::new(|| { let read = c.read_runtime.take(); let write = c.write_runtime.take(); let background = c.bg_runtime.take(); + let heartbeat = c.hb_runtime.take(); c.already_init = true; - GlobalRuntimes::new(read, write, background) + GlobalRuntimes::new(read, write, background, heartbeat) }); static CONFIG_RUNTIMES: Lazy> = @@ -155,6 +171,11 @@ pub fn init_global_runtimes(options: &RuntimeOptions) { "global-bg-worker", options.bg_rt_size, )); + c.hb_runtime = Some(create_runtime( + "global-hb", + "global-hb-worker", + options.hb_rt_size, + )); }); } @@ -195,6 +216,7 @@ macro_rules! define_global_runtime_spawn { define_global_runtime_spawn!(read); define_global_runtime_spawn!(write); define_global_runtime_spawn!(bg); +define_global_runtime_spawn!(hb); #[cfg(test)] mod tests { @@ -212,6 +234,9 @@ mod tests { let handle = spawn_bg(async { 3 + 3 }); assert_eq!(6, block_on_bg(handle).unwrap()); + + let handle = spawn_bg(async { 4 + 4 }); + assert_eq!(8, block_on_hb(handle).unwrap()); } macro_rules! define_spawn_blocking_test { @@ -239,4 +264,5 @@ mod tests { define_spawn_blocking_test!(read); define_spawn_blocking_test!(write); define_spawn_blocking_test!(bg); + define_spawn_blocking_test!(hb); } diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index ba6f74c96c..868c2e70fc 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -20,8 +20,8 @@ pub mod runtime; pub use global::{ bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes, - read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_read, spawn_blocking_write, - spawn_read, spawn_write, write_runtime, + read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_hb, spawn_blocking_read, + spawn_blocking_write, spawn_hb, spawn_read, spawn_write, write_runtime, }; pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction}; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ed518fcebe..dded1cf43a 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -108,7 +108,7 @@ impl HeartbeatTask { let mut last_received_lease = Instant::now(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_hb(async move { while let Some(res) = rx.message().await.unwrap_or_else(|e| { error!(e; "Error while reading heartbeat response"); None @@ -215,7 +215,7 @@ impl HeartbeatTask { self.region_alive_keeper.start(Some(event_receiver)).await?; let mut last_sent = Instant::now(); - common_runtime::spawn_bg(async move { + common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); loop {