diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 054190a693..622e5a54ef 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -351,7 +351,7 @@ impl FlowDualEngine { } } else { warn!( - "Flownode {:?} found flows not exist in flownode, flow_ids={:?}", + "Flows do not exist in flownode for node {:?}, flow_ids={:?}", nodeid, to_be_created ); } @@ -371,7 +371,7 @@ impl FlowDualEngine { } } else { warn!( - "Flownode {:?} found flows not exist in flownode, flow_ids={:?}", + "Flows do not exist in metadata for node {:?}, flow_ids={:?}", nodeid, to_be_dropped ); } diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 4c6f608af9..57d254a63b 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -71,18 +71,33 @@ impl TaskState { self.last_update_time = Instant::now(); } - /// wait for at least `last_query_duration`, at most `max_timeout` to start next query + /// Compute the next query delay based on the time window size or the last query duration. + /// The aim is to avoid querying too frequently without delaying the next query for too long. + /// The delay is computed as follows: + /// - If `time_window_size` is set, the delay is half the time window size, but never less than + /// the delay used when no time window is set (see below); it is not capped by `max_timeout`. + /// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained + /// to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`. /// - /// if have more dirty time window, exec next query immediately + /// If there are dirty time windows, the function returns an immediate execution time to clean them. + /// TODO: Make this behavior configurable. 
pub fn get_next_start_query_time( &self, flow_id: FlowId, + time_window_size: &Option, max_timeout: Option, ) -> Instant { - let next_duration = max_timeout + let last_duration = max_timeout .unwrap_or(self.last_query_duration) - .min(self.last_query_duration); - let next_duration = next_duration.max(MIN_REFRESH_DURATION); + .min(self.last_query_duration) + .max(MIN_REFRESH_DURATION); + + let next_duration = time_window_size + .map(|t| { + let half = t / 2; + half.max(last_duration) + }) + .unwrap_or(last_duration); // if have dirty time window, execute immediately to clean dirty time window if self.dirty_time_windows.windows.is_empty() { diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 7bdcf90069..03789e9e9c 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -380,6 +380,23 @@ impl BatchingTask { frontend_client: Arc, ) { loop { + // first check if shutdown signal is received + // if so, break the loop + { + let mut state = self.state.write().unwrap(); + match state.shutdown_rx.try_recv() { + Ok(()) => break, + Err(TryRecvError::Closed) => { + warn!( + "Unexpected shutdown flow {}, shutdown anyway", + self.config.flow_id + ); + break; + } + Err(TryRecvError::Empty) => (), + } + } + let mut new_query = None; let mut gen_and_exec = async || { new_query = self.gen_insert_plan(&engine).await?; @@ -393,20 +410,15 @@ impl BatchingTask { // normal execute, sleep for some time before doing next query Ok(Some(_)) => { let sleep_until = { - let mut state = self.state.write().unwrap(); - match state.shutdown_rx.try_recv() { - Ok(()) => break, - Err(TryRecvError::Closed) => { - warn!( - "Unexpected shutdown flow {}, shutdown anyway", - self.config.flow_id - ); - break; - } - Err(TryRecvError::Empty) => (), - } + let state = self.state.write().unwrap(); + state.get_next_start_query_time( self.config.flow_id, + &self + .config + .time_window_expr + .as_ref() + .and_then(|t| *t.time_window_size()), 
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT), ) }; diff --git a/src/flow/src/batching_mode/time_window.rs b/src/flow/src/batching_mode/time_window.rs index 398250fc8b..54ccf7a49d 100644 --- a/src/flow/src/batching_mode/time_window.rs +++ b/src/flow/src/batching_mode/time_window.rs @@ -55,6 +55,9 @@ use crate::error::{ use crate::expr::error::DataTypeSnafu; use crate::Error; +/// Represents a test timestamp in seconds since the Unix epoch. +const DEFAULT_TEST_TIMESTAMP: Timestamp = Timestamp::new_second(17_0000_0000); + /// Time window expr like `date_bin(INTERVAL '1' MINUTE, ts)`, this type help with /// evaluating the expr using given timestamp /// @@ -70,6 +73,7 @@ pub struct TimeWindowExpr { pub column_name: String, logical_expr: Expr, df_schema: DFSchema, + eval_time_window_size: Option, } impl std::fmt::Display for TimeWindowExpr { @@ -84,6 +88,11 @@ impl std::fmt::Display for TimeWindowExpr { } impl TimeWindowExpr { + /// The time window size of the expr, get from calling `eval` with a test timestamp + pub fn time_window_size(&self) -> &Option { + &self.eval_time_window_size + } + pub fn from_expr( expr: &Expr, column_name: &str, @@ -91,12 +100,28 @@ impl TimeWindowExpr { session: &SessionState, ) -> Result { let phy_expr: PhysicalExprRef = to_phy_expr(expr, df_schema, session)?; - Ok(Self { + let mut zelf = Self { phy_expr, column_name: column_name.to_string(), logical_expr: expr.clone(), df_schema: df_schema.clone(), - }) + eval_time_window_size: None, + }; + let test_ts = DEFAULT_TEST_TIMESTAMP; + let (l, u) = zelf.eval(test_ts)?; + let time_window_size = match (l, u) { + (Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| { + UnexpectedSnafu { + reason: format!( + "Expect upper bound older than lower bound, found upper={u:?} and lower={l:?}" + ), + } + .build() + })?, + _ => None, + }; + zelf.eval_time_window_size = time_window_size; + Ok(zelf) } pub fn eval(