diff --git a/config/config.md b/config/config.md
index 2e9d958735..780239142d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -598,3 +598,5 @@
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
+| `query` | -- | -- | -- |
+| `query.parallelism` | Integer | `1` | Parallelism of the query engine for query sent by flownode.<br/>Default to 1, so it won't use too much cpu or memory |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index 76ddda2540..d1d4d9e341 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -108,3 +108,8 @@ default_ratio = 1.0
## The tokio console address.
## @toml2docs:none-default
#+ tokio_console_addr = "127.0.0.1"
+
+[query]
+## Parallelism of the query engine for query sent by flownode.
+## Default to 1, so it won't use too much cpu or memory
+parallelism = 1
diff --git a/src/client/src/database.rs b/src/client/src/database.rs
index ad6523cb67..8fabd7897d 100644
--- a/src/client/src/database.rs
+++ b/src/client/src/database.rs
@@ -196,12 +196,22 @@ impl Database {
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
/// is `max_retries * GRPC_CONN_TIMEOUT`
- pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result {
+ pub async fn handle_with_retry(
+ &self,
+ request: Request,
+ max_retries: u32,
+ hints: &[(&str, &str)],
+ ) -> Result {
let mut client = make_database_client(&self.client)?.inner;
let mut retries = 0;
+
let request = self.to_rpc_request(request);
+
loop {
- let raw_response = client.handle(request.clone()).await;
+ let mut tonic_request = tonic::Request::new(request.clone());
+ let metadata = tonic_request.metadata_mut();
+ Self::put_hints(metadata, hints)?;
+ let raw_response = client.handle(tonic_request).await;
match (raw_response, retries < max_retries) {
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
(Err(err), true) => {
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index fd3857a918..52a26857cb 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -371,8 +371,11 @@ impl StartCommand {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
- let frontend_client =
- FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
+ let frontend_client = FrontendClient::from_meta_client(
+ meta_client.clone(),
+ flow_auth_header,
+ opts.query.clone(),
+ );
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index ed9381b3f7..b211ad83b9 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -564,7 +564,7 @@ impl StartCommand {
// for standalone not use grpc, but get a handler to frontend grpc client without
// actually make a connection
let (frontend_client, frontend_instance_handler) =
- FrontendClient::from_empty_grpc_handler();
+ FrontendClient::from_empty_grpc_handler(opts.query.clone());
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index be74da35f9..feb491c034 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -30,6 +30,7 @@ use meta_srv::metasrv::MetasrvOptions;
use meta_srv::selector::SelectorType;
use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
+use query::options::QueryOptions;
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
@@ -216,12 +217,15 @@ fn test_load_flownode_example_config() {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
+ otlp_export_protocol: Some(common_telemetry::logging::OtlpExportProtocol::Http),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
tracing: Default::default(),
heartbeat: Default::default(),
- query: Default::default(),
+ // flownode deliberately uses a lower query parallelism
+ // to avoid overwhelming the frontend with too many queries
+ query: QueryOptions { parallelism: 1 },
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 59974ab48f..9cf01f08b9 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -121,7 +121,9 @@ impl Default for FlownodeOptions {
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
heartbeat: HeartbeatOptions::default(),
- query: QueryOptions::default(),
+ // flownode's query parallelism is set to 1 to throttle flow queries so
+ // that they won't use too much CPU or memory
+ query: QueryOptions { parallelism: 1 },
user_provider: None,
}
}
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index e70a58ebec..e32ab4e608 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -14,6 +14,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
+use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::SystemTime;
@@ -29,6 +30,8 @@ use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use meta_client::client::MetaClient;
+use query::datafusion::QUERY_PARALLELISM_HINT;
+use query::options::QueryOptions;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -84,27 +87,34 @@ pub enum FrontendClient {
meta_client: Arc,
chnl_mgr: ChannelManager,
auth: Option,
+ query: QueryOptions,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
/// notice the client here should all be lazy, so that can wait after frontend is booted then make conn
database_client: HandlerMutable,
+ query: QueryOptions,
},
}
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
- pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
+ pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
(
Self::Standalone {
database_client: handler.clone(),
+ query,
},
handler,
)
}
- pub fn from_meta_client(meta_client: Arc, auth: Option) -> Self {
+ pub fn from_meta_client(
+ meta_client: Arc,
+ auth: Option,
+ query: QueryOptions,
+ ) -> Self {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
Self::Distributed {
meta_client,
@@ -115,12 +125,17 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
+ query,
}
}
- pub fn from_grpc_handler(grpc_handler: Weak) -> Self {
+ pub fn from_grpc_handler(
+ grpc_handler: Weak,
+ query: QueryOptions,
+ ) -> Self {
Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
+ query,
}
}
}
@@ -193,6 +208,7 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
+ query: _,
} = self
else {
return UnexpectedSnafu {
@@ -281,7 +297,9 @@ impl FrontendClient {
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
- FrontendClient::Standalone { database_client } => {
+ FrontendClient::Standalone {
+ database_client, ..
+ } => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
@@ -328,7 +346,7 @@ impl FrontendClient {
peer_desc: &mut Option,
) -> Result {
match self {
- FrontendClient::Distributed { .. } => {
+ FrontendClient::Distributed { query, .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
@@ -336,16 +354,27 @@ impl FrontendClient {
});
db.database
- .handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
+ .handle_with_retry(
+ req.clone(),
+ GRPC_MAX_RETRIES,
+ &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
+ )
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
})
}
- FrontendClient::Standalone { database_client } => {
+ FrontendClient::Standalone {
+ database_client,
+ query,
+ } => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
+ .extensions(HashMap::from([(
+ QUERY_PARALLELISM_HINT.to_string(),
+ query.parallelism.to_string(),
+ )]))
.build();
let ctx = Arc::new(ctx);
{
diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index 0ecf68b488..7050f7ab94 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -76,38 +76,43 @@ impl TaskState {
/// Compute the next query delay based on the time window size or the last query duration.
/// Aiming to avoid too frequent queries. But also not too long delay.
- /// The delay is computed as follows:
- /// - If `time_window_size` is set, the delay is half the time window size, constrained to be
- /// at least `last_query_duration` and at most `max_timeout`.
- /// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
- /// to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
///
- /// If there are dirty time windows, the function returns an immediate execution time to clean them.
- /// TODO: Make this behavior configurable.
+ /// The next wait time is the last query duration, raised to at least
+ /// `time_window_size` (or `MIN_REFRESH_DURATION` when it is unset) and then
+ /// capped at `max_timeout` when one is given.
+ ///
+ /// If the dirty time range is at least as long as one query can handle,
+ /// execute immediately to clean up dirty time windows faster.
+ ///
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option,
max_timeout: Option,
) -> Instant {
- let last_duration = max_timeout
- .unwrap_or(self.last_query_duration)
- .min(self.last_query_duration)
- .max(MIN_REFRESH_DURATION);
+ // = last query duration, raised to at least `time_window_size` (or `MIN_REFRESH_DURATION` if unset), then capped at `max_timeout` if given
+ let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
+ let next_duration = self.last_query_duration.max(lower);
+ let next_duration = if let Some(max_timeout) = max_timeout {
+ next_duration.min(max_timeout)
+ } else {
+ next_duration
+ };
- let next_duration = time_window_size
- .map(|t| {
- let half = t / 2;
- half.max(last_duration)
- })
- .unwrap_or(last_duration);
-
- // if have dirty time window, execute immediately to clean dirty time window
- if self.dirty_time_windows.windows.is_empty() {
+ let cur_dirty_window_size = self.dirty_time_windows.window_size();
+ // compute how much time range can be handled in one query
+ let max_query_update_range = (*time_window_size)
+ .unwrap_or_default()
+ .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
+ // if the dirty time range is at least what one query can handle, execute immediately
+ // to clean up dirty time windows faster
+ if cur_dirty_window_size < max_query_update_range {
self.last_update_time + next_duration
} else {
+ // if dirty time windows can't be cleaned up in one query, execute immediately to
+ // clean them up faster
debug!(
- "Flow id = {}, still have {} dirty time window({:?}), execute immediately",
+ "Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
flow_id,
self.dirty_time_windows.windows.len(),
self.dirty_time_windows.windows
@@ -147,6 +152,18 @@ impl DirtyTimeWindows {
}
}
+    /// Sum of the lengths of all dirty windows that have a known end.
+    ///
+    /// Windows without an end, or whose `end - start` cannot be computed, are skipped;
+    /// a span that cannot be converted to a std `Duration` counts as zero.
+    pub fn window_size(&self) -> Duration {
+        self.windows
+            .iter()
+            .filter_map(|(start, end)| end.as_ref()?.sub(start))
+            .map(|span| span.to_std().unwrap_or_default())
+            .sum()
+    }
+
pub fn add_window(&mut self, start: Timestamp, end: Option) {
self.windows.insert(start, end);
}
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index 100216d24c..231a23ca2a 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -62,6 +62,10 @@ use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{metrics, QueryEngine};
+/// Query parallelism hint key.
+/// This hint can be set in the query context to control the parallelism of the query execution.
+pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";
+
pub struct DatafusionQueryEngine {
state: Arc,
plugins: Plugins,
@@ -480,7 +484,7 @@ impl QueryEngine for DatafusionQueryEngine {
state.config_mut().set_extension(query_ctx.clone());
// note that hints in "x-greptime-hints" is automatically parsed
// and set to query context's extension, so we can get it from query context.
- if let Some(parallelism) = query_ctx.extension("query_parallelism") {
+ if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
if let Ok(n) = parallelism.parse::() {
if n > 0 {
let new_cfg = state.config().clone().with_target_partitions(n as usize);
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index e14b08e15e..4b15d13530 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -172,7 +172,7 @@ impl GreptimeDbStandaloneBuilder {
);
let (frontend_client, frontend_instance_handler) =
- FrontendClient::from_empty_grpc_handler();
+ FrontendClient::from_empty_grpc_handler(opts.query.clone());
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),