From 5ccfca5228ecc788d06600484c66ce463177c817 Mon Sep 17 00:00:00 2001
From: discord9
Date: Wed, 20 May 2026 11:28:38 +0800
Subject: [PATCH] fix: disallow having

Signed-off-by: discord9
---
 src/flow/src/batching_mode/utils.rs | 62 +++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs
index 188815174d..59041c2d31 100644
--- a/src/flow/src/batching_mode/utils.rs
+++ b/src/flow/src/batching_mode/utils.rs
@@ -195,6 +195,21 @@ fn find_aggregate_exprs(plan: &LogicalPlan) -> Result>, Error>
     Ok(aggregate_finder.aggr_exprs)
 }
 
+fn contains_aggregate(plan: &LogicalPlan) -> bool {
+    matches!(plan, LogicalPlan::Aggregate(_)) || plan.inputs().into_iter().any(contains_aggregate)
+}
+
+fn has_filter_above_aggregate(plan: &LogicalPlan) -> bool {
+    match plan {
+        // HAVING and other post-aggregate filters appear as `Filter` nodes above
+        // an `Aggregate`. Applying them before the sink-merge would filter on
+        // the delta aggregate rather than the final merged aggregate, so reject
+        // them until the rewrite can rebuild the predicate after merging.
+        LogicalPlan::Filter(filter) if contains_aggregate(filter.input.as_ref()) => true,
+        _ => plan.inputs().into_iter().any(has_filter_above_aggregate),
+    }
+}
+
 #[derive(Debug, Default)]
 struct OutputProjectionInfo {
     has_top_level_projection: bool,
@@ -367,6 +382,12 @@ pub fn analyze_incremental_aggregate_plan(
         .into_iter()
         .map(|name| format!("duplicate output field name: {name}"))
         .collect::<Vec<_>>();
+    if has_filter_above_aggregate(plan) {
+        unsupported_exprs.push(
+            "unsupported post-aggregate filter (HAVING) in incremental aggregate rewrite"
+                .to_string(),
+        );
+    }
     unsupported_exprs.extend(projection_info.duplicate_aggregate_aliases.iter().cloned());
     if group_key_names.is_empty()
         && projection_info
@@ -1766,6 +1787,47 @@ mod test {
         }));
     }
 
+    #[tokio::test]
+    async fn test_analyze_incremental_aggregate_plan_allows_where_before_aggregate() {
+        let query_engine = create_test_query_engine();
+        let ctx = QueryContext::arc();
+        let sql =
+            "SELECT sum(number) AS number, ts FROM numbers_with_ts WHERE number > 10 GROUP BY ts";
+        let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap();
+
+        let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
+        assert!(analysis.unsupported_exprs.is_empty());
+        assert!(analysis.group_key_names.contains(&"ts".to_string()));
+        assert_eq!(analysis.merge_columns.len(), 1);
+        assert_eq!(analysis.merge_columns[0].output_field_name, "number");
+        assert_eq!(
+            analysis.merge_columns[0].merge_op,
+            IncrementalAggregateMergeOp::Sum
+        );
+    }
+
+    #[tokio::test]
+    async fn test_analyze_incremental_aggregate_plan_rejects_having_filter() {
+        let query_engine = create_test_query_engine();
+        let ctx = QueryContext::arc();
+        let sql = "SELECT sum(number) AS number, ts FROM numbers_with_ts GROUP BY ts HAVING sum(number) > 10";
+        let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap();
+
+        let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
+        assert!(
+            analysis
+                .unsupported_exprs
+                .iter()
+                .any(|expr| expr.contains("post-aggregate filter")),
+            "HAVING/post-aggregate filter should be unsupported: {:?}",
+            analysis.unsupported_exprs
+        );
+        assert!(
+            analysis.merge_columns.is_empty(),
+            "unsupported HAVING should disable merge columns"
+        );
+    }
+
     #[tokio::test]
     async fn test_analyze_incremental_aggregate_plan_preserves_raw_aggregate_name() {
         let query_engine = create_test_query_engine();