feat: update default size of bgworkers, add hbworkers

This commit is contained in:
Ning Sun
2024-06-10 13:32:35 -07:00
parent d8b51cfaba
commit cb2047dacc
4 changed files with 42 additions and 12 deletions

View File

@@ -41,7 +41,8 @@ fn test_load_datanode_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
hb_rt_size: 1,
},
component: DatanodeOptions {
node_id: Some(42),
@@ -100,7 +101,8 @@ fn test_load_frontend_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
hb_rt_size: 1,
},
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
@@ -148,7 +150,8 @@ fn test_load_metasrv_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
hb_rt_size: 1,
},
component: MetasrvOptions {
selector: SelectorType::LeaseBased,
@@ -181,7 +184,8 @@ fn test_load_standalone_example_config() {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
bg_rt_size: 4,
hb_rt_size: 1,
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),

View File

@@ -25,7 +25,8 @@ use crate::{Builder, JoinHandle, Runtime};
const READ_WORKERS: usize = 8;
const WRITE_WORKERS: usize = 8;
const BG_WORKERS: usize = 8;
const BG_WORKERS: usize = 4;
const HB_WORKERS: usize = 1;
/// The options for the global runtimes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
@@ -36,6 +37,9 @@ pub struct RuntimeOptions {
pub write_rt_size: usize,
/// The number of threads to execute the runtime for global background operations.
pub bg_rt_size: usize,
/// The number of threads to execute the runtime for heartbeat between meta
/// and this node
pub hb_rt_size: usize,
}
impl Default for RuntimeOptions {
@@ -44,7 +48,8 @@ impl Default for RuntimeOptions {
Self {
read_rt_size: cpus,
write_rt_size: cpus,
bg_rt_size: cpus,
bg_rt_size: usize::max(cpus / 2, 1),
hb_rt_size: 1,
}
}
}
@@ -63,6 +68,7 @@ struct GlobalRuntimes {
read_runtime: Runtime,
write_runtime: Runtime,
bg_runtime: Runtime,
hb_runtime: Runtime,
}
macro_rules! define_spawn {
@@ -96,8 +102,14 @@ impl GlobalRuntimes {
define_spawn!(read);
define_spawn!(write);
define_spawn!(bg);
define_spawn!(hb);
fn new(read: Option<Runtime>, write: Option<Runtime>, background: Option<Runtime>) -> Self {
fn new(
read: Option<Runtime>,
write: Option<Runtime>,
background: Option<Runtime>,
heartbeat: Option<Runtime>,
) -> Self {
Self {
read_runtime: read
.unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)),
@@ -105,6 +117,8 @@ impl GlobalRuntimes {
.unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)),
bg_runtime: background
.unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)),
hb_runtime: heartbeat
.unwrap_or_else(|| create_runtime("global-hb", "hb-worker", HB_WORKERS)),
}
}
}
@@ -114,6 +128,7 @@ struct ConfigRuntimes {
read_runtime: Option<Runtime>,
write_runtime: Option<Runtime>,
bg_runtime: Option<Runtime>,
hb_runtime: Option<Runtime>,
already_init: bool,
}
@@ -122,9 +137,10 @@ static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
let read = c.read_runtime.take();
let write = c.write_runtime.take();
let background = c.bg_runtime.take();
let heartbeat = c.hb_runtime.take();
c.already_init = true;
GlobalRuntimes::new(read, write, background)
GlobalRuntimes::new(read, write, background, heartbeat)
});
static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
@@ -155,6 +171,11 @@ pub fn init_global_runtimes(options: &RuntimeOptions) {
"global-bg-worker",
options.bg_rt_size,
));
c.hb_runtime = Some(create_runtime(
"global-hb",
"global-hb-worker",
options.hb_rt_size,
));
});
}
@@ -195,6 +216,7 @@ macro_rules! define_global_runtime_spawn {
define_global_runtime_spawn!(read);
define_global_runtime_spawn!(write);
define_global_runtime_spawn!(bg);
define_global_runtime_spawn!(hb);
#[cfg(test)]
mod tests {
@@ -212,6 +234,9 @@ mod tests {
let handle = spawn_bg(async { 3 + 3 });
assert_eq!(6, block_on_bg(handle).unwrap());
let handle = spawn_bg(async { 4 + 4 });
assert_eq!(8, block_on_hb(handle).unwrap());
}
macro_rules! define_spawn_blocking_test {
@@ -239,4 +264,5 @@ mod tests {
define_spawn_blocking_test!(read);
define_spawn_blocking_test!(write);
define_spawn_blocking_test!(bg);
define_spawn_blocking_test!(hb);
}

View File

@@ -20,8 +20,8 @@ pub mod runtime;
pub use global::{
bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes,
read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_read, spawn_blocking_write,
spawn_read, spawn_write, write_runtime,
read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_hb, spawn_blocking_read,
spawn_blocking_write, spawn_hb, spawn_read, spawn_write, write_runtime,
};
pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction};

View File

@@ -108,7 +108,7 @@ impl HeartbeatTask {
let mut last_received_lease = Instant::now();
let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_hb(async move {
while let Some(res) = rx.message().await.unwrap_or_else(|e| {
error!(e; "Error while reading heartbeat response");
None
@@ -215,7 +215,7 @@ impl HeartbeatTask {
self.region_alive_keeper.start(Some(event_receiver)).await?;
let mut last_sent = Instant::now();
common_runtime::spawn_bg(async move {
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
loop {