fix: disallow having

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-20 11:28:38 +08:00
parent 90a119cead
commit 5ccfca5228

View File

@@ -195,6 +195,21 @@ fn find_aggregate_exprs(plan: &LogicalPlan) -> Result<Option<Vec<Expr>>, Error>
Ok(aggregate_finder.aggr_exprs)
}
fn contains_aggregate(plan: &LogicalPlan) -> bool {
matches!(plan, LogicalPlan::Aggregate(_)) || plan.inputs().into_iter().any(contains_aggregate)
}
fn has_filter_above_aggregate(plan: &LogicalPlan) -> bool {
match plan {
// HAVING and other post-aggregate filters appear as `Filter` nodes above
// an `Aggregate`. Applying them before the sink-merge would filter on
// the delta aggregate rather than the final merged aggregate, so reject
// them until the rewrite can rebuild the predicate after merging.
LogicalPlan::Filter(filter) if contains_aggregate(filter.input.as_ref()) => true,
_ => plan.inputs().into_iter().any(has_filter_above_aggregate),
}
}
#[derive(Debug, Default)]
struct OutputProjectionInfo {
has_top_level_projection: bool,
@@ -367,6 +382,12 @@ pub fn analyze_incremental_aggregate_plan(
.into_iter()
.map(|name| format!("duplicate output field name: {name}"))
.collect::<Vec<_>>();
if has_filter_above_aggregate(plan) {
unsupported_exprs.push(
"unsupported post-aggregate filter (HAVING) in incremental aggregate rewrite"
.to_string(),
);
}
unsupported_exprs.extend(projection_info.duplicate_aggregate_aliases.iter().cloned());
if group_key_names.is_empty()
&& projection_info
@@ -1766,6 +1787,47 @@ mod test {
}));
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_allows_where_before_aggregate() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql =
"SELECT sum(number) AS number, ts FROM numbers_with_ts WHERE number > 10 GROUP BY ts";
let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(analysis.unsupported_exprs.is_empty());
assert!(analysis.group_key_names.contains(&"ts".to_string()));
assert_eq!(analysis.merge_columns.len(), 1);
assert_eq!(analysis.merge_columns[0].output_field_name, "number");
assert_eq!(
analysis.merge_columns[0].merge_op,
IncrementalAggregateMergeOp::Sum
);
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_rejects_having_filter() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = "SELECT sum(number) AS number, ts FROM numbers_with_ts GROUP BY ts HAVING sum(number) > 10";
let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(
analysis
.unsupported_exprs
.iter()
.any(|expr| expr.contains("post-aggregate filter")),
"HAVING/post-aggregate filter should be unsupported: {:?}",
analysis.unsupported_exprs
);
assert!(
analysis.merge_columns.is_empty(),
"unsupported HAVING should disable merge columns"
);
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_preserves_raw_aggregate_name() {
let query_engine = create_test_query_engine();