diff --git a/Cargo.lock b/Cargo.lock
index 8a47dc1982..1e0d5a09e2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4688,6 +4688,7 @@ dependencies = [
"get-size2",
"greptime-proto",
"http 1.1.0",
+ "humantime-serde",
"itertools 0.14.0",
"lazy_static",
"meta-client",
diff --git a/config/config.md b/config/config.md
index 780239142d..30140148d7 100644
--- a/config/config.md
+++ b/config/config.md
@@ -562,6 +562,16 @@
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.
Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
+| `flow.batching_mode` | -- | -- | -- |
+| `flow.batching_mode.query_timeout` | String | `600s` | The default batching engine query timeout is 10 minutes. |
+| `flow.batching_mode.slow_query_threshold` | String | `60s` | will output a warn log for any query that runs for more that this threshold |
+| `flow.batching_mode.experimental_min_refresh_duration` | String | `5s` | The minimum duration between two queries execution by batching mode task |
+| `flow.batching_mode.grpc_conn_timeout` | String | `5s` | The gRPC connection timeout |
+| `flow.batching_mode.experimental_grpc_max_retries` | Integer | `3` | The gRPC max retry number |
+| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,
if failed to find available frontend after frontend_scan_timeout elapsed, return error
which prevent flownode from starting |
+| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout
if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
it will be removed from the list that flownode use to connect |
+| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
+| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,
and used for connections from outside the host |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index d1d4d9e341..cc9dd90705 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -7,6 +7,29 @@ node_id = 14
## The number of flow worker in flownode.
## Not setting(or set to 0) this value will use the number of CPU cores divided by 2.
#+num_workers=0
+[flow.batching_mode]
+## The default batching engine query timeout is 10 minutes.
+#+query_timeout="600s"
+## will output a warn log for any query that runs for more that this threshold
+#+slow_query_threshold="60s"
+## The minimum duration between two queries execution by batching mode task
+#+experimental_min_refresh_duration="5s"
+## The gRPC connection timeout
+#+grpc_conn_timeout="5s"
+## The gRPC max retry number
+#+experimental_grpc_max_retries=3
+## Flow wait for available frontend timeout,
+## if failed to find available frontend after frontend_scan_timeout elapsed, return error
+## which prevent flownode from starting
+#+experimental_frontend_scan_timeout="30s"
+## Frontend activity timeout
+## if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
+## it will be removed from the list that flownode use to connect
+#+experimental_frontend_activity_timeout="60s"
+## Maximum number of filters allowed in a single query
+#+experimental_max_filter_num_per_query=20
+## Time window merge distance
+#+experimental_time_window_merge_threshold=3
## The gRPC server options.
[grpc]
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index eb920a0a75..1a071e10a3 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -374,6 +374,7 @@ impl StartCommand {
meta_client.clone(),
flow_auth_header,
opts.query.clone(),
+ opts.flow.batching_mode.clone(),
);
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 82377868fb..7d22f45b6c 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -48,6 +48,7 @@ futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
http.workspace = true
+humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 2acba4979e..a6ad3d1d40 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -50,6 +50,7 @@ use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
+use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
@@ -84,12 +85,14 @@ pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
#[serde(default)]
pub struct FlowConfig {
pub num_workers: usize,
+ pub batching_mode: BatchingModeOptions,
}
impl Default for FlowConfig {
fn default() -> Self {
Self {
num_workers: (common_config::utils::get_cpus() / 2).max(1),
+ batching_mode: BatchingModeOptions::default(),
}
}
}
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index f68526508f..dfb54a382a 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -41,7 +41,6 @@ use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
-use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
@@ -440,13 +439,21 @@ struct ConsistentCheckTask {
impl ConsistentCheckTask {
async fn start_check_task(engine: &Arc) -> Result {
let engine = engine.clone();
+ let min_refresh_duration = engine
+ .batching_engine()
+ .batch_opts
+ .experimental_min_refresh_duration;
+ let frontend_scan_timeout = engine
+ .batching_engine()
+ .batch_opts
+ .experimental_frontend_scan_timeout;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// first check if available frontend is found
if let Err(err) = engine
- .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
+ .wait_for_available_frontend(frontend_scan_timeout)
.await
{
warn!("No frontend is available yet:\n {err:?}");
@@ -459,9 +466,9 @@ impl ConsistentCheckTask {
error!(
"Failed to recover flows:\n {err:?}, retry {} in {}s",
recover_retry,
- MIN_REFRESH_DURATION.as_secs()
+ min_refresh_duration.as_secs()
);
- tokio::time::sleep(MIN_REFRESH_DURATION).await;
+ tokio::time::sleep(min_refresh_duration).await;
}
engine.set_done_recovering();
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 0e6f3bbb25..65e678bd00 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -16,6 +16,8 @@
use std::time::Duration;
+use serde::{Deserialize, Serialize};
+
pub(crate) mod engine;
pub(crate) mod frontend_client;
mod state;
@@ -23,27 +25,49 @@ mod task;
mod time_window;
mod utils;
-/// TODO(discord9): make those constants configurable
-/// The default batching engine query timeout is 10 minutes
-pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct BatchingModeOptions {
+ /// The default batching engine query timeout is 10 minutes
+ #[serde(with = "humantime_serde")]
+ pub query_timeout: Duration,
+ /// will output a warn log for any query that runs for more that this threshold
+ #[serde(with = "humantime_serde")]
+ pub slow_query_threshold: Duration,
+ /// The minimum duration between two queries execution by batching mode task
+ #[serde(with = "humantime_serde")]
+ pub experimental_min_refresh_duration: Duration,
+ /// The gRPC connection timeout
+ #[serde(with = "humantime_serde")]
+ pub grpc_conn_timeout: Duration,
+ /// The gRPC max retry number
+ pub experimental_grpc_max_retries: u32,
+ /// Flow wait for available frontend timeout,
+ /// if failed to find available frontend after frontend_scan_timeout elapsed, return error
+ /// which prevent flownode from starting
+ #[serde(with = "humantime_serde")]
+ pub experimental_frontend_scan_timeout: Duration,
+ /// Frontend activity timeout
+ /// if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, it will be removed from the list that flownode use to connect
+ #[serde(with = "humantime_serde")]
+ pub experimental_frontend_activity_timeout: Duration,
+ /// Maximum number of filters allowed in a single query
+ pub experimental_max_filter_num_per_query: usize,
+ /// Time window merge distance
+ pub experimental_time_window_merge_threshold: usize,
+}
-/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running
-pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
-
-/// The minimum duration between two queries execution by batching mode task
-pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
-
-/// Grpc connection timeout
-const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
-
-/// Grpc max retry number
-const GRPC_MAX_RETRIES: u32 = 3;
-
-/// Flow wait for available frontend timeout,
-/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error
-/// which should prevent flownode from starting
-pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
-
-/// Frontend activity timeout
-/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect
-pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);
+impl Default for BatchingModeOptions {
+ fn default() -> Self {
+ Self {
+ query_timeout: Duration::from_secs(10 * 60),
+ slow_query_threshold: Duration::from_secs(60),
+ experimental_min_refresh_duration: Duration::new(5, 0),
+ grpc_conn_timeout: Duration::from_secs(5),
+ experimental_grpc_max_retries: 3,
+ experimental_frontend_scan_timeout: Duration::from_secs(30),
+ experimental_frontend_activity_timeout: Duration::from_secs(60),
+ experimental_max_filter_num_per_query: 20,
+ experimental_time_window_merge_threshold: 3,
+ }
+ }
+}
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index e6ff6c54b3..f8d2398520 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -34,9 +34,10 @@ use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient;
-use crate::batching_mode::task::BatchingTask;
+use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
+use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
@@ -57,6 +58,9 @@ pub struct BatchingEngine {
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
+ /// Batching mode options for control how batching mode query works
+ ///
+ pub(crate) batch_opts: Arc,
}
impl BatchingEngine {
@@ -66,6 +70,7 @@ impl BatchingEngine {
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
+ batch_opts: BatchingModeOptions,
) -> Self {
Self {
tasks: Default::default(),
@@ -75,6 +80,7 @@ impl BatchingEngine {
table_meta,
catalog_manager,
query_engine,
+ batch_opts: Arc::new(batch_opts),
}
}
@@ -424,18 +430,21 @@ impl BatchingEngine {
.unwrap_or("None".to_string())
);
- let task = BatchingTask::try_new(
+ let task_args = TaskArgs {
flow_id,
- &sql,
+ query: &sql,
plan,
- phy_expr,
+ time_window_expr: phy_expr,
expire_after,
sink_table_name,
source_table_names,
query_ctx,
- self.catalog_manager.clone(),
- rx,
- )?;
+ catalog_manager: self.catalog_manager.clone(),
+ shutdown_rx: rx,
+ batch_opts: self.batch_opts.clone(),
+ };
+
+ let task = BatchingTask::try_new(task_args)?;
let task_inner = task.clone();
let engine = self.query_engine.clone();
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index e32ab4e608..eb29b6c5cd 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -38,10 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
-use crate::batching_mode::{
- DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
- GRPC_MAX_RETRIES,
-};
+use crate::batching_mode::BatchingModeOptions;
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
@@ -88,6 +85,7 @@ pub enum FrontendClient {
chnl_mgr: ChannelManager,
auth: Option,
query: QueryOptions,
+ batch_opts: BatchingModeOptions,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -114,18 +112,20 @@ impl FrontendClient {
meta_client: Arc,
auth: Option,
query: QueryOptions,
+ batch_opts: BatchingModeOptions,
) -> Self {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
Self::Distributed {
meta_client,
chnl_mgr: {
let cfg = ChannelConfig::new()
- .connect_timeout(GRPC_CONN_TIMEOUT)
- .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
+ .connect_timeout(batch_opts.grpc_conn_timeout)
+ .timeout(batch_opts.query_timeout);
ChannelManager::with_config(cfg)
},
auth,
query,
+ batch_opts,
}
}
@@ -209,6 +209,7 @@ impl FrontendClient {
chnl_mgr,
auth,
query: _,
+ batch_opts,
} = self
else {
return UnexpectedSnafu {
@@ -217,9 +218,9 @@ impl FrontendClient {
.fail();
};
- let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
+ let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
interval.tick().await;
- for retry in 0..GRPC_MAX_RETRIES {
+ for retry in 0..batch_opts.experimental_grpc_max_retries {
let mut frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
@@ -233,7 +234,10 @@ impl FrontendClient {
.iter()
// filter out frontend that have been down for more than 1 min
.filter(|(_, node_info)| {
- node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
+ node_info.last_activity_ts
+ + batch_opts
+ .experimental_frontend_activity_timeout
+ .as_millis() as i64
> now_in_ms
})
{
@@ -263,7 +267,7 @@ impl FrontendClient {
}
NoAvailableFrontendSnafu {
- timeout: GRPC_CONN_TIMEOUT,
+ timeout: batch_opts.grpc_conn_timeout,
context: "No available frontend found that is able to process query",
}
.fail()
@@ -346,7 +350,9 @@ impl FrontendClient {
peer_desc: &mut Option,
) -> Result {
match self {
- FrontendClient::Distributed { query, .. } => {
+ FrontendClient::Distributed {
+ query, batch_opts, ..
+ } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
@@ -356,7 +362,7 @@ impl FrontendClient {
db.database
.handle_with_retry(
req.clone(),
- GRPC_MAX_RETRIES,
+ batch_opts.experimental_grpc_max_retries,
&[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
)
.await
diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index a816b663bb..ca3d78e710 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -28,7 +28,6 @@ use tokio::time::Instant;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::TimeWindowExpr;
-use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
@@ -89,10 +88,12 @@ impl TaskState {
&self,
flow_id: FlowId,
time_window_size: &Option,
+ min_refresh_duration: Duration,
max_timeout: Option,
+ max_filter_num_per_query: usize,
) -> Instant {
// = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
- let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
+ let lower = time_window_size.unwrap_or(min_refresh_duration);
let next_duration = self.last_query_duration.max(lower);
let next_duration = if let Some(max_timeout) = max_timeout {
next_duration.min(max_timeout)
@@ -104,7 +105,7 @@ impl TaskState {
// compute how much time range can be handled in one query
let max_query_update_range = (*time_window_size)
.unwrap_or_default()
- .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
+ .mul_f64(max_filter_num_per_query as f64);
// if dirty time range is more than one query can handle, execute immediately
// to faster clean up dirty time windows
if cur_dirty_window_size < max_query_update_range {
@@ -125,11 +126,36 @@ impl TaskState {
/// For keep recording of dirty time windows, which is time window that have new data inserted
/// since last query.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone)]
pub struct DirtyTimeWindows {
/// windows's `start -> end` and non-overlapping
/// `end` is exclusive(and optional)
windows: BTreeMap>,
+ /// Maximum number of filters allowed in a single query
+ max_filter_num_per_query: usize,
+ /// Time window merge distance
+ ///
+ time_window_merge_threshold: usize,
+}
+
+impl DirtyTimeWindows {
+ pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
+ Self {
+ windows: BTreeMap::new(),
+ max_filter_num_per_query,
+ time_window_merge_threshold,
+ }
+ }
+}
+
+impl Default for DirtyTimeWindows {
+ fn default() -> Self {
+ Self {
+ windows: BTreeMap::new(),
+ max_filter_num_per_query: 20,
+ time_window_merge_threshold: 3,
+ }
+ }
}
impl DirtyTimeWindows {
@@ -138,9 +164,6 @@ impl DirtyTimeWindows {
/// TODO(discord9): make those configurable
pub const MERGE_DIST: i32 = 3;
- /// Maximum number of filters allowed in a single query
- pub const MAX_FILTER_NUM: usize = 20;
-
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
/// # Arguments
@@ -234,7 +257,7 @@ impl DirtyTimeWindows {
);
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
- if self.windows.len() > Self::MAX_FILTER_NUM {
+ if self.windows.len() > self.max_filter_num_per_query {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
@@ -243,7 +266,7 @@ impl DirtyTimeWindows {
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.config.flow_id,
self.windows.len(),
- Self::MAX_FILTER_NUM,
+ self.max_filter_num_per_query,
task_ctx.config.time_window_expr,
task_ctx.config.expire_after,
first_time_window,
@@ -254,7 +277,7 @@ impl DirtyTimeWindows {
warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
flow_id,
self.windows.len(),
- Self::MAX_FILTER_NUM,
+ self.max_filter_num_per_query,
first_time_window,
last_time_window
)
@@ -460,7 +483,7 @@ impl DirtyTimeWindows {
if lower_bound
.sub(&prev_upper)
- .map(|dist| dist <= window_size * Self::MERGE_DIST)
+ .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
.unwrap_or(false)
{
prev_tw.1 = Some(cur_upper);
@@ -508,18 +531,19 @@ mod test {
#[test]
fn test_merge_dirty_time_windows() {
+ let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
let testcases = vec![
// just enough to merge
(
vec![
Timestamp::new_second(0),
- Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
+ Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
- (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
+ (2 + merge_dist as i64) * 5 * 60,
)),
)]),
Some(
@@ -530,7 +554,7 @@ mod test {
(
vec![
Timestamp::new_second(0),
- Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
+ Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([
@@ -539,9 +563,9 @@ mod test {
Some(Timestamp::new_second(5 * 60)),
),
(
- Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
+ Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
Some(Timestamp::new_second(
- (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
+ (3 + merge_dist as i64) * 5 * 60,
)),
),
]),
@@ -553,13 +577,13 @@ mod test {
(
vec![
Timestamp::new_second(0),
- Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
+ Timestamp::new_second((merge_dist as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
- (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
+ (1 + merge_dist as i64) * 5 * 60,
)),
)]),
Some(
@@ -570,14 +594,14 @@ mod test {
(
vec![
Timestamp::new_second(0),
- Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
- Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
+ Timestamp::new_second((merge_dist as i64) * 3),
+ Timestamp::new_second((merge_dist as i64) * 3 * 2),
],
(chrono::Duration::seconds(3), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
- (DirtyTimeWindows::MERGE_DIST as i64) * 7
+ (merge_dist as i64) * 7
)),
)]),
Some(
@@ -646,12 +670,12 @@ mod test {
(
vec![
Timestamp::new_second(0),
- Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
+ Timestamp::new_second((merge_dist as i64) * 5 * 60),
],
(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
- (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
+ (merge_dist as i64) * 6 * 60,
)),
),
BTreeMap::from([]),
@@ -674,7 +698,7 @@ mod test {
"ts",
expire_lower_bound,
window_size,
- DirtyTimeWindows::MAX_FILTER_NUM,
+ dirty.max_filter_num_per_query,
0,
None,
)
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 9f203bb8d3..ebce09e367 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -46,15 +46,13 @@ use tokio::time::Instant;
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
-use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
+use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
FindGroupByFinalName,
};
-use crate::batching_mode::{
- DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
-};
+use crate::batching_mode::BatchingModeOptions;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
@@ -81,6 +79,7 @@ pub struct TaskConfig {
pub source_table_names: HashSet<[String; 3]>,
catalog_manager: CatalogManagerRef,
query_type: QueryType,
+ batch_opts: Arc,
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result {
@@ -116,19 +115,37 @@ pub struct BatchingTask {
pub state: Arc>,
}
+/// Arguments for creating batching task
+pub struct TaskArgs<'a> {
+ pub flow_id: FlowId,
+ pub query: &'a str,
+ pub plan: LogicalPlan,
+ pub time_window_expr: Option,
+ pub expire_after: Option,
+ pub sink_table_name: [String; 3],
+ pub source_table_names: Vec<[String; 3]>,
+ pub query_ctx: QueryContextRef,
+ pub catalog_manager: CatalogManagerRef,
+ pub shutdown_rx: oneshot::Receiver<()>,
+ pub batch_opts: Arc,
+}
+
impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
- flow_id: FlowId,
- query: &str,
- plan: LogicalPlan,
- time_window_expr: Option,
- expire_after: Option,
- sink_table_name: [String; 3],
- source_table_names: Vec<[String; 3]>,
- query_ctx: QueryContextRef,
- catalog_manager: CatalogManagerRef,
- shutdown_rx: oneshot::Receiver<()>,
+ TaskArgs {
+ flow_id,
+ query,
+ plan,
+ time_window_expr,
+ expire_after,
+ sink_table_name,
+ source_table_names,
+ query_ctx,
+ catalog_manager,
+ shutdown_rx,
+ batch_opts,
+ }: TaskArgs<'_>,
) -> Result {
Ok(Self {
config: Arc::new(TaskConfig {
@@ -141,6 +158,7 @@ impl BatchingTask {
catalog_manager,
output_schema: plan.schema().clone(),
query_type: determine_query_type(query, &query_ctx)?,
+ batch_opts,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
})
@@ -386,7 +404,7 @@ impl BatchingTask {
}
// record slow query
- if elapsed >= SLOW_QUERY_THRESHOLD {
+ if elapsed >= self.config.batch_opts.slow_query_threshold {
warn!(
"Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
peer_desc, elapsed, &plan
@@ -439,12 +457,14 @@ impl BatchingTask {
.with_label_values(&[&flow_id_str])
.inc();
+ let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
+
let new_query = match self.gen_insert_plan(&engine, None).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
// also sleep for a little while before try again to prevent flooding logs
- tokio::time::sleep(MIN_REFRESH_DURATION).await;
+ tokio::time::sleep(min_refresh).await;
continue;
}
};
@@ -461,14 +481,18 @@ impl BatchingTask {
let sleep_until = {
let state = self.state.write().unwrap();
+ let time_window_size = self
+ .config
+ .time_window_expr
+ .as_ref()
+ .and_then(|t| *t.time_window_size());
+
state.get_next_start_query_time(
self.config.flow_id,
- &self
- .config
- .time_window_expr
- .as_ref()
- .and_then(|t| *t.time_window_size()),
- Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
+ &time_window_size,
+ min_refresh,
+ Some(self.config.batch_opts.query_timeout),
+ self.config.batch_opts.experimental_max_filter_num_per_query,
)
};
tokio::time::sleep_until(sleep_until).await;
@@ -477,9 +501,9 @@ impl BatchingTask {
Ok(None) => {
debug!(
"Flow id = {:?} found no new data, sleep for {:?} then continue",
- self.config.flow_id, MIN_REFRESH_DURATION
+ self.config.flow_id, min_refresh
);
- tokio::time::sleep(MIN_REFRESH_DURATION).await;
+ tokio::time::sleep(min_refresh).await;
continue;
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
@@ -496,7 +520,7 @@ impl BatchingTask {
}
}
// also sleep for a little while before try again to prevent flooding logs
- tokio::time::sleep(MIN_REFRESH_DURATION).await;
+ tokio::time::sleep(min_refresh).await;
}
}
}
@@ -604,7 +628,8 @@ impl BatchingTask {
&col_name,
Some(l),
window_size,
- max_window_cnt.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
+ max_window_cnt
+ .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
self.config.flow_id,
Some(self),
)?;
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index d065253d9b..20a10e269e 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -360,6 +360,7 @@ impl FlownodeBuilder {
self.flow_metadata_manager.clone(),
self.table_meta.clone(),
self.catalog_manager.clone(),
+ self.opts.flow.batching_mode.clone(),
));
let dual = FlowDualEngine::new(
manager.clone(),
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 1b6eb32f77..c82851600d 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1140,6 +1140,17 @@ max_running_procedures = 128
[flow]
+[flow.batching_mode]
+query_timeout = "10m"
+slow_query_threshold = "1m"
+experimental_min_refresh_duration = "5s"
+grpc_conn_timeout = "5s"
+experimental_grpc_max_retries = 3
+experimental_frontend_scan_timeout = "30s"
+experimental_frontend_activity_timeout = "1m"
+experimental_max_filter_num_per_query = 20
+experimental_time_window_merge_threshold = 3
+
[logging]
max_log_files = 720
append_stdout = true