diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index b2b9012f54..956d9a273a 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -156,9 +156,11 @@ impl BatchingEngine {
         let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
             let src_table_names = &task.config.source_table_names;
             let mut all_dirty_windows = HashSet::new();
+            let mut is_dirty = false;
             for src_table_name in src_table_names {
                 if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
                     let Some(expr) = &task.config.time_window_expr else {
+                        is_dirty = true;
                         continue;
                     };
                     for timestamp in timestamps {
@@ -173,6 +175,9 @@ impl BatchingEngine {
                 }
             }
             let mut state = task.state.write().unwrap();
+            if is_dirty {
+                state.dirty_time_windows.set_dirty();
+            }
             let flow_id_label = task.config.flow_id.to_string();
             for timestamp in all_dirty_windows {
                 state.dirty_time_windows.add_window(timestamp, None);
@@ -274,9 +279,12 @@ impl BatchingEngine {
         let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
             let src_table_names = &task.config.source_table_names;

+            let mut is_dirty = false;
+
             for src_table_name in src_table_names {
                 if let Some(entry) = group_by_table_name.get(src_table_name) {
                     let Some(expr) = &task.config.time_window_expr else {
+                        is_dirty = true;
                         continue;
                     };
                     let involved_time_windows = expr.handle_rows(entry.clone()).await?;
@@ -286,6 +294,10 @@ impl BatchingEngine {
                         .add_lower_bounds(involved_time_windows.into_iter());
                 }
             }
+            if is_dirty {
+                task.state.write().unwrap().dirty_time_windows.set_dirty();
+            }
+
             Ok(())
         });
         handles.push(handle);
diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index 90575e8804..bc74957a2d 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -203,11 +203,21 @@ impl DirtyTimeWindows {
         self.windows.clear();
     }

+    /// Mark the windows as dirty. Only useful for a full aggregation without a time window,
+    /// to record that new data has been inserted.
+    pub fn set_dirty(&mut self) {
+        self.windows.insert(Timestamp::new_second(0), None);
+    }
+
     /// Number of dirty windows.
     pub fn len(&self) -> usize {
         self.windows.len()
     }

+    pub fn is_empty(&self) -> bool {
+        self.windows.is_empty()
+    }
+
     /// Get the effective count of time windows, which is the number of time windows that can be
     /// used for query, compute from total time window range divided by `window_size`.
     pub fn effective_count(&self, window_size: &Duration) -> usize {
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 576c75d3cf..75ac47f6d8 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -48,8 +48,8 @@ use crate::batching_mode::frontend_client::FrontendClient;
 use crate::batching_mode::state::{FilterExprInfo, TaskState};
 use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::utils::{
-    get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
-    FindGroupByFinalName,
+    gen_plan_with_matching_schema, get_table_info_df_schema, sql_to_df_plan, AddFilterRewriter,
+    ColumnMatcherRewriter, FindGroupByFinalName,
 };
 use crate::batching_mode::BatchingModeOptions;
 use crate::df_optimizer::apply_df_optimizer;
@@ -618,42 +618,63 @@ impl BatchingTask {
             .map(|expr| expr.eval(low_bound))
             .transpose()?;

-        let (Some((Some(l), Some(u))), QueryType::Sql) =
-            (expire_time_window_bound, &self.config.query_type)
-        else {
-            // either no time window or not a sql query, then just use the original query
-            // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
-            debug!(
-                "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
-            );
-            // clean dirty time window too, this could be from create flow's check_execute
-            self.state.write().unwrap().dirty_time_windows.clean();
+        let (expire_lower_bound, expire_upper_bound) =
+            match (expire_time_window_bound, &self.config.query_type) {
+                (Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
+                (None, QueryType::Sql) => {
+                    // if it's a SQL query and no time window lower/upper bound is found, just return the original query (with auto columns)
+                    // use sink_table_meta to also add the `update_at` and `__ts_placeholder` columns' values to the query, for compatibility reasons
+                    debug!(
+                        "Flow id = {:?}, no time window, using the same query",
+                        self.config.flow_id
+                    );
+                    // also clean the dirty time windows; they could come from create flow's check_execute
+                    let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
+                    self.state.write().unwrap().dirty_time_windows.clean();

-        // TODO(discord9): not add auto column for tql query?
-        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
+                    if !is_dirty {
+                        // no dirty data, hence no need to update
+                        debug!("Flow id={:?}, no new data, skipping update", self.config.flow_id);
+                        return Ok(None);
+                    }

-        let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
-            .await?;
+                    let plan = gen_plan_with_matching_schema(
+                        &self.config.query,
+                        query_ctx,
+                        engine,
+                        sink_table_schema.clone(),
+                    )
+                    .await?;

-        let plan = plan
-            .clone()
-            .rewrite(&mut add_auto_column)
-            .with_context(|_| DatafusionSnafu {
-                context: format!("Failed to rewrite plan:\n {}\n", plan),
-            })?
-            .data;
+                    return Ok(Some(PlanInfo { plan, filter: None }));
+                }
+                _ => {
+                    // clean the dirty windows; TQL queries have no use for time windows
+                    self.state.write().unwrap().dirty_time_windows.clean();

-        // since no time window lower/upper bound is found, just return the original query(with auto columns)
-        return Ok(Some(PlanInfo { plan, filter: None }));
-    };
+                    let plan = gen_plan_with_matching_schema(
+                        &self.config.query,
+                        query_ctx,
+                        engine,
+                        sink_table_schema.clone(),
+                    )
+                    .await?;
+
+                    return Ok(Some(PlanInfo { plan, filter: None }));
+                }
+            };

         debug!(
             "Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
-            self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
+            self.config.flow_id, expire_lower_bound, expire_upper_bound, self.state.read().unwrap().dirty_time_windows
         );

-        let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
-            reason: format!("Can't get window size from {u:?} - {l:?}"),
-        })?;
+        let window_size = expire_upper_bound
+            .sub(&expire_lower_bound)
+            .with_context(|| UnexpectedSnafu {
+                reason: format!(
+                    "Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
+                ),
+            })?;
         let col_name = self
             .config
             .time_window_expr
@@ -673,7 +694,7 @@ impl BatchingTask {
             .dirty_time_windows
             .gen_filter_exprs(
                 &col_name,
-                Some(l),
+                Some(expire_lower_bound),
                 window_size,
                 max_window_cnt
                     .unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
@@ -701,7 +722,7 @@ impl BatchingTask {
         };

         let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
-        let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
+        let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());

         let plan =
             sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;
diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs
index 05ab1177b0..93cd02aee5 100644
--- a/src/flow/src/batching_mode/utils.rs
+++ b/src/flow/src/batching_mode/utils.rs
@@ -24,7 +24,7 @@ use datafusion::error::Result as DfResult;
 use datafusion::logical_expr::Expr;
 use datafusion::sql::unparser::Unparser;
 use datafusion_common::tree_node::{
-    Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
+    Transformed, TreeNode as _, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
 };
 use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
 use datafusion_expr::{Distinct, LogicalPlan, Projection};
@@ -135,6 +135,27 @@ pub async fn sql_to_df_plan(
     Ok(plan)
 }

+/// Generate a plan from the given SQL whose output matches the schema of the sink table,
+/// by aliasing existing columns and adding auto columns.
+pub(crate) async fn gen_plan_with_matching_schema(
+    sql: &str,
+    query_ctx: QueryContextRef,
+    engine: QueryEngineRef,
+    sink_table_schema: SchemaRef,
+) -> Result<LogicalPlan, Error> {
+    let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, false).await?;
+
+    let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema);
+    let plan = plan
+        .clone()
+        .rewrite(&mut add_auto_column)
+        .with_context(|_| DatafusionSnafu {
+            context: format!("Failed to rewrite plan:\n {}\n", plan),
+        })?
+        .data;
+    Ok(plan)
+}
+
 pub fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
     /// A dialect that forces identifiers to be quoted when have uppercase
     struct ForceQuoteIdentifiers;
@@ -239,19 +260,19 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
     }
 }

-/// Add to the final select columns like `update_at`
+/// Optionally add columns like `update_at` to the final select, if the sink table has such a column
 /// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
 /// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
 /// with values like `now()` and `0`
 ///
 /// it also give existing columns alias to column in sink table if needed
 #[derive(Debug)]
-pub struct AddAutoColumnRewriter {
+pub struct ColumnMatcherRewriter {
     pub schema: SchemaRef,
     pub is_rewritten: bool,
 }

-impl AddAutoColumnRewriter {
+impl ColumnMatcherRewriter {
     pub fn new(schema: SchemaRef) -> Self {
         Self {
             schema,
@@ -348,7 +369,7 @@ impl AddAutoColumnRewriter {
     }
 }

-impl TreeNodeRewriter for AddAutoColumnRewriter {
+impl TreeNodeRewriter for ColumnMatcherRewriter {
     type Node = LogicalPlan;
     fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
         if self.is_rewritten {
@@ -696,7 +717,7 @@ mod test {
         let ctx = QueryContext::arc();
         for (before, after, column_schemas) in testcases {
             let schema = Arc::new(Schema::new(column_schemas));
-            let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema);
+            let mut add_auto_column_rewriter = ColumnMatcherRewriter::new(schema);

             let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
                 .await
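
Note on the dirty-window bookkeeping in engine.rs/state.rs above: a flow whose
query aggregates the whole table has no time window expression, so there is no
concrete window to mark dirty. set_dirty() therefore inserts a sentinel window
at t=0, and the new is_empty() lets the plan generator in task.rs ask "did any
new data arrive at all?" before deciding to re-run the full query. A minimal,
self-contained sketch of that interplay (the types below are simplified
stand-ins, not the real DirtyTimeWindows/Timestamp from this codebase):

    use std::collections::BTreeMap;

    // Simplified stand-in for common_time::Timestamp.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct Timestamp(i64);

    impl Timestamp {
        fn new_second(s: i64) -> Self {
            Timestamp(s)
        }
    }

    // Minimal model: window lower bound -> optional upper bound.
    #[derive(Default)]
    struct DirtyTimeWindows {
        windows: BTreeMap<Timestamp, Option<Timestamp>>,
    }

    impl DirtyTimeWindows {
        // Full aggregation without a time window: insert a sentinel
        // window at t=0 so the state still reads as "dirty".
        fn set_dirty(&mut self) {
            self.windows.insert(Timestamp::new_second(0), None);
        }

        fn is_empty(&self) -> bool {
            self.windows.is_empty()
        }

        fn clean(&mut self) {
            self.windows.clear();
        }
    }

    fn main() {
        let mut dirty = DirtyTimeWindows::default();

        // New rows arrived for a flow without a time window expression.
        dirty.set_dirty();

        // task.rs-style logic: re-run the full query only when dirty,
        // then clear the state so an idle flow skips the next tick.
        let should_requery = !dirty.is_empty();
        dirty.clean();
        assert!(should_requery && dirty.is_empty());
    }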
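
The ColumnMatcherRewriter rename and the new `TreeNode as _` import both lean
on DataFusion's tree-node API: rewrite() is a method of the TreeNode trait, so
the trait has to be in scope even if only anonymously. A rough sketch of the
same shape against the public datafusion_common/datafusion_expr crates
(OnceRewriter is an invented stand-in, not this PR's rewriter; the real f_down
splices alias and auto columns into the topmost Projection):

    use datafusion_common::tree_node::{Transformed, TreeNode as _, TreeNodeRewriter};
    use datafusion_common::Result as DfResult;
    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};

    // Fires once on the way down, then marks itself done so deeper
    // nodes stay untouched -- the same `is_rewritten` pattern the PR uses.
    struct OnceRewriter {
        is_rewritten: bool,
    }

    impl TreeNodeRewriter for OnceRewriter {
        type Node = LogicalPlan;

        fn f_down(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
            if self.is_rewritten {
                return Ok(Transformed::no(node));
            }
            self.is_rewritten = true;
            // A real implementation would modify `node` here (for example,
            // appending auto columns to a Projection) before returning it.
            Ok(Transformed::yes(node))
        }
    }

    fn main() -> DfResult<()> {
        let plan = LogicalPlanBuilder::empty(false).build()?;
        // `rewrite` is provided by the TreeNode trait -- hence the
        // `TreeNode as _` addition to the imports in utils.rs.
        let plan = plan
            .rewrite(&mut OnceRewriter { is_rewritten: false })?
            .data;
        println!("{plan}");
        Ok(())
    }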