diff --git a/Cargo.lock b/Cargo.lock
index 5b3291408d..33cb55bbf6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3949,6 +3949,7 @@ dependencies = [
"common-telemetry",
"common-time",
"common-version",
+ "config",
"datafusion",
"datafusion-common",
"datafusion-expr",
diff --git a/config/config.md b/config/config.md
index 5a0e46763f..f14148bfcc 100644
--- a/config/config.md
+++ b/config/config.md
@@ -91,6 +91,8 @@
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
+| `flow` | -- | -- | The flow engine options. |
+| `flow.num_workers` | Integer | `0` | The number of flow workers in flownode.<br/>Not setting this value (or setting it to `0`) will use half the number of CPU cores. |
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `/tmp/greptimedb/` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
@@ -536,6 +538,8 @@
| --- | -----| ------- | ----------- |
| `mode` | String | `distributed` | The running mode of the flownode. It can be `standalone` or `distributed`. |
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
+| `flow` | -- | -- | The flow engine options. |
+| `flow.num_workers` | Integer | `0` | The number of flow workers in flownode.<br/>Not setting this value (or setting it to `0`) will use half the number of CPU cores. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname advertised to the metasrv, and used for connections from outside the host. |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index ffa9924365..b27076a4c8 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -5,6 +5,12 @@ mode = "distributed"
## @toml2docs:none-default
node_id = 14
+## The flow engine options.
+[flow]
+## The number of flow workers in flownode.
+## Not setting this value (or setting it to 0) will use half the number of CPU cores.
+#+num_workers=0
+
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index bd5b7b073b..275abf40e4 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -284,6 +284,12 @@ max_retry_times = 3
## Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
+## The flow engine options.
+[flow]
+## The number of flow workers in flownode.
+## Not setting this value (or setting it to 0) will use half the number of CPU cores.
+#+num_workers=0
+
# Example of using S3 as the storage.
# [storage]
# type = "S3"
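For instance, a standalone deployment can pin the flow engine to a fixed worker count (the value here is illustrative; leaving the key unset or at `0` keeps the cpu/2 default described above):

```toml
[flow]
## Four dedicated flow workers instead of the default of half the CPU cores.
num_workers = 4
```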
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index d72ec5ef43..c52499eccf 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -54,7 +54,7 @@ use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, Sto
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
-use flow::{FlowWorkerManager, FlownodeBuilder, FrontendInvoker};
+use flow::{FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeOptions, FrontendInvoker};
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
@@ -145,6 +145,7 @@ pub struct StandaloneOptions {
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
+ pub flow: FlowConfig,
pub logging: LoggingOptions,
pub user_provider: Option<String>,
/// Options for different store engines.
@@ -173,6 +174,7 @@ impl Default for StandaloneOptions {
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
+ flow: FlowConfig::default(),
logging: LoggingOptions::default(),
export_metrics: ExportMetricsOption::default(),
user_provider: None,
@@ -523,8 +525,12 @@ impl StartCommand {
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
+ let flownode_options = FlownodeOptions {
+ flow: opts.flow.clone(),
+ ..Default::default()
+ };
let flow_builder = FlownodeBuilder::new(
- Default::default(),
+ flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
diff --git a/src/common/config/src/config.rs b/src/common/config/src/config.rs
index e0816fbd56..f3cefa90b5 100644
--- a/src/common/config/src/config.rs
+++ b/src/common/config/src/config.rs
@@ -73,14 +73,21 @@ pub trait Configurable: Serialize + DeserializeOwned + Default + Sized {
layered_config = layered_config.add_source(File::new(config_file, FileFormat::Toml));
}
- let opts = layered_config
+ let mut opts: Self = layered_config
.build()
.and_then(|x| x.try_deserialize())
.context(LoadLayeredConfigSnafu)?;
+ opts.validate_sanitize()?;
+
Ok(opts)
}
+ /// Validate (and possibly sanitize) the configuration.
+ fn validate_sanitize(&mut self) -> Result<()> {
+ Ok(())
+ }
+
/// List of toml keys that should be parsed as a list.
fn env_list_keys() -> Option<&'static [&'static str]> {
None
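The new `validate_sanitize` hook runs after all layered sources are merged and deserialized, so an implementor can replace sentinel values with computed defaults before the options are used. A minimal sketch of how that might look (the `MyOptions` type and its fallback value are hypothetical; the `common_config` import paths are assumed from this diff):

```rust
use common_config::error::Result;
use common_config::Configurable;
use serde::{Deserialize, Serialize};

// Hypothetical options struct, used only to illustrate the hook.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
struct MyOptions {
    batch_size: usize,
}

impl Configurable for MyOptions {
    // Invoked by `load_layered_options` once deserialization succeeds.
    fn validate_sanitize(&mut self) -> Result<()> {
        if self.batch_size == 0 {
            self.batch_size = 1024; // replace the sentinel with a real default
        }
        Ok(())
    }
}
```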
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index c306a59a1a..6038d0f47a 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -32,6 +32,7 @@ common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-version.workspace = true
+config.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 6e16b7340a..8a3c4494b9 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -47,7 +47,7 @@ use tokio::sync::{broadcast, watch, Mutex, RwLock};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
-use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
+pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
@@ -80,6 +80,21 @@ pub const UPDATE_AT_TS_COL: &str = "update_at";
pub type FlowId = u64;
pub type TableName = [String; 3];
+/// Flow config that exists in both standalone and distributed mode.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[serde(default)]
+pub struct FlowConfig {
+ pub num_workers: usize,
+}
+
+impl Default for FlowConfig {
+ fn default() -> Self {
+ Self {
+ num_workers: (common_config::utils::get_cpus() / 2).max(1),
+ }
+ }
+}
+
/// Options for flow node
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
@@ -87,6 +102,7 @@ pub struct FlownodeOptions {
pub mode: Mode,
pub cluster_id: Option<u64>,
pub node_id: Option<u64>,
+ pub flow: FlowConfig,
pub grpc: GrpcOptions,
pub meta_client: Option<MetaClientOptions>,
pub logging: LoggingOptions,
@@ -100,6 +116,7 @@ impl Default for FlownodeOptions {
mode: servers::Mode::Standalone,
cluster_id: None,
node_id: None,
+ flow: FlowConfig::default(),
grpc: GrpcOptions::default().with_addr("127.0.0.1:3004"),
meta_client: None,
logging: LoggingOptions::default(),
@@ -109,7 +126,14 @@ impl Default for FlownodeOptions {
}
}
-impl Configurable for FlownodeOptions {}
+impl Configurable for FlownodeOptions {
+ fn validate_sanitize(&mut self) -> common_config::error::Result<()> {
+ if self.flow.num_workers == 0 {
+ self.flow.num_workers = (common_config::utils::get_cpus() / 2).max(1);
+ }
+ Ok(())
+ }
+}
/// Arc-ed FlowNodeManager, cheaper to clone
pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>;
@@ -121,6 +145,8 @@ pub struct FlowWorkerManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
pub worker_handles: Vec<Mutex<WorkerHandle>>,
+ /// The selector to select a worker to run the dataflow
+ worker_selector: Mutex<usize>,
/// The query engine that will be used to parse the query and convert it to a dataflow plan
pub query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
@@ -162,6 +188,7 @@ impl FlowWorkerManager {
let worker_handles = Vec::new();
FlowWorkerManager {
worker_handles,
+ worker_selector: Mutex::new(0),
query_engine,
table_info_source: srv_map,
frontend_invoker: RwLock::new(None),
@@ -181,15 +208,22 @@ impl FlowWorkerManager {
}
/// Create a flownode manager with one worker
- pub fn new_with_worker<'s>(
+ pub fn new_with_workers<'s>(
node_id: Option<u32>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
- ) -> (Self, Worker<'s>) {
+ num_workers: usize,
+ ) -> (Self, Vec<Worker<'s>>) {
let mut zelf = Self::new(node_id, query_engine, table_meta);
- let (handle, worker) = create_worker();
- zelf.add_worker_handle(handle);
- (zelf, worker)
+
+ let workers: Vec<_> = (0..num_workers)
+ .map(|_| {
+ let (handle, worker) = create_worker();
+ zelf.add_worker_handle(handle);
+ worker
+ })
+ .collect();
+ (zelf, workers)
}
/// Add a worker handle to the manager, meaning the corresponding worker is under its management
@@ -830,7 +864,8 @@ impl FlowWorkerManager {
.write()
.await
.insert(flow_id, err_collector.clone());
- let handle = &self.worker_handles[0].lock().await;
+ // TODO(discord9): load balance?
+ let handle = &self.get_worker_handle_for_create_flow().await;
let create_request = worker::Request::Create {
flow_id,
plan: flow_plan,
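Both `FlowConfig::default` and `validate_sanitize` resolve a zero worker count to half the CPU cores, floored at one. A standalone sketch of that rule, using `std::thread::available_parallelism` in place of `common_config::utils::get_cpus` from the diff:

```rust
use std::thread::available_parallelism;

// Mirror of the default in `FlowConfig::default`: half the logical CPUs,
// clamped so at least one worker always runs.
fn default_num_workers() -> usize {
    let cpus = available_parallelism().map(|n| n.get()).unwrap_or(1);
    (cpus / 2).max(1)
}
```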
diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs
index b851cf0e70..26b96f75b9 100644
--- a/src/flow/src/adapter/util.rs
+++ b/src/flow/src/adapter/util.rs
@@ -28,12 +28,28 @@ use snafu::{OptionExt, ResultExt};
use table::table_reference::TableReference;
use crate::adapter::table_source::TableDesc;
-use crate::adapter::{TableName, AUTO_CREATED_PLACEHOLDER_TS_COL};
+use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL};
use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::FlowWorkerManager;
impl FlowWorkerManager {
+ /// Get a worker handle for creating a flow, using round-robin to select a worker
+ pub(crate) async fn get_worker_handle_for_create_flow(
+ &self,
+ ) -> tokio::sync::MutexGuard<WorkerHandle> {
+ let mut selector = self.worker_selector.lock().await;
+
+ *selector += 1;
+ if *selector >= self.worker_handles.len() {
+ *selector = 0
+ };
+
+ // Safety: the selector is always in bounds
+ let handle = &self.worker_handles[*selector];
+ handle.lock().await
+ }
+
/// Create a table from the given schema (adjusted to add an auto column if needed); returns true if the table was created
pub(crate) async fn create_table_from_relation(
&self,
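The selection logic in `get_worker_handle_for_create_flow` boils down to a cursor-plus-lock pattern over a vector of mutexes. A self-contained sketch of the same shape (the generic `Pool` type is illustrative; like the manager's worker handles, it assumes a non-empty slot list):

```rust
use tokio::sync::{Mutex, MutexGuard};

/// Illustrative round-robin pool: advance a shared cursor, wrap around,
/// then lock the selected slot.
struct Pool<T> {
    slots: Vec<Mutex<T>>,
    cursor: Mutex<usize>,
}

impl<T> Pool<T> {
    async fn next(&self) -> MutexGuard<'_, T> {
        let mut cursor = self.cursor.lock().await;
        // The modulo keeps the index in bounds for any non-empty pool.
        *cursor = (*cursor + 1) % self.slots.len();
        self.slots[*cursor].lock().await
    }
}
```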
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index 8d07369afa..a186e57d89 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -41,6 +41,6 @@ mod utils;
#[cfg(test)]
mod test_utils;
-pub use adapter::{FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
+pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions};
pub use error::{Error, Result};
pub use server::{FlownodeBuilder, FlownodeInstance, FlownodeServer, FrontendInvoker};
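With `FlowConfig` re-exported, downstream crates such as `cmd` can construct the options directly; a usage sketch mirroring the standalone change above (the worker count is an illustrative value):

```rust
use flow::{FlowConfig, FlownodeOptions};

fn build_options() -> FlownodeOptions {
    // Override only the worker count; everything else keeps its default.
    FlownodeOptions {
        flow: FlowConfig { num_workers: 4 },
        ..Default::default()
    }
}
```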
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index 2cc2f5644f..4ecda0b66f 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -48,7 +48,7 @@ use tonic::codec::CompressionEncoding;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
-use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
+use crate::adapter::{create_worker, CreateFlowArgs, FlowWorkerManagerRef};
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
@@ -414,24 +414,30 @@ impl FlownodeBuilder {
register_function_to_query_engine(&query_engine);
- let (tx, rx) = oneshot::channel();
+ let num_workers = self.opts.flow.num_workers;
let node_id = self.opts.node_id.map(|id| id as u32);
- let _handle = std::thread::Builder::new()
- .name("flow-worker".to_string())
- .spawn(move || {
- let (flow_node_manager, mut worker) =
- FlowWorkerManager::new_with_worker(node_id, query_engine, table_meta);
- let _ = tx.send(flow_node_manager);
- info!("Flow Worker started in new thread");
- worker.run();
- });
- let mut man = rx.await.map_err(|_e| {
- UnexpectedSnafu {
- reason: "sender is dropped, failed to create flow node manager",
- }
- .build()
- })?;
+
+ let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta);
+ for worker_id in 0..num_workers {
+ let (tx, rx) = oneshot::channel();
+
+ let _handle = std::thread::Builder::new()
+ .name(format!("flow-worker-{}", worker_id))
+ .spawn(move || {
+ let (handle, mut worker) = create_worker();
+ let _ = tx.send(handle);
+ info!("Flow Worker started in new thread");
+ worker.run();
+ });
+ let worker_handle = rx.await.map_err(|e| {
+ UnexpectedSnafu {
+ reason: format!("Failed to receive worker handle: {}", e),
+ }
+ .build()
+ })?;
+ man.add_worker_handle(worker_handle);
+ }
if let Some(handler) = self.state_report_handler.take() {
man = man.with_state_report_handler(handler).await;
}
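The builder now repeats a standard pattern for `!Send` state: construct the worker inside a dedicated OS thread and pass only the `Send`-able handle back over a oneshot channel. A condensed sketch of one loop iteration (`create_worker` and `WorkerHandle` are the items from this crate; the error mapping is simplified):

```rust
use tokio::sync::oneshot;

// One iteration of the loop above: spawn a thread, build the (!Send)
// worker there, and await the Send-able handle on the async side.
async fn spawn_flow_worker(worker_id: usize) -> std::io::Result<WorkerHandle> {
    let (tx, rx) = oneshot::channel();
    std::thread::Builder::new()
        .name(format!("flow-worker-{worker_id}"))
        .spawn(move || {
            let (handle, mut worker) = create_worker();
            // Only the handle crosses the thread boundary.
            let _ = tx.send(handle);
            worker.run();
        })?;
    rx.await.map_err(|e| {
        std::io::Error::new(
            std::io::ErrorKind::Other,
            format!("worker thread dropped handle: {e}"),
        )
    })
}
```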
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 917660a5c3..8aa3254d2a 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -874,6 +874,8 @@ purge_threshold = "4GiB"
max_retry_times = 3
retry_delay = "500ms"
+[flow]
+
[logging]
max_log_files = 720
append_stdout = true