From bbc9f3ea1efdcce62825fbb71159e29ca64d1c06 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Wed, 16 Jul 2025 16:05:20 +0800 Subject: [PATCH] refactor: expose flow batching mode constants to config (#6442) * refactor: make flow batching mode constant to configs Signed-off-by: discord9 * docs: config docs Signed-off-by: discord9 * docs: update code comment Signed-off-by: discord9 * test: fix test_config_api Signed-off-by: discord9 * feat: more batch opts Signed-off-by: discord9 * fix after rebase Signed-off-by: discord9 * chore: per review Signed-off-by: discord9 * per review experimental options Signed-off-by: discord9 --------- Signed-off-by: discord9 --- Cargo.lock | 1 + config/config.md | 10 +++ config/flownode.example.toml | 23 ++++++ src/cmd/src/flownode.rs | 1 + src/flow/Cargo.toml | 1 + src/flow/src/adapter.rs | 3 + src/flow/src/adapter/flownode_impl.rs | 15 +++- src/flow/src/batching_mode.rs | 70 +++++++++++------ src/flow/src/batching_mode/engine.rs | 23 ++++-- src/flow/src/batching_mode/frontend_client.rs | 30 +++++--- src/flow/src/batching_mode/state.rs | 72 +++++++++++------ src/flow/src/batching_mode/task.rs | 77 ++++++++++++------- src/flow/src/server.rs | 1 + tests-integration/tests/http.rs | 11 +++ 14 files changed, 242 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a47dc1982..1e0d5a09e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4688,6 +4688,7 @@ dependencies = [ "get-size2", "greptime-proto", "http 1.1.0", + "humantime-serde", "itertools 0.14.0", "lazy_static", "meta-client", diff --git a/config/config.md b/config/config.md index 780239142d..30140148d7 100644 --- a/config/config.md +++ b/config/config.md @@ -562,6 +562,16 @@ | `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. | | `flow` | -- | -- | flow engine options. | | `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.
Not setting(or set to 0) this value will use the number of CPU cores divided by 2. | +| `flow.batching_mode` | -- | -- | -- | +| `flow.batching_mode.query_timeout` | String | `600s` | The default batching engine query timeout is 10 minutes. | +| `flow.batching_mode.slow_query_threshold` | String | `60s` | will output a warn log for any query that runs for more that this threshold | +| `flow.batching_mode.experimental_min_refresh_duration` | String | `5s` | The minimum duration between two queries execution by batching mode task | +| `flow.batching_mode.grpc_conn_timeout` | String | `5s` | The gRPC connection timeout | +| `flow.batching_mode.experimental_grpc_max_retries` | Integer | `3` | The gRPC max retry number | +| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,
if failed to find available frontend after frontend_scan_timeout elapsed, return error
which prevent flownode from starting | +| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout
if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
it will be removed from the list that flownode use to connect | +| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query | +| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance | | `grpc` | -- | -- | The gRPC server options. | | `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,
and used for connections from outside the host | diff --git a/config/flownode.example.toml b/config/flownode.example.toml index d1d4d9e341..cc9dd90705 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -7,6 +7,29 @@ node_id = 14 ## The number of flow worker in flownode. ## Not setting(or set to 0) this value will use the number of CPU cores divided by 2. #+num_workers=0 +[flow.batching_mode] +## The default batching engine query timeout is 10 minutes. +#+query_timeout="600s" +## will output a warn log for any query that runs for more that this threshold +#+slow_query_threshold="60s" +## The minimum duration between two queries execution by batching mode task +#+experimental_min_refresh_duration="5s" +## The gRPC connection timeout +#+grpc_conn_timeout="5s" +## The gRPC max retry number +#+experimental_grpc_max_retries=3 +## Flow wait for available frontend timeout, +## if failed to find available frontend after frontend_scan_timeout elapsed, return error +## which prevent flownode from starting +#+experimental_frontend_scan_timeout="30s" +## Frontend activity timeout +## if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, +## it will be removed from the list that flownode use to connect +#+experimental_frontend_activity_timeout="60s" +## Maximum number of filters allowed in a single query +#+experimental_max_filter_num_per_query=20 +## Time window merge distance +#+experimental_time_window_merge_threshold=3 ## The gRPC server options. [grpc] diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index eb920a0a75..1a071e10a3 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -374,6 +374,7 @@ impl StartCommand { meta_client.clone(), flow_auth_header, opts.query.clone(), + opts.flow.batching_mode.clone(), ); let frontend_client = Arc::new(frontend_client); let flownode_builder = FlownodeBuilder::new( diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 82377868fb..7d22f45b6c 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -48,6 +48,7 @@ futures.workspace = true get-size2 = "0.1.2" greptime-proto.workspace = true http.workspace = true +humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true meta-client.workspace = true diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 2acba4979e..a6ad3d1d40 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -50,6 +50,7 @@ use crate::adapter::refill::RefillTask; use crate::adapter::table_source::ManagedTableSource; use crate::adapter::util::relation_desc_to_column_schemas_with_fallback; pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +use crate::batching_mode::BatchingModeOptions; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu}; @@ -84,12 +85,14 @@ pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at"; #[serde(default)] pub struct FlowConfig { pub num_workers: usize, + pub batching_mode: BatchingModeOptions, } impl Default for FlowConfig { fn default() -> Self { Self { num_workers: (common_config::utils::get_cpus() / 2).max(1), + batching_mode: BatchingModeOptions::default(), } } } diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index f68526508f..dfb54a382a 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -41,7 +41,6 @@ use tokio::sync::{Mutex, RwLock}; use crate::adapter::{CreateFlowArgs, StreamingEngine}; use crate::batching_mode::engine::BatchingEngine; -use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION}; use crate::engine::FlowEngine; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu, @@ -440,13 +439,21 @@ struct ConsistentCheckTask { impl ConsistentCheckTask { async fn start_check_task(engine: &Arc) -> Result { let engine = engine.clone(); + let min_refresh_duration = engine + .batching_engine() + .batch_opts + .experimental_min_refresh_duration; + let frontend_scan_timeout = engine + .batching_engine() + .batch_opts + .experimental_frontend_scan_timeout; let (tx, mut rx) = tokio::sync::mpsc::channel(1); let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10); let handle = common_runtime::spawn_global(async move { // first check if available frontend is found if let Err(err) = engine - .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT) + .wait_for_available_frontend(frontend_scan_timeout) .await { warn!("No frontend is available yet:\n {err:?}"); @@ -459,9 +466,9 @@ impl ConsistentCheckTask { error!( "Failed to recover flows:\n {err:?}, retry {} in {}s", recover_retry, - MIN_REFRESH_DURATION.as_secs() + min_refresh_duration.as_secs() ); - tokio::time::sleep(MIN_REFRESH_DURATION).await; + tokio::time::sleep(min_refresh_duration).await; } engine.set_done_recovering(); diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 0e6f3bbb25..65e678bd00 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -16,6 +16,8 @@ use std::time::Duration; +use serde::{Deserialize, Serialize}; + pub(crate) mod engine; pub(crate) mod frontend_client; mod state; @@ -23,27 +25,49 @@ mod task; mod time_window; mod utils; -/// TODO(discord9): make those constants configurable -/// The default batching engine query timeout is 10 minutes -pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60); +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct BatchingModeOptions { + /// The default batching engine query timeout is 10 minutes + #[serde(with = "humantime_serde")] + pub query_timeout: Duration, + /// will output a warn log for any query that runs for more that this threshold + #[serde(with = "humantime_serde")] + pub slow_query_threshold: Duration, + /// The minimum duration between two queries execution by batching mode task + #[serde(with = "humantime_serde")] + pub experimental_min_refresh_duration: Duration, + /// The gRPC connection timeout + #[serde(with = "humantime_serde")] + pub grpc_conn_timeout: Duration, + /// The gRPC max retry number + pub experimental_grpc_max_retries: u32, + /// Flow wait for available frontend timeout, + /// if failed to find available frontend after frontend_scan_timeout elapsed, return error + /// which prevent flownode from starting + #[serde(with = "humantime_serde")] + pub experimental_frontend_scan_timeout: Duration, + /// Frontend activity timeout + /// if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, it will be removed from the list that flownode use to connect + #[serde(with = "humantime_serde")] + pub experimental_frontend_activity_timeout: Duration, + /// Maximum number of filters allowed in a single query + pub experimental_max_filter_num_per_query: usize, + /// Time window merge distance + pub experimental_time_window_merge_threshold: usize, +} -/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running -pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60); - -/// The minimum duration between two queries execution by batching mode task -pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); - -/// Grpc connection timeout -const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5); - -/// Grpc max retry number -const GRPC_MAX_RETRIES: u32 = 3; - -/// Flow wait for available frontend timeout, -/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error -/// which should prevent flownode from starting -pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30); - -/// Frontend activity timeout -/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect -pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60); +impl Default for BatchingModeOptions { + fn default() -> Self { + Self { + query_timeout: Duration::from_secs(10 * 60), + slow_query_threshold: Duration::from_secs(60), + experimental_min_refresh_duration: Duration::new(5, 0), + grpc_conn_timeout: Duration::from_secs(5), + experimental_grpc_max_retries: 3, + experimental_frontend_scan_timeout: Duration::from_secs(30), + experimental_frontend_activity_timeout: Duration::from_secs(60), + experimental_max_filter_num_per_query: 20, + experimental_time_window_merge_threshold: 3, + } + } +} diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index e6ff6c54b3..f8d2398520 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -34,9 +34,10 @@ use store_api::storage::{RegionId, TableId}; use tokio::sync::{oneshot, RwLock}; use crate::batching_mode::frontend_client::FrontendClient; -use crate::batching_mode::task::BatchingTask; +use crate::batching_mode::task::{BatchingTask, TaskArgs}; use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr}; use crate::batching_mode::utils::sql_to_df_plan; +use crate::batching_mode::BatchingModeOptions; use crate::engine::FlowEngine; use crate::error::{ ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, @@ -57,6 +58,9 @@ pub struct BatchingEngine { table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, query_engine: QueryEngineRef, + /// Batching mode options for control how batching mode query works + /// + pub(crate) batch_opts: Arc, } impl BatchingEngine { @@ -66,6 +70,7 @@ impl BatchingEngine { flow_metadata_manager: FlowMetadataManagerRef, table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, + batch_opts: BatchingModeOptions, ) -> Self { Self { tasks: Default::default(), @@ -75,6 +80,7 @@ impl BatchingEngine { table_meta, catalog_manager, query_engine, + batch_opts: Arc::new(batch_opts), } } @@ -424,18 +430,21 @@ impl BatchingEngine { .unwrap_or("None".to_string()) ); - let task = BatchingTask::try_new( + let task_args = TaskArgs { flow_id, - &sql, + query: &sql, plan, - phy_expr, + time_window_expr: phy_expr, expire_after, sink_table_name, source_table_names, query_ctx, - self.catalog_manager.clone(), - rx, - )?; + catalog_manager: self.catalog_manager.clone(), + shutdown_rx: rx, + batch_opts: self.batch_opts.clone(), + }; + + let task = BatchingTask::try_new(task_args)?; let task_inner = task.clone(); let engine = self.query_engine.clone(); diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index e32ab4e608..eb29b6c5cd 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -38,10 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; -use crate::batching_mode::{ - DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT, - GRPC_MAX_RETRIES, -}; +use crate::batching_mode::BatchingModeOptions; use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu}; use crate::{Error, FlowAuthHeader}; @@ -88,6 +85,7 @@ pub enum FrontendClient { chnl_mgr: ChannelManager, auth: Option, query: QueryOptions, + batch_opts: BatchingModeOptions, }, Standalone { /// for the sake of simplicity still use grpc even in standalone mode @@ -114,18 +112,20 @@ impl FrontendClient { meta_client: Arc, auth: Option, query: QueryOptions, + batch_opts: BatchingModeOptions, ) -> Self { common_telemetry::info!("Frontend client build with auth={:?}", auth); Self::Distributed { meta_client, chnl_mgr: { let cfg = ChannelConfig::new() - .connect_timeout(GRPC_CONN_TIMEOUT) - .timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT); + .connect_timeout(batch_opts.grpc_conn_timeout) + .timeout(batch_opts.query_timeout); ChannelManager::with_config(cfg) }, auth, query, + batch_opts, } } @@ -209,6 +209,7 @@ impl FrontendClient { chnl_mgr, auth, query: _, + batch_opts, } = self else { return UnexpectedSnafu { @@ -217,9 +218,9 @@ impl FrontendClient { .fail(); }; - let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT); + let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout); interval.tick().await; - for retry in 0..GRPC_MAX_RETRIES { + for retry in 0..batch_opts.experimental_grpc_max_retries { let mut frontends = self.scan_for_frontend().await?; let now_in_ms = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -233,7 +234,10 @@ impl FrontendClient { .iter() // filter out frontend that have been down for more than 1 min .filter(|(_, node_info)| { - node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64 + node_info.last_activity_ts + + batch_opts + .experimental_frontend_activity_timeout + .as_millis() as i64 > now_in_ms }) { @@ -263,7 +267,7 @@ impl FrontendClient { } NoAvailableFrontendSnafu { - timeout: GRPC_CONN_TIMEOUT, + timeout: batch_opts.grpc_conn_timeout, context: "No available frontend found that is able to process query", } .fail() @@ -346,7 +350,9 @@ impl FrontendClient { peer_desc: &mut Option, ) -> Result { match self { - FrontendClient::Distributed { query, .. } => { + FrontendClient::Distributed { + query, batch_opts, .. + } => { let db = self.get_random_active_frontend(catalog, schema).await?; *peer_desc = Some(PeerDesc::Dist { @@ -356,7 +362,7 @@ impl FrontendClient { db.database .handle_with_retry( req.clone(), - GRPC_MAX_RETRIES, + batch_opts.experimental_grpc_max_retries, &[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())], ) .await diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index a816b663bb..ca3d78e710 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -28,7 +28,6 @@ use tokio::time::Instant; use crate::batching_mode::task::BatchingTask; use crate::batching_mode::time_window::TimeWindowExpr; -use crate::batching_mode::MIN_REFRESH_DURATION; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::metrics::{ METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE, @@ -89,10 +88,12 @@ impl TaskState { &self, flow_id: FlowId, time_window_size: &Option, + min_refresh_duration: Duration, max_timeout: Option, + max_filter_num_per_query: usize, ) -> Instant { // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout` - let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION); + let lower = time_window_size.unwrap_or(min_refresh_duration); let next_duration = self.last_query_duration.max(lower); let next_duration = if let Some(max_timeout) = max_timeout { next_duration.min(max_timeout) @@ -104,7 +105,7 @@ impl TaskState { // compute how much time range can be handled in one query let max_query_update_range = (*time_window_size) .unwrap_or_default() - .mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64); + .mul_f64(max_filter_num_per_query as f64); // if dirty time range is more than one query can handle, execute immediately // to faster clean up dirty time windows if cur_dirty_window_size < max_query_update_range { @@ -125,11 +126,36 @@ impl TaskState { /// For keep recording of dirty time windows, which is time window that have new data inserted /// since last query. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct DirtyTimeWindows { /// windows's `start -> end` and non-overlapping /// `end` is exclusive(and optional) windows: BTreeMap>, + /// Maximum number of filters allowed in a single query + max_filter_num_per_query: usize, + /// Time window merge distance + /// + time_window_merge_threshold: usize, +} + +impl DirtyTimeWindows { + pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self { + Self { + windows: BTreeMap::new(), + max_filter_num_per_query, + time_window_merge_threshold, + } + } +} + +impl Default for DirtyTimeWindows { + fn default() -> Self { + Self { + windows: BTreeMap::new(), + max_filter_num_per_query: 20, + time_window_merge_threshold: 3, + } + } } impl DirtyTimeWindows { @@ -138,9 +164,6 @@ impl DirtyTimeWindows { /// TODO(discord9): make those configurable pub const MERGE_DIST: i32 = 3; - /// Maximum number of filters allowed in a single query - pub const MAX_FILTER_NUM: usize = 20; - /// Add lower bounds to the dirty time windows. Upper bounds are ignored. /// /// # Arguments @@ -234,7 +257,7 @@ impl DirtyTimeWindows { ); self.merge_dirty_time_windows(window_size, expire_lower_bound)?; - if self.windows.len() > Self::MAX_FILTER_NUM { + if self.windows.len() > self.max_filter_num_per_query { let first_time_window = self.windows.first_key_value(); let last_time_window = self.windows.last_key_value(); @@ -243,7 +266,7 @@ impl DirtyTimeWindows { "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}", task_ctx.config.flow_id, self.windows.len(), - Self::MAX_FILTER_NUM, + self.max_filter_num_per_query, task_ctx.config.time_window_expr, task_ctx.config.expire_after, first_time_window, @@ -254,7 +277,7 @@ impl DirtyTimeWindows { warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}", flow_id, self.windows.len(), - Self::MAX_FILTER_NUM, + self.max_filter_num_per_query, first_time_window, last_time_window ) @@ -460,7 +483,7 @@ impl DirtyTimeWindows { if lower_bound .sub(&prev_upper) - .map(|dist| dist <= window_size * Self::MERGE_DIST) + .map(|dist| dist <= window_size * self.time_window_merge_threshold as i32) .unwrap_or(false) { prev_tw.1 = Some(cur_upper); @@ -508,18 +531,19 @@ mod test { #[test] fn test_merge_dirty_time_windows() { + let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold; let testcases = vec![ // just enough to merge ( vec![ Timestamp::new_second(0), - Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + Timestamp::new_second((1 + merge_dist as i64) * 5 * 60), ], (chrono::Duration::seconds(5 * 60), None), BTreeMap::from([( Timestamp::new_second(0), Some(Timestamp::new_second( - (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60, + (2 + merge_dist as i64) * 5 * 60, )), )]), Some( @@ -530,7 +554,7 @@ mod test { ( vec![ Timestamp::new_second(0), - Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + Timestamp::new_second((2 + merge_dist as i64) * 5 * 60), ], (chrono::Duration::seconds(5 * 60), None), BTreeMap::from([ @@ -539,9 +563,9 @@ mod test { Some(Timestamp::new_second(5 * 60)), ), ( - Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + Timestamp::new_second((2 + merge_dist as i64) * 5 * 60), Some(Timestamp::new_second( - (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60, + (3 + merge_dist as i64) * 5 * 60, )), ), ]), @@ -553,13 +577,13 @@ mod test { ( vec![ Timestamp::new_second(0), - Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + Timestamp::new_second((merge_dist as i64) * 5 * 60), ], (chrono::Duration::seconds(5 * 60), None), BTreeMap::from([( Timestamp::new_second(0), Some(Timestamp::new_second( - (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60, + (1 + merge_dist as i64) * 5 * 60, )), )]), Some( @@ -570,14 +594,14 @@ mod test { ( vec![ Timestamp::new_second(0), - Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3), - Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2), + Timestamp::new_second((merge_dist as i64) * 3), + Timestamp::new_second((merge_dist as i64) * 3 * 2), ], (chrono::Duration::seconds(3), None), BTreeMap::from([( Timestamp::new_second(0), Some(Timestamp::new_second( - (DirtyTimeWindows::MERGE_DIST as i64) * 7 + (merge_dist as i64) * 7 )), )]), Some( @@ -646,12 +670,12 @@ mod test { ( vec![ Timestamp::new_second(0), - Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + Timestamp::new_second((merge_dist as i64) * 5 * 60), ], ( chrono::Duration::seconds(5 * 60), Some(Timestamp::new_second( - (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60, + (merge_dist as i64) * 6 * 60, )), ), BTreeMap::from([]), @@ -674,7 +698,7 @@ mod test { "ts", expire_lower_bound, window_size, - DirtyTimeWindows::MAX_FILTER_NUM, + dirty.max_filter_num_per_query, 0, None, ) diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 9f203bb8d3..ebce09e367 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -46,15 +46,13 @@ use tokio::time::Instant; use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; use crate::batching_mode::frontend_client::FrontendClient; -use crate::batching_mode::state::{DirtyTimeWindows, TaskState}; +use crate::batching_mode::state::TaskState; use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::utils::{ get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, FindGroupByFinalName, }; -use crate::batching_mode::{ - DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD, -}; +use crate::batching_mode::BatchingModeOptions; use crate::df_optimizer::apply_df_optimizer; use crate::error::{ ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu, @@ -81,6 +79,7 @@ pub struct TaskConfig { pub source_table_names: HashSet<[String; 3]>, catalog_manager: CatalogManagerRef, query_type: QueryType, + batch_opts: Arc, } fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result { @@ -116,19 +115,37 @@ pub struct BatchingTask { pub state: Arc>, } +/// Arguments for creating batching task +pub struct TaskArgs<'a> { + pub flow_id: FlowId, + pub query: &'a str, + pub plan: LogicalPlan, + pub time_window_expr: Option, + pub expire_after: Option, + pub sink_table_name: [String; 3], + pub source_table_names: Vec<[String; 3]>, + pub query_ctx: QueryContextRef, + pub catalog_manager: CatalogManagerRef, + pub shutdown_rx: oneshot::Receiver<()>, + pub batch_opts: Arc, +} + impl BatchingTask { #[allow(clippy::too_many_arguments)] pub fn try_new( - flow_id: FlowId, - query: &str, - plan: LogicalPlan, - time_window_expr: Option, - expire_after: Option, - sink_table_name: [String; 3], - source_table_names: Vec<[String; 3]>, - query_ctx: QueryContextRef, - catalog_manager: CatalogManagerRef, - shutdown_rx: oneshot::Receiver<()>, + TaskArgs { + flow_id, + query, + plan, + time_window_expr, + expire_after, + sink_table_name, + source_table_names, + query_ctx, + catalog_manager, + shutdown_rx, + batch_opts, + }: TaskArgs<'_>, ) -> Result { Ok(Self { config: Arc::new(TaskConfig { @@ -141,6 +158,7 @@ impl BatchingTask { catalog_manager, output_schema: plan.schema().clone(), query_type: determine_query_type(query, &query_ctx)?, + batch_opts, }), state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))), }) @@ -386,7 +404,7 @@ impl BatchingTask { } // record slow query - if elapsed >= SLOW_QUERY_THRESHOLD { + if elapsed >= self.config.batch_opts.slow_query_threshold { warn!( "Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}", peer_desc, elapsed, &plan @@ -439,12 +457,14 @@ impl BatchingTask { .with_label_values(&[&flow_id_str]) .inc(); + let min_refresh = self.config.batch_opts.experimental_min_refresh_duration; + let new_query = match self.gen_insert_plan(&engine, None).await { Ok(new_query) => new_query, Err(err) => { common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id); // also sleep for a little while before try again to prevent flooding logs - tokio::time::sleep(MIN_REFRESH_DURATION).await; + tokio::time::sleep(min_refresh).await; continue; } }; @@ -461,14 +481,18 @@ impl BatchingTask { let sleep_until = { let state = self.state.write().unwrap(); + let time_window_size = self + .config + .time_window_expr + .as_ref() + .and_then(|t| *t.time_window_size()); + state.get_next_start_query_time( self.config.flow_id, - &self - .config - .time_window_expr - .as_ref() - .and_then(|t| *t.time_window_size()), - Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT), + &time_window_size, + min_refresh, + Some(self.config.batch_opts.query_timeout), + self.config.batch_opts.experimental_max_filter_num_per_query, ) }; tokio::time::sleep_until(sleep_until).await; @@ -477,9 +501,9 @@ impl BatchingTask { Ok(None) => { debug!( "Flow id = {:?} found no new data, sleep for {:?} then continue", - self.config.flow_id, MIN_REFRESH_DURATION + self.config.flow_id, min_refresh ); - tokio::time::sleep(MIN_REFRESH_DURATION).await; + tokio::time::sleep(min_refresh).await; continue; } // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed @@ -496,7 +520,7 @@ impl BatchingTask { } } // also sleep for a little while before try again to prevent flooding logs - tokio::time::sleep(MIN_REFRESH_DURATION).await; + tokio::time::sleep(min_refresh).await; } } } @@ -604,7 +628,8 @@ impl BatchingTask { &col_name, Some(l), window_size, - max_window_cnt.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM), + max_window_cnt + .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query), self.config.flow_id, Some(self), )?; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d065253d9b..20a10e269e 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -360,6 +360,7 @@ impl FlownodeBuilder { self.flow_metadata_manager.clone(), self.table_meta.clone(), self.catalog_manager.clone(), + self.opts.flow.batching_mode.clone(), )); let dual = FlowDualEngine::new( manager.clone(), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 1b6eb32f77..c82851600d 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1140,6 +1140,17 @@ max_running_procedures = 128 [flow] +[flow.batching_mode] +query_timeout = "10m" +slow_query_threshold = "1m" +experimental_min_refresh_duration = "5s" +grpc_conn_timeout = "5s" +experimental_grpc_max_retries = 3 +experimental_frontend_scan_timeout = "30s" +experimental_frontend_activity_timeout = "1m" +experimental_max_filter_num_per_query = 20 +experimental_time_window_merge_threshold = 3 + [logging] max_log_files = 720 append_stdout = true