From 70d113a355439e2eaa54eac98c2d1ba5cba3b66f Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 17 Jun 2024 23:18:37 -0700 Subject: [PATCH] feat: update default size of bgworkers, add hbworkers (#4129) * feat: update default size of bgworkers, add hbworkers * feat: update frontend heartbeat as well * chore: update sample config files and default settings * chore: update config docs * Revert "chore: update config docs" This reverts commit 8107f4c1208869b863d7bf8ff8e1c453759ef761. * Revert "chore: update sample config files and default settings" This reverts commit f5ae701c8d319e18978cfc0510bab3932b6ec08b. * feat: use default heartbeat runtime size * chore: update config docs --- config/config.md | 8 ++++---- config/datanode.example.toml | 2 +- config/frontend.example.toml | 2 +- config/metasrv.example.toml | 2 +- config/standalone.example.toml | 2 +- src/cmd/tests/load_config_test.rs | 8 ++++---- src/common/runtime/src/global.rs | 26 ++++++++++++++++++++++---- src/common/runtime/src/lib.rs | 4 ++-- src/datanode/src/heartbeat.rs | 4 ++-- src/frontend/src/heartbeat.rs | 4 ++-- 10 files changed, 40 insertions(+), 22 deletions(-) diff --git a/config/config.md b/config/config.md index 97efadc317..a6f3dafc2d 100644 --- a/config/config.md +++ b/config/config.md @@ -16,7 +16,7 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | +| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. 
| | `http.timeout` | String | `30s` | HTTP request timeout. | @@ -161,7 +161,7 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | +| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | @@ -251,7 +251,7 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. | -| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | +| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | | `procedure` | -- | -- | Procedure storage options. | | `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. | | `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially | @@ -316,7 +316,7 @@ | `runtime` | -- | -- | The runtime options. | | `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. 
| -| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. | +| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. | | `heartbeat` | -- | -- | The heartbeat options. | | `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. | | `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 16ffbbb0a4..2d4306bfc5 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -42,7 +42,7 @@ read_rt_size = 8 ## The number of threads to execute the runtime for global write operations. write_rt_size = 8 ## The number of threads to execute the runtime for global background operations. -bg_rt_size = 8 +bg_rt_size = 4 ## The heartbeat options. [heartbeat] diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 4f4bd5bf3d..7ac52a672f 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -12,7 +12,7 @@ read_rt_size = 8 ## The number of threads to execute the runtime for global write operations. write_rt_size = 8 ## The number of threads to execute the runtime for global background operations. -bg_rt_size = 8 +bg_rt_size = 4 ## The heartbeat options. [heartbeat] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 239533bd58..71dc48077c 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -32,7 +32,7 @@ read_rt_size = 8 ## The number of threads to execute the runtime for global write operations. write_rt_size = 8 ## The number of threads to execute the runtime for global background operations. -bg_rt_size = 8 +bg_rt_size = 4 ## Procedure storage options. 
[procedure] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index d6fcc3e894..55050ba27c 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -15,7 +15,7 @@ read_rt_size = 8 ## The number of threads to execute the runtime for global write operations. write_rt_size = 8 ## The number of threads to execute the runtime for global background operations. -bg_rt_size = 8 +bg_rt_size = 4 ## The HTTP server options. [http] diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index c609e4c4a9..7f85d2795f 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -42,7 +42,7 @@ fn test_load_datanode_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, }, component: DatanodeOptions { node_id: Some(42), @@ -107,7 +107,7 @@ fn test_load_frontend_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, }, component: FrontendOptions { default_timezone: Some("UTC".to_string()), @@ -155,7 +155,7 @@ fn test_load_metasrv_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, }, component: MetasrvOptions { selector: SelectorType::LeaseBased, @@ -188,7 +188,7 @@ fn test_load_standalone_example_config() { runtime: RuntimeOptions { read_rt_size: 8, write_rt_size: 8, - bg_rt_size: 8, + bg_rt_size: 4, }, component: StandaloneOptions { default_timezone: Some("UTC".to_string()), diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index 6b21851e16..c0bf1eabb0 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -25,7 +25,8 @@ use crate::{Builder, JoinHandle, Runtime}; const READ_WORKERS: usize = 8; const WRITE_WORKERS: usize = 8; -const BG_WORKERS: usize = 8; +const BG_WORKERS: usize = 4; +const HB_WORKERS: usize = 2; /// The options for 
the global runtimes. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -44,7 +45,7 @@ impl Default for RuntimeOptions { Self { read_rt_size: cpus, write_rt_size: cpus, - bg_rt_size: cpus, + bg_rt_size: usize::max(cpus / 2, 1), } } } @@ -63,6 +64,7 @@ struct GlobalRuntimes { read_runtime: Runtime, write_runtime: Runtime, bg_runtime: Runtime, + hb_runtime: Runtime, } macro_rules! define_spawn { @@ -96,8 +98,14 @@ impl GlobalRuntimes { define_spawn!(read); define_spawn!(write); define_spawn!(bg); + define_spawn!(hb); - fn new(read: Option<Runtime>, write: Option<Runtime>, background: Option<Runtime>) -> Self { + fn new( + read: Option<Runtime>, + write: Option<Runtime>, + background: Option<Runtime>, + heartbeat: Option<Runtime>, + ) -> Self { Self { read_runtime: read .unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)), @@ -105,6 +113,8 @@ impl GlobalRuntimes { .unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)), bg_runtime: background .unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)), + hb_runtime: heartbeat + .unwrap_or_else(|| create_runtime("global-hb", "hb-worker", HB_WORKERS)), } } } @@ -114,6 +124,7 @@ struct ConfigRuntimes { read_runtime: Option<Runtime>, write_runtime: Option<Runtime>, bg_runtime: Option<Runtime>, + hb_runtime: Option<Runtime>, already_init: bool, } @@ -122,9 +133,10 @@ static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| { let read = c.read_runtime.take(); let write = c.write_runtime.take(); let background = c.bg_runtime.take(); + let heartbeat = c.hb_runtime.take(); c.already_init = true; - GlobalRuntimes::new(read, write, background) + GlobalRuntimes::new(read, write, background, heartbeat) }); static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> = @@ -155,6 +167,7 @@ pub fn init_global_runtimes(options: &RuntimeOptions) { "global-bg-worker", options.bg_rt_size, )); + c.hb_runtime = Some(create_runtime("global-hb", "global-hb-worker", HB_WORKERS)); }); } @@ -195,6 +208,7 @@ macro_rules!
define_global_runtime_spawn { define_global_runtime_spawn!(read); define_global_runtime_spawn!(write); define_global_runtime_spawn!(bg); +define_global_runtime_spawn!(hb); #[cfg(test)] mod tests { @@ -212,6 +226,9 @@ mod tests { let handle = spawn_bg(async { 3 + 3 }); assert_eq!(6, block_on_bg(handle).unwrap()); + + let handle = spawn_hb(async { 4 + 4 }); + assert_eq!(8, block_on_hb(handle).unwrap()); } macro_rules! define_spawn_blocking_test { @@ -239,4 +256,5 @@ mod tests { define_spawn_blocking_test!(read); define_spawn_blocking_test!(write); define_spawn_blocking_test!(bg); + define_spawn_blocking_test!(hb); } diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index ba6f74c96c..868c2e70fc 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -20,8 +20,8 @@ pub mod runtime; pub use global::{ bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes, - read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_read, spawn_blocking_write, - spawn_read, spawn_write, write_runtime, + read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_hb, spawn_blocking_read, + spawn_blocking_write, spawn_hb, spawn_read, spawn_write, write_runtime, }; pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction}; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index b8474866bf..9b684b5368 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -108,7 +108,7 @@ impl HeartbeatTask { let mut last_received_lease = Instant::now(); - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_hb(async move { while let Some(res) = rx.message().await.unwrap_or_else(|e| { error!(e; "Error while reading heartbeat response"); None @@ -215,7 +215,7 @@ impl HeartbeatTask { self.region_alive_keeper.start(Some(event_receiver)).await?; let mut last_sent = Instant::now(); - common_runtime::spawn_bg(async move {
+ common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); loop { diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 3e744fb7bc..0ccbed35a6 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -85,7 +85,7 @@ impl HeartbeatTask { let capture_self = self.clone(); let retry_interval = self.retry_interval; - let _handle = common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_hb(async move { loop { match resp_stream.message().await { Ok(Some(resp)) => { @@ -132,7 +132,7 @@ impl HeartbeatTask { addr: self.server_addr.clone(), }); - common_runtime::spawn_bg(async move { + common_runtime::spawn_hb(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep);