diff --git a/.github/actions/setup-greptimedb-cluster/with-disk.yaml b/.github/actions/setup-greptimedb-cluster/with-disk.yaml
index 2b5b855476..1cbd22dbba 100644
--- a/.github/actions/setup-greptimedb-cluster/with-disk.yaml
+++ b/.github/actions/setup-greptimedb-cluster/with-disk.yaml
@@ -1,18 +1,13 @@
meta:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
datanode:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
+ compact_rt_size = 2
frontend:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
\ No newline at end of file
diff --git a/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml b/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml
index acf99adf26..fc89bd5422 100644
--- a/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml
+++ b/.github/actions/setup-greptimedb-cluster/with-minio-and-cache.yaml
@@ -1,19 +1,16 @@
meta:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
-
+ global_rt_size = 4
+
[datanode]
[datanode.client]
timeout = "60s"
datanode:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
+ compact_rt_size = 2
[storage]
cache_path = "/data/greptimedb/s3cache"
@@ -21,9 +18,7 @@ datanode:
frontend:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
[meta_client]
ddl_timeout = "60s"
diff --git a/.github/actions/setup-greptimedb-cluster/with-minio.yaml b/.github/actions/setup-greptimedb-cluster/with-minio.yaml
index d5ddcddba5..b0b1c6b757 100644
--- a/.github/actions/setup-greptimedb-cluster/with-minio.yaml
+++ b/.github/actions/setup-greptimedb-cluster/with-minio.yaml
@@ -1,9 +1,7 @@
meta:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
[datanode]
[datanode.client]
@@ -11,15 +9,12 @@ meta:
datanode:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
+ compact_rt_size = 2
frontend:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
[meta_client]
ddl_timeout = "60s"
diff --git a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml
index bf4d3da65c..e5fc71cfe3 100644
--- a/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml
+++ b/.github/actions/setup-greptimedb-cluster/with-remote-wal.yaml
@@ -1,9 +1,7 @@
meta:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
[wal]
provider = "kafka"
@@ -17,9 +15,8 @@ meta:
datanode:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
+ compact_rt_size = 2
[wal]
provider = "kafka"
@@ -28,9 +25,7 @@ datanode:
frontend:
config: |-
[runtime]
- read_rt_size = 8
- write_rt_size = 8
- bg_rt_size = 8
+ global_rt_size = 4
[meta_client]
ddl_timeout = "60s"
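The four cluster-setup overlays above all collapse the per-purpose `read_rt_size`/`write_rt_size`/`bg_rt_size` knobs into `global_rt_size` (plus `compact_rt_size` on datanodes). Below is a minimal sketch of how such a `[runtime]` section deserializes, assuming the `serde` and `toml` crates; the structs are illustrative stand-ins, not GreptimeDB's own types.

```rust
// Illustrative only: stand-in types for the renamed [runtime] section.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct RuntimeSection {
    global_rt_size: usize,
    // Only the datanode overlays set this; it is absent elsewhere.
    #[serde(default)]
    compact_rt_size: Option<usize>,
}

#[derive(Debug, Deserialize)]
struct NodeConfig {
    runtime: RuntimeSection,
}

fn main() {
    let datanode = r#"
        [runtime]
        global_rt_size = 4
        compact_rt_size = 2
    "#;
    let cfg: NodeConfig = toml::from_str(datanode).unwrap();
    assert_eq!(cfg.runtime.global_rt_size, 4);
    assert_eq!(cfg.runtime.compact_rt_size, Some(2));
}
```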
diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index b92244c317..61e27352e9 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -141,6 +141,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 60
strategy:
+ fail-fast: false
matrix:
target: [ "fuzz_create_table", "fuzz_alter_table", "fuzz_create_database", "fuzz_create_logical_table", "fuzz_alter_logical_table", "fuzz_insert", "fuzz_insert_logical_table" ]
steps:
diff --git a/config/config.md b/config/config.md
index d865a83ad7..b2a96860ec 100644
--- a/config/config.md
+++ b/config/config.md
@@ -16,9 +16,8 @@
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `runtime` | -- | -- | The runtime options. |
-| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
-| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
-| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
+| `runtime.global_rt_size` | Integer | `8` | The number of threads for the global default runtime. |
+| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for compact operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -170,9 +169,8 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `runtime` | -- | -- | The runtime options. |
-| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
-| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
-| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
+| `runtime.global_rt_size` | Integer | `8` | The number of threads for the global default runtime. |
+| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for compact operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
@@ -262,9 +260,8 @@
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.
This feature is only available on GreptimeDB running on cluster mode and
- Using Remote WAL
- Using shared storage (e.g., s3). |
| `runtime` | -- | -- | The runtime options. |
-| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
-| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
-| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
+| `runtime.global_rt_size` | Integer | `8` | The number of threads for the global default runtime. |
+| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for compact operations. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
@@ -338,9 +335,8 @@
| `grpc.tls.key_path` | String | `None` | Private key file path. |
| `grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.
For now, gRPC tls config does not support auto reload. |
| `runtime` | -- | -- | The runtime options. |
-| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
-| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
-| `runtime.bg_rt_size` | Integer | `4` | The number of threads to execute the runtime for global background operations. |
+| `runtime.global_rt_size` | Integer | `8` | The number of threads for the global default runtime. |
+| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for compact operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index ea8c95cf70..97e4fae1d5 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -73,11 +73,9 @@ watch = false
## The runtime options.
[runtime]
-## The number of threads to execute the runtime for global read operations.
-read_rt_size = 8
-## The number of threads to execute the runtime for global write operations.
-write_rt_size = 8
-## The number of threads to execute the runtime for global background operations.
-bg_rt_size = 4
+## The number of threads for the global default runtime.
+global_rt_size = 8
+## The number of threads to execute the runtime for compact operations.
+compact_rt_size = 4
## The heartbeat options.
[heartbeat]
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 3def69cfac..8f6a1c859e 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -5,11 +5,9 @@ default_timezone = "UTC"
## The runtime options.
[runtime]
-## The number of threads to execute the runtime for global read operations.
-read_rt_size = 8
-## The number of threads to execute the runtime for global write operations.
-write_rt_size = 8
-## The number of threads to execute the runtime for global background operations.
-bg_rt_size = 4
+## The number of threads for the global default runtime.
+global_rt_size = 8
+## The number of threads to execute the runtime for compact operations.
+compact_rt_size = 4
## The heartbeat options.
[heartbeat]
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index e1169ba174..494e89a1c2 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -34,11 +34,9 @@ enable_region_failover = false
## The runtime options.
[runtime]
-## The number of threads to execute the runtime for global read operations.
-read_rt_size = 8
-## The number of threads to execute the runtime for global write operations.
-write_rt_size = 8
-## The number of threads to execute the runtime for global background operations.
-bg_rt_size = 4
+## The number of threads for the global default runtime.
+global_rt_size = 8
+## The number of threads to execute the runtime for compact operations.
+compact_rt_size = 4
## Procedure storage options.
[procedure]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 7d40927703..36a46e9ed9 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -11,11 +11,9 @@ default_timezone = "UTC"
## The runtime options.
[runtime]
-## The number of threads to execute the runtime for global read operations.
-read_rt_size = 8
-## The number of threads to execute the runtime for global write operations.
-write_rt_size = 8
-## The number of threads to execute the runtime for global background operations.
-bg_rt_size = 4
+## The number of threads for the global default runtime.
+global_rt_size = 8
+## The number of threads to execute the runtime for compact operations.
+compact_rt_size = 4
## The HTTP server options.
[http]
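The example configs above now expose only the two consolidated options. Below is a hedged sketch of how a binary could feed them into the one-time runtime initialization added later in `src/common/runtime/src/global.rs`; the import path of `RuntimeOptions` is an assumption, and error handling is elided.

```rust
// Sketch only: the field names and `init_global_runtimes` come from this diff,
// but the exact import path of `RuntimeOptions` is an assumption.
use common_runtime::global::RuntimeOptions;

fn main() {
    let opts = RuntimeOptions {
        global_rt_size: 8,
        compact_rt_size: 4,
    };
    // Must run once, early in startup, before anything touches the global runtimes;
    // later calls are no-ops thanks to the `Once` guard in `init_global_runtimes`.
    common_runtime::init_global_runtimes(&opts);
}
```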
diff --git a/docs/how-to/how-to-write-fuzz-tests.md b/docs/how-to/how-to-write-fuzz-tests.md
index 88d6c0914a..113b71027a 100644
--- a/docs/how-to/how-to-write-fuzz-tests.md
+++ b/docs/how-to/how-to-write-fuzz-tests.md
@@ -105,7 +105,7 @@ use tests_fuzz::utils::{init_greptime_connections, Connections};
fuzz_target!(|input: FuzzInput| {
common_telemetry::init_default_ut_logging();
- common_runtime::block_on_write(async {
+ common_runtime::block_on_global(async {
let Connections { mysql } = init_greptime_connections().await;
let mut rng = ChaChaRng::seed_from_u64(input.seed);
let columns = rng.gen_range(2..30);
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index fff3f3e8c9..a6a6328059 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -46,9 +46,8 @@ fn test_load_datanode_example_config() {
let expected = GreptimeOptions::<DatanodeOptions> {
runtime: RuntimeOptions {
- read_rt_size: 8,
- write_rt_size: 8,
- bg_rt_size: 4,
+ global_rt_size: 8,
+ compact_rt_size: 4,
},
component: DatanodeOptions {
node_id: Some(42),
@@ -119,9 +118,8 @@ fn test_load_frontend_example_config() {
.unwrap();
let expected = GreptimeOptions::<FrontendOptions> {
runtime: RuntimeOptions {
- read_rt_size: 8,
- write_rt_size: 8,
- bg_rt_size: 4,
+ global_rt_size: 8,
+ compact_rt_size: 4,
},
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
@@ -167,9 +165,8 @@ fn test_load_metasrv_example_config() {
.unwrap();
let expected = GreptimeOptions::<MetasrvOptions> {
runtime: RuntimeOptions {
- read_rt_size: 8,
- write_rt_size: 8,
- bg_rt_size: 4,
+ global_rt_size: 8,
+ compact_rt_size: 4,
},
component: MetasrvOptions {
selector: SelectorType::LeaseBased,
@@ -200,9 +197,8 @@ fn test_load_standalone_example_config() {
.unwrap();
let expected = GreptimeOptions::<StandaloneOptions> {
runtime: RuntimeOptions {
- read_rt_size: 8,
- write_rt_size: 8,
- bg_rt_size: 4,
+ global_rt_size: 8,
+ compact_rt_size: 4,
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs
index 1172004a9e..66a2fc3c3f 100644
--- a/src/common/datasource/src/file_format/csv.rs
+++ b/src/common/datasource/src/file_format/csv.rs
@@ -185,7 +185,7 @@ impl FileFormat for CsvFormat {
let schema_infer_max_record = self.schema_infer_max_record;
let has_header = self.has_header;
- common_runtime::spawn_blocking_read(move || {
+ common_runtime::spawn_blocking_global(move || {
let reader = SyncIoBridge::new(decoded);
let (schema, _records_read) =
diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs
index 3599fcd4ec..c70a9beebb 100644
--- a/src/common/datasource/src/file_format/json.rs
+++ b/src/common/datasource/src/file_format/json.rs
@@ -101,7 +101,7 @@ impl FileFormat for JsonFormat {
let schema_infer_max_record = self.schema_infer_max_record;
- common_runtime::spawn_blocking_read(move || {
+ common_runtime::spawn_blocking_global(move || {
let mut reader = BufReader::new(SyncIoBridge::new(decoded));
let iter = ValueIter::new(&mut reader, schema_infer_max_record);
diff --git a/src/common/greptimedb-telemetry/src/lib.rs b/src/common/greptimedb-telemetry/src/lib.rs
index 1f02c524e7..d681b20092 100644
--- a/src/common/greptimedb-telemetry/src/lib.rs
+++ b/src/common/greptimedb-telemetry/src/lib.rs
@@ -72,7 +72,7 @@ impl GreptimeDBTelemetryTask {
match self {
GreptimeDBTelemetryTask::Enable((task, _)) => {
print_anonymous_usage_data_disclaimer();
- task.start(common_runtime::bg_runtime())
+ task.start(common_runtime::global_runtime())
}
GreptimeDBTelemetryTask::Disable => Ok(()),
}
diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs
index 0b77fa326e..eee173c3aa 100644
--- a/src/common/grpc/src/channel_manager.rs
+++ b/src/common/grpc/src/channel_manager.rs
@@ -225,7 +225,7 @@ impl ChannelManager {
}
let pool = self.pool.clone();
- let _handle = common_runtime::spawn_bg(async {
+ let _handle = common_runtime::spawn_global(async {
recycle_channel_in_loop(pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
});
info!(
diff --git a/src/common/macro/src/admin_fn.rs b/src/common/macro/src/admin_fn.rs
index 042343aec0..843b2c4066 100644
--- a/src/common/macro/src/admin_fn.rs
+++ b/src/common/macro/src/admin_fn.rs
@@ -228,7 +228,7 @@ fn build_struct(
.create_mutable_vector(rows_num);
if columns_num == 0 {
- let result = common_runtime::block_on_read(async move {
+ let result = common_runtime::block_on_global(async move {
#fn_name(handler, query_ctx, &[]).await
})?;
@@ -239,7 +239,7 @@ fn build_struct(
.map(|vector| vector.get_ref(i))
.collect();
- let result = common_runtime::block_on_read(async move {
+ let result = common_runtime::block_on_global(async move {
#fn_name(handler, query_ctx, &args).await
})?;
diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs
index 5fdd81a956..54ae88a5cf 100644
--- a/src/common/procedure/src/local.rs
+++ b/src/common/procedure/src/local.rs
@@ -461,7 +461,7 @@ impl LocalManager {
let tracing_context = TracingContext::from_current_span();
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
// Run the root procedure.
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
@@ -593,7 +593,7 @@ impl ProcedureManager for LocalManager {
let task_inner = self.build_remove_outdated_meta_task();
task_inner
- .start(common_runtime::bg_runtime())
+ .start(common_runtime::global_runtime())
.context(StartRemoveOutdatedMetaTaskSnafu)?;
*task = Some(task_inner);
diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs
index 1d1439e24c..d4ed983421 100644
--- a/src/common/procedure/src/local/runner.rs
+++ b/src/common/procedure/src/local/runner.rs
@@ -393,7 +393,7 @@ impl Runner {
// Add the id of the subprocedure to the metadata.
self.meta.push_child(procedure_id);
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
// Run the root procedure.
runner.run().await
});
diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs
index c0bf1eabb0..b7d78badeb 100644
--- a/src/common/runtime/src/global.rs
+++ b/src/common/runtime/src/global.rs
@@ -23,29 +23,25 @@ use serde::{Deserialize, Serialize};
use crate::{Builder, JoinHandle, Runtime};
-const READ_WORKERS: usize = 8;
-const WRITE_WORKERS: usize = 8;
-const BG_WORKERS: usize = 4;
+const GLOBAL_WORKERS: usize = 8;
+const COMPACT_WORKERS: usize = 4;
const HB_WORKERS: usize = 2;
/// The options for the global runtimes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RuntimeOptions {
- /// The number of threads to execute the runtime for global read operations.
- pub read_rt_size: usize,
- /// The number of threads to execute the runtime for global write operations.
- pub write_rt_size: usize,
- /// The number of threads to execute the runtime for global background operations.
- pub bg_rt_size: usize,
+ /// The number of threads for the global default runtime.
+ pub global_rt_size: usize,
+ /// The number of threads to execute the runtime for compact operations.
+ pub compact_rt_size: usize,
}
impl Default for RuntimeOptions {
fn default() -> Self {
let cpus = num_cpus::get();
Self {
- read_rt_size: cpus,
- write_rt_size: cpus,
- bg_rt_size: usize::max(cpus / 2, 1),
+ global_rt_size: cpus,
+ compact_rt_size: usize::max(cpus / 2, 1),
}
}
}
@@ -61,9 +57,8 @@ pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usi
}
struct GlobalRuntimes {
- read_runtime: Runtime,
- write_runtime: Runtime,
- bg_runtime: Runtime,
+ global_runtime: Runtime,
+ compact_runtime: Runtime,
hb_runtime: Runtime,
}
@@ -95,48 +90,38 @@ macro_rules! define_spawn {
}
impl GlobalRuntimes {
- define_spawn!(read);
- define_spawn!(write);
- define_spawn!(bg);
+ define_spawn!(global);
+ define_spawn!(compact);
define_spawn!(hb);
- fn new(
- read: Option<Runtime>,
- write: Option<Runtime>,
- background: Option<Runtime>,
- heartbeat: Option<Runtime>,
- ) -> Self {
+ fn new(global: Option<Runtime>, compact: Option<Runtime>, heartbeat: Option<Runtime>) -> Self {
Self {
- read_runtime: read
- .unwrap_or_else(|| create_runtime("global-read", "read-worker", READ_WORKERS)),
- write_runtime: write
- .unwrap_or_else(|| create_runtime("global-write", "write-worker", WRITE_WORKERS)),
- bg_runtime: background
- .unwrap_or_else(|| create_runtime("global-bg", "bg-worker", BG_WORKERS)),
+ global_runtime: global
+ .unwrap_or_else(|| create_runtime("global", "global-worker", GLOBAL_WORKERS)),
+ compact_runtime: compact
+ .unwrap_or_else(|| create_runtime("compact", "compact-worker", COMPACT_WORKERS)),
hb_runtime: heartbeat
- .unwrap_or_else(|| create_runtime("global-hb", "hb-worker", HB_WORKERS)),
+ .unwrap_or_else(|| create_runtime("heartbeat", "hb-worker", HB_WORKERS)),
}
}
}
#[derive(Default)]
struct ConfigRuntimes {
- read_runtime: Option<Runtime>,
- write_runtime: Option<Runtime>,
- bg_runtime: Option<Runtime>,
+ global_runtime: Option<Runtime>,
+ compact_runtime: Option<Runtime>,
hb_runtime: Option<Runtime>,
already_init: bool,
}
static GLOBAL_RUNTIMES: Lazy<GlobalRuntimes> = Lazy::new(|| {
let mut c = CONFIG_RUNTIMES.lock().unwrap();
- let read = c.read_runtime.take();
- let write = c.write_runtime.take();
- let background = c.bg_runtime.take();
+ let global = c.global_runtime.take();
+ let compact = c.compact_runtime.take();
let heartbeat = c.hb_runtime.take();
c.already_init = true;
- GlobalRuntimes::new(read, write, background, heartbeat)
+ GlobalRuntimes::new(global, compact, heartbeat)
});
static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
@@ -152,22 +137,17 @@ pub fn init_global_runtimes(options: &RuntimeOptions) {
START.call_once(move || {
let mut c = CONFIG_RUNTIMES.lock().unwrap();
assert!(!c.already_init, "Global runtimes already initialized");
- c.read_runtime = Some(create_runtime(
- "global-read",
- "global-read-worker",
- options.read_rt_size,
+ c.global_runtime = Some(create_runtime(
+ "global",
+ "global-worker",
+ options.global_rt_size,
));
- c.write_runtime = Some(create_runtime(
- "global-write",
- "global-write-worker",
- options.write_rt_size,
+ c.compact_runtime = Some(create_runtime(
+ "compact",
+ "compact-worker",
+ options.compact_rt_size,
));
- c.bg_runtime = Some(create_runtime(
- "global-bg",
- "global-bg-worker",
- options.bg_rt_size,
- ));
- c.hb_runtime = Some(create_runtime("global-hb", "global-hb-worker", HB_WORKERS));
+ c.hb_runtime = Some(create_runtime("heartbeat", "hb-worker", HB_WORKERS));
});
}
@@ -205,9 +185,8 @@ macro_rules! define_global_runtime_spawn {
};
}
-define_global_runtime_spawn!(read);
-define_global_runtime_spawn!(write);
-define_global_runtime_spawn!(bg);
+define_global_runtime_spawn!(global);
+define_global_runtime_spawn!(compact);
define_global_runtime_spawn!(hb);
#[cfg(test)]
@@ -218,16 +197,13 @@ mod tests {
#[test]
fn test_spawn_block_on() {
- let handle = spawn_read(async { 1 + 1 });
- assert_eq!(2, block_on_read(handle).unwrap());
+ let handle = spawn_global(async { 1 + 1 });
+ assert_eq!(2, block_on_global(handle).unwrap());
- let handle = spawn_write(async { 2 + 2 });
- assert_eq!(4, block_on_write(handle).unwrap());
+ let handle = spawn_compact(async { 2 + 2 });
+ assert_eq!(4, block_on_compact(handle).unwrap());
- let handle = spawn_bg(async { 3 + 3 });
- assert_eq!(6, block_on_bg(handle).unwrap());
-
- let handle = spawn_bg(async { 4 + 4 });
+ let handle = spawn_hb(async { 4 + 4 });
assert_eq!(8, block_on_hb(handle).unwrap());
}
@@ -253,8 +229,7 @@ mod tests {
};
}
- define_spawn_blocking_test!(read);
- define_spawn_blocking_test!(write);
- define_spawn_blocking_test!(bg);
+ define_spawn_blocking_test!(global);
+ define_spawn_blocking_test!(compact);
define_spawn_blocking_test!(hb);
}
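Taken together, the renames in this file reduce the public surface to one general-purpose pool, one compaction pool, and the heartbeat pool. Below is a minimal sketch of the migrated call sites, assuming only the functions re-exported in the updated `lib.rs` that follows.

```rust
// Minimal sketch: exercises the renamed entry points the same way the updated
// unit test above does. Former spawn_read/spawn_write/spawn_bg call sites all
// map onto spawn_global; only compaction work moves to spawn_compact.
fn main() {
    let handle = common_runtime::spawn_global(async { 1 + 1 });
    assert_eq!(2, common_runtime::block_on_global(handle).unwrap());

    let handle = common_runtime::spawn_compact(async { 2 + 2 });
    assert_eq!(4, common_runtime::block_on_compact(handle).unwrap());

    // Blocking closures follow the same naming scheme.
    let handle = common_runtime::spawn_blocking_global(|| 3 + 3);
    assert_eq!(6, common_runtime::block_on_global(handle).unwrap());
}
```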
diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs
index 868c2e70fc..4429f6fa71 100644
--- a/src/common/runtime/src/lib.rs
+++ b/src/common/runtime/src/lib.rs
@@ -19,9 +19,9 @@ mod repeated_task;
pub mod runtime;
pub use global::{
- bg_runtime, block_on_bg, block_on_read, block_on_write, create_runtime, init_global_runtimes,
- read_runtime, spawn_bg, spawn_blocking_bg, spawn_blocking_hb, spawn_blocking_read,
- spawn_blocking_write, spawn_hb, spawn_read, spawn_write, write_runtime,
+ block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime,
+ init_global_runtimes, spawn_blocking_compact, spawn_blocking_global, spawn_blocking_hb,
+ spawn_compact, spawn_global, spawn_hb,
};
pub use crate::repeated_task::{BoxedTaskFunction, RepeatedTask, TaskFunction};
diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs
index cf9f02ffdd..2431a2ee17 100644
--- a/src/common/runtime/src/repeated_task.rs
+++ b/src/common/runtime/src/repeated_task.rs
@@ -200,7 +200,7 @@ mod tests {
let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn));
- task.start(crate::bg_runtime()).unwrap();
+ task.start(crate::global_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
task.stop().await.unwrap();
@@ -217,7 +217,7 @@ mod tests {
let task = RepeatedTask::new(Duration::from_millis(100), Box::new(task_fn))
.with_initial_delay(Some(Duration::ZERO));
- task.start(crate::bg_runtime()).unwrap();
+ task.start(crate::global_runtime()).unwrap();
tokio::time::sleep(Duration::from_millis(550)).await;
task.stop().await.unwrap();
diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs
index 0368d1eec5..c6ef6cb3f6 100644
--- a/src/datanode/src/alive_keeper.rs
+++ b/src/datanode/src/alive_keeper.rs
@@ -187,7 +187,7 @@ impl RegionAliveKeeper {
let running = self.started.clone();
// Watches changes
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
loop {
if !running.load(Ordering::Relaxed) {
info!("RegionAliveKeeper stopped! Quits the watch loop!");
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 77c337e8fc..ceb40081d1 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -25,7 +25,6 @@ use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
-use common_runtime::Runtime;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
@@ -55,8 +54,8 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
- MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
- ShutdownServerSnafu, StartServerSnafu,
+ MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, ShutdownServerSnafu,
+ StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -224,7 +223,7 @@ impl DatanodeBuilder {
if self.opts.init_regions_in_background {
// Opens regions in background.
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
if let Err(err) = open_all_regions.await {
error!(err; "Failed to open regions during the startup.");
}
@@ -327,18 +326,10 @@ impl DatanodeBuilder {
);
let query_engine = query_engine_factory.query_engine();
- let runtime = Arc::new(
- Runtime::builder()
- .worker_threads(opts.grpc.runtime_size)
- .thread_name("io-handlers")
- .build()
- .context(RuntimeResourceSnafu)?,
- );
-
let table_provider_factory = Arc::new(DummyTableProviderFactory);
let mut region_server = RegionServer::with_table_provider(
query_engine,
- runtime,
+ common_runtime::global_runtime(),
event_listener,
table_provider_factory,
);
diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index 6b581e89ed..48320f9b11 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -108,7 +108,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let handler = Self::build_handler(instruction)?;
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
diff --git a/src/datanode/src/heartbeat/task_tracker.rs b/src/datanode/src/heartbeat/task_tracker.rs
index 6267547222..977054661c 100644
--- a/src/datanode/src/heartbeat/task_tracker.rs
+++ b/src/datanode/src/heartbeat/task_tracker.rs
@@ -156,7 +156,7 @@ impl TaskTracker {
} else {
let moved_inner = self.inner.clone();
let (tx, rx) = watch::channel(TaskState::::Running);
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
match fut.await {
Ok(result) => {
let _ = tx.send(TaskState::Done(result));
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 83225334ea..20441b86f6 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -82,7 +82,7 @@ pub struct RegionStat {
impl RegionServer {
pub fn new(
query_engine: QueryEngineRef,
- runtime: Arc<Runtime>,
+ runtime: Runtime,
event_listener: RegionServerEventListenerRef,
) -> Self {
Self::with_table_provider(
@@ -95,7 +95,7 @@ impl RegionServer {
pub fn with_table_provider(
query_engine: QueryEngineRef,
- runtime: Arc<Runtime>,
+ runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
) -> Self {
@@ -286,7 +286,7 @@ impl RegionServer {
}
}
- pub fn runtime(&self) -> Arc<Runtime> {
+ pub fn runtime(&self) -> Runtime {
self.inner.runtime.clone()
}
@@ -447,7 +447,7 @@ struct RegionServerInner {
engines: RwLock>,
region_map: DashMap,
query_engine: QueryEngineRef,
- runtime: Arc<Runtime>,
+ runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
}
@@ -475,7 +475,7 @@ impl Debug for CurrentEngine {
impl RegionServerInner {
pub fn new(
query_engine: QueryEngineRef,
- runtime: Arc<Runtime>,
+ runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
) -> Self {
@@ -860,7 +860,7 @@ impl RegionServerInner {
// complains "higher-ranked lifetime error". Rust can't prove some future is legit.
// Possible related issue: https://github.com/rust-lang/rust/issues/102211
//
- // The walkaround is to put the async functions in the `common_runtime::spawn_bg`. Or like
+ // The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// it here, collect the values first then use later separately.
let regions = self
diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs
index b115b366c4..645f871acd 100644
--- a/src/datanode/src/tests.rs
+++ b/src/datanode/src/tests.rs
@@ -92,7 +92,7 @@ impl QueryEngine for MockQueryEngine {
pub fn mock_region_server() -> RegionServer {
RegionServer::new(
Arc::new(MockQueryEngine),
- Arc::new(Runtime::builder().build().unwrap()),
+ Runtime::builder().build().unwrap(),
Box::new(NoopRegionServerEventListener),
)
}
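`Runtime` is now handed around by value (it is cheaply cloneable, as the `runtime()` accessor above shows), so the `Arc` wrappers at construction sites disappear. A small sketch of the pattern follows; `MyServer` is a hypothetical consumer, not a type from this diff.

```rust
// Hypothetical consumer type; only the Runtime-by-value pattern is the point here.
use common_runtime::Runtime;

struct MyServer {
    runtime: Runtime,
}

impl MyServer {
    // Constructors take the runtime directly instead of Arc<Runtime>...
    fn new(runtime: Runtime) -> Self {
        Self { runtime }
    }

    // ...and accessors hand back a clone, mirroring RegionServer::runtime() above.
    fn runtime(&self) -> Runtime {
        self.runtime.clone()
    }
}

fn main() {
    // Either a freshly built runtime or the shared global one can be passed in.
    let server = MyServer::new(common_runtime::global_runtime());
    let _rt = server.runtime();
}
```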
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 5ad69feb5f..60336f548f 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -467,7 +467,7 @@ impl FlowWorkerManager {
shutdown: Option>,
) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
self.run(shutdown).await;
})
}
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index 51ebdda1d6..a8c850349f 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -181,7 +181,7 @@ impl servers::server::Server for FlownodeServer {
let builder = tonic::transport::Server::builder().add_service(self.create_flow_service());
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
let _result = builder
.serve_with_incoming_shutdown(incoming, rx_server.recv().map(drop))
.await
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index 98c138cfb4..9660b27620 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_config::{Configurable, Mode};
-use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
@@ -65,20 +64,12 @@ where
}
pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
- let grpc_runtime = Arc::new(
- RuntimeBuilder::default()
- .worker_threads(opts.runtime_size)
- .thread_name("grpc-handlers")
- .build()
- .context(error::RuntimeResourceSnafu)?,
- );
-
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
tls: opts.tls.clone(),
};
- let builder = GrpcServerBuilder::new(grpc_config, grpc_runtime)
+ let builder = GrpcServerBuilder::new(grpc_config, common_runtime::global_runtime())
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;
Ok(builder)
@@ -224,15 +215,8 @@ where
// will not watch if watch is disabled in tls option
maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
- let mysql_io_runtime = Arc::new(
- RuntimeBuilder::default()
- .worker_threads(opts.runtime_size)
- .thread_name("mysql-io-handlers")
- .build()
- .context(error::RuntimeResourceSnafu)?,
- );
let mysql_server = MysqlServer::create_server(
- mysql_io_runtime,
+ common_runtime::global_runtime(),
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdapter::arc(instance.clone()),
user_provider.clone(),
@@ -257,19 +241,11 @@ where
maybe_watch_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
- let pg_io_runtime = Arc::new(
- RuntimeBuilder::default()
- .worker_threads(opts.runtime_size)
- .thread_name("pg-io-handlers")
- .build()
- .context(error::RuntimeResourceSnafu)?,
- );
-
let pg_server = Box::new(PostgresServer::new(
ServerSqlQueryHandlerAdapter::arc(instance.clone()),
opts.tls.should_force_tls(),
tls_server_config,
- pg_io_runtime,
+ common_runtime::global_runtime(),
user_provider.clone(),
)) as Box<dyn Server>;
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index c9632e6ea3..d2a210fb42 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -111,7 +111,7 @@ impl RaftEngineLogStore {
fn start(&self) -> Result<()> {
self.gc_task
- .start(common_runtime::bg_runtime())
+ .start(common_runtime::global_runtime())
.context(StartGcTaskSnafu)
}
@@ -279,7 +279,7 @@ impl LogStore for RaftEngineLogStore {
);
let max_batch_size = self.config.read_batch_size;
let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
- let _handle = common_runtime::spawn_read(async move {
+ let _handle = common_runtime::spawn_global(async move {
while start_index <= last_index {
let mut vec = Vec::with_capacity(max_batch_size);
match engine
diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs
index bb9a9984f8..fef7e928a7 100644
--- a/src/meta-srv/src/election/etcd.rs
+++ b/src/meta-srv/src/election/etcd.rs
@@ -68,7 +68,7 @@ impl EtcdElection {
let leader_ident = leader_value.clone();
let (tx, mut rx) = broadcast::channel(100);
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => match msg {
diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs
index 07a32cc294..d74d49027d 100644
--- a/src/meta-srv/src/handler.rs
+++ b/src/meta-srv/src/handler.rs
@@ -316,7 +316,7 @@ impl HeartbeatMailbox {
let mailbox = Arc::new(Self::new(pushers, sequence));
let timeout_checker = mailbox.clone();
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
timeout_checker.check_timeout_bg(10).await;
});
diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs
index 4024a77af8..f8acdd75c2 100644
--- a/src/meta-srv/src/handler/failure_handler.rs
+++ b/src/meta-srv/src/handler/failure_handler.rs
@@ -31,7 +31,7 @@ impl RegionFailureHandler {
heartbeat_acceptor: HeartbeatAcceptor,
) -> Self {
info!("Starting region supervisor");
- common_runtime::spawn_bg(async move { region_supervisor.run().await });
+ common_runtime::spawn_global(async move { region_supervisor.run().await });
Self { heartbeat_acceptor }
}
}
diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs
index 5eceddac04..53451591da 100644
--- a/src/meta-srv/src/lock.rs
+++ b/src/meta-srv/src/lock.rs
@@ -89,7 +89,7 @@ impl Drop for DistLockGuard<'_> {
if let Some(key) = self.key.take() {
let lock = self.lock.clone();
let name = self.name.clone();
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
if let Err(e) = lock.unlock(key).await {
error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name));
}
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index d92798a113..138490fb8c 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -400,7 +400,7 @@ impl Metasrv {
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
region_supervisor_ticker,
};
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
loop {
match rx.recv().await {
Ok(msg) => {
@@ -436,7 +436,7 @@ impl Metasrv {
let election = election.clone();
let started = self.started.clone();
let node_info = self.node_info();
- let _handle = common_runtime::spawn_bg(async move {
+ let _handle = common_runtime::spawn_global(async move {
while started.load(Ordering::Relaxed) {
let res = election.register_candidate(&node_info).await;
if let Err(e) = res {
@@ -450,7 +450,7 @@ impl Metasrv {
{
let election = election.clone();
let started = self.started.clone();
- let _handle = common_runtime::spawn_write(async move {
+ let _handle = common_runtime::spawn_global(async move {
while started.load(Ordering::Relaxed) {
let res = election.campaign().await;
if let Err(e) = res {
diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
index 340ef375a6..22b25492e2 100644
--- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs
@@ -396,7 +396,7 @@ mod tests {
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
// retry: 0.
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
@@ -445,7 +445,7 @@ mod tests {
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
for _ in 0..3 {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs
index e9080e7fd5..73aa4371f4 100644
--- a/src/meta-srv/src/procedure/region_migration/manager.rs
+++ b/src/meta-srv/src/procedure/region_migration/manager.rs
@@ -347,7 +347,7 @@ impl RegionMigrationManager {
let procedure_id = procedure_with_id.id;
info!("Starting region migration procedure {procedure_id} for {task}");
let procedure_manager = self.procedure_manager.clone();
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs
index 6cc8ea12a5..1fb63b4a34 100644
--- a/src/meta-srv/src/procedure/region_migration/test_util.rs
+++ b/src/meta-srv/src/procedure/region_migration/test_util.rs
@@ -291,7 +291,7 @@ pub fn send_mock_reply(
mut rx: MockHeartbeatReceiver,
msg: impl Fn(u64) -> Result + Send + 'static,
) {
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
while let Some(Ok(resp)) = rx.recv().await {
let reply_id = resp.mailbox_message.unwrap().id;
mailbox.on_recv(reply_id, msg(reply_id)).await.unwrap();
diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
index 5591427d0d..5a80cebb65 100644
--- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
+++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs
@@ -438,7 +438,7 @@ mod tests {
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
@@ -497,7 +497,7 @@ mod tests {
.insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
.await;
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
let resp = rx.recv().await.unwrap().unwrap();
let reply_id = resp.mailbox_message.unwrap().id;
mailbox
diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs
index 67b0f496c5..09f0400ba1 100644
--- a/src/meta-srv/src/procedure/utils.rs
+++ b/src/meta-srv/src/procedure/utils.rs
@@ -34,7 +34,7 @@ pub mod mock {
-/// An mock implementation of region server that simply echoes the request.
+/// A mock implementation of region server that simply echoes the request.
#[derive(Clone)]
pub struct EchoRegionServer {
- runtime: Arc<Runtime>,
+ runtime: Runtime,
received_requests: mpsc::Sender,
}
@@ -43,7 +43,7 @@ pub mod mock {
let (tx, rx) = mpsc::channel(10);
(
Self {
- runtime: Arc::new(RuntimeBuilder::default().worker_threads(2).build().unwrap()),
+ runtime: RuntimeBuilder::default().worker_threads(2).build().unwrap(),
received_requests: tx,
},
rx,
diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs
index 4f738e38f3..358644c510 100644
--- a/src/meta-srv/src/service/heartbeat.rs
+++ b/src/meta-srv/src/service/heartbeat.rs
@@ -46,7 +46,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
let (tx, rx) = mpsc::channel(128);
let handler_group = self.handler_group().clone();
let ctx = self.new_ctx();
- let _handle = common_runtime::spawn_write(async move {
+ let _handle = common_runtime::spawn_global(async move {
let mut pusher_key = None;
while let Some(msg) = in_stream.next().await {
let mut is_not_leader = false;
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index cd49af7b11..9c8c0e02bd 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -334,7 +334,7 @@ impl Compactor for DefaultCompactor {
Vec::with_capacity(crate::compaction::task::MAX_PARALLEL_COMPACTION);
for _ in 0..crate::compaction::task::MAX_PARALLEL_COMPACTION {
if let Some(task) = futs.pop() {
- task_chunk.push(common_runtime::spawn_bg(task));
+ task_chunk.push(common_runtime::spawn_compact(task));
}
}
let metas = futures::future::try_join_all(task_chunk)
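Compaction is the one workload that keeps a dedicated pool: `DefaultCompactor` above spawns its merge tasks with `spawn_compact`, while the rest of this patch funnels background work into `spawn_global`. A hedged sketch of that division of labor follows; the task bodies are placeholders.

```rust
// Sketch of the intended split: routine background work on the global pool,
// merge-heavy compaction on the smaller compact pool so heavy merges do not
// crowd out the rest of the system.
async fn flush_then_compact() {
    let flush = common_runtime::spawn_global(async {
        // ... flush a memtable to an SST file (placeholder) ...
    });

    let compact = common_runtime::spawn_compact(async {
        // ... merge SST files into larger ones (placeholder) ...
    });

    // JoinHandles are awaitable; errors are ignored in this sketch.
    let _ = flush.await;
    let _ = compact.await;
}

fn main() {
    common_runtime::block_on_global(flush_then_compact());
}
```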
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index d92cd1044d..d7f2ea034d 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -337,7 +337,7 @@ impl EngineInner {
// Waits for entries distribution.
let distribution =
- common_runtime::spawn_read(async move { distributor.distribute().await });
+ common_runtime::spawn_global(async move { distributor.distribute().await });
// Waits for worker returns.
let responses = join_all(responses).await;
diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs
index c0e5b0d35a..c9ca65bfdd 100644
--- a/src/mito2/src/manifest/checkpointer.rs
+++ b/src/mito2/src/manifest/checkpointer.rs
@@ -165,7 +165,7 @@ impl Checkpointer {
self.inner.set_doing_checkpoint();
let inner = self.inner.clone();
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
inner.do_checkpoint(checkpoint).await;
});
}
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index d336dcc2e6..dfe78795ca 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -665,7 +665,7 @@ impl ScanInput {
semaphore: Arc<Semaphore>,
sender: mpsc::Sender>,
) {
- common_runtime::spawn_read(async move {
+ common_runtime::spawn_global(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit held.
diff --git a/src/mito2/src/schedule/scheduler.rs b/src/mito2/src/schedule/scheduler.rs
index 9c2d5c20ab..1b2d34cb58 100644
--- a/src/mito2/src/schedule/scheduler.rs
+++ b/src/mito2/src/schedule/scheduler.rs
@@ -71,7 +71,7 @@ impl LocalScheduler {
let child = token.child_token();
let receiver = rx.clone();
let state_clone = state.clone();
- let handle = common_runtime::spawn_bg(async move {
+ let handle = common_runtime::spawn_global(async move {
while state_clone.load(Ordering::Relaxed) == STATE_RUNNING {
tokio::select! {
_ = child.cancelled() => {
diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index df49db75b6..b83101e3fb 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -95,7 +95,7 @@ impl PuffinManagerFactory {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
let f = Self::new(tempdir.path().to_path_buf(), 1024, None);
- let factory = common_runtime::block_on_bg(f).unwrap();
+ let factory = common_runtime::block_on_global(f).unwrap();
(tempdir, factory)
}
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 35b65ad0d7..c8aa1bb340 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -438,7 +438,7 @@ impl WorkerStarter {
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
};
- let handle = common_runtime::spawn_write(async move {
+ let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
});
@@ -830,7 +830,7 @@ impl RegionWorkerLoop {
) {
if let Some(region) = self.regions.get_region(region_id) {
// We need to do this in background as we need the manifest lock.
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
region.set_readonly_gracefully().await;
let last_entry_id = region.version_control.current().last_entry_id;
diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs
index 06a439cc5e..ca14662497 100644
--- a/src/mito2/src/worker/handle_drop.rs
+++ b/src/mito2/src/worker/handle_drop.rs
@@ -100,7 +100,7 @@ where
let object_store = region.access_layer.object_store().clone();
let dropping_regions = self.dropping_regions.clone();
let listener = self.listener.clone();
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
let gc_duration = listener
.on_later_drop_begin(region_id)
.unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs
index 60ace00cd5..e12f139b5b 100644
--- a/src/mito2/src/worker/handle_manifest.rs
+++ b/src/mito2/src/worker/handle_manifest.rs
@@ -57,7 +57,7 @@ impl RegionWorkerLoop {
let request_sender = self.sender.clone();
// Now the region is in editing state.
// Updates manifest in background.
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
let result = edit_region(®ion, edit.clone()).await;
let notify = WorkerRequest::Background {
region_id,
@@ -125,7 +125,7 @@ impl RegionWorkerLoop {
let manifest_ctx = region.manifest_ctx.clone();
// Updates manifest in background.
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
// Write region truncated to manifest.
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
@@ -167,7 +167,7 @@ impl RegionWorkerLoop {
let request_sender = self.sender.clone();
// Now the region is in altering state.
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
let new_meta = change.metadata.clone();
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs
index d87f531a72..7fe1d3c322 100644
--- a/src/mito2/src/worker/handle_open.rs
+++ b/src/mito2/src/worker/handle_open.rs
@@ -113,7 +113,7 @@ impl RegionWorkerLoop {
let config = self.config.clone();
let opening_regions = self.opening_regions.clone();
opening_regions.insert_sender(region_id, sender);
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
info!("Region {} is opened", region_id);
diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs
index 20c7b5381f..ac78350a50 100644
--- a/src/operator/src/delete.rs
+++ b/src/operator/src/delete.rs
@@ -134,7 +134,7 @@ impl Deleter {
.map(|(peer, deletes)| {
let request = request_factory.build_delete(deletes);
let node_manager = self.node_manager.clone();
- common_runtime::spawn_write(async move {
+ common_runtime::spawn_global(async move {
node_manager
.datanode(&peer)
.await
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 38e79de6c9..4abda29c2e 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -283,7 +283,7 @@ impl Inserter {
let node_manager = self.node_manager.clone();
let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
- common_runtime::spawn_bg(async move {
+ common_runtime::spawn_global(async move {
node_manager
.flownode(&peer)
.await
@@ -320,7 +320,7 @@ impl Inserter {
.map(|(peer, inserts)| {
let node_manager = self.node_manager.clone();
let request = request_factory.build_insert(inserts);
- common_runtime::spawn_write(async move {
+ common_runtime::spawn_global(async move {
node_manager
.datanode(&peer)
.await
diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs
index 9611bbfab1..64a6a75c31 100644
--- a/src/operator/src/request.rs
+++ b/src/operator/src/request.rs
@@ -171,7 +171,7 @@ impl Requester {
let request = request_factory.build_request(req_body.clone());
let partition_manager = self.partition_manager.clone();
let node_manager = self.node_manager.clone();
- common_runtime::spawn_write(async move {
+ common_runtime::spawn_global(async move {
let peer =
Self::find_region_leader_by_request(partition_manager, &req_body).await?;
node_manager
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
index b59c72bad7..3d3d2c8804 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
@@ -184,7 +184,7 @@ where
let reader = accessor.reader(&puffin_file_name).await?;
let writer = writer_provider.writer(&file_meta.relative_path).await?;
- let task = common_runtime::spawn_read(async move {
+ let task = common_runtime::spawn_global(async move {
let mut file = PuffinFileReader::new(reader);
let reader = file.blob_reader(&blob_meta)?;
let compression = blob_meta.compression_codec;
diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs
index 9294497a06..c41df95c25 100644
--- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs
+++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs
@@ -89,7 +89,7 @@ impl BoundedStager {
.build();
let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
- common_runtime::bg_runtime().spawn(Self::delete_routine(rx));
+ common_runtime::global_runtime().spawn(Self::delete_routine(rx));
let stager = Self {
cache,
diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs
index a82b10418d..4662922f14 100644
--- a/src/script/src/python/utils.rs
+++ b/src/script/src/python/utils.rs
@@ -36,7 +36,7 @@ where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
- common_runtime::spawn_blocking_bg(f)
+ common_runtime::spawn_blocking_global(f)
}
/// Please only use this method because you are calling from (optionally first as async) to sync then to a async
@@ -50,7 +50,7 @@ where
F: Future