diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 931985fd28..6ca69d9d8f 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -576,14 +576,12 @@ pub async fn rewrite_incremental_aggregate_with_sink_merge( merge_col, )?); } else { - ensure!( - false, - InvalidQuerySnafu { - reason: format!( - "UNSUPPORTED_INCREMENTAL_AGG: output field {output_field_name} is not covered by group keys, literals, or merge columns" - ) - } - ); + return InvalidQuerySnafu { + reason: format!( + "UNSUPPORTED_INCREMENTAL_AGG: output field {output_field_name} is not covered by group keys, literals, or merge columns" + ), + } + .fail(); } } @@ -1801,6 +1799,22 @@ mod test { assert_eq!(analysis.output_field_names[2], "ts"); } + #[tokio::test] + async fn test_analyze_incremental_aggregate_plan_allows_string_literal_output() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sql = "SELECT max(number) AS number, 'hello' AS label FROM numbers_with_ts"; + let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap(); + + let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap(); + assert!(analysis.unsupported_exprs.is_empty()); + assert_eq!(analysis.literal_columns, vec!["label".to_string()]); + assert_eq!( + analysis.output_field_names, + vec!["number".to_string(), "label".to_string()] + ); + } + #[tokio::test] async fn test_rewrite_incremental_aggregate_preserves_non_identifier_aliases() { let query_engine = create_test_query_engine(); @@ -1844,21 +1858,25 @@ mod test { async fn test_analyze_incremental_aggregate_plan_rejects_reserved_global_join_key_output() { let query_engine = create_test_query_engine(); let ctx = QueryContext::arc(); - let sql = - format!("SELECT max(number) AS \"{GLOBAL_AGGREGATE_JOIN_KEY}\" FROM numbers_with_ts"); - let plan = sql_to_df_plan(ctx, query_engine, &sql, false) - .await - .unwrap(); + let testcases = [ + format!("SELECT max(number) AS \"{GLOBAL_AGGREGATE_JOIN_KEY}\" FROM numbers_with_ts"), + format!("SELECT max(number) AS {GLOBAL_AGGREGATE_JOIN_KEY} FROM numbers_with_ts"), + ]; - let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap(); - assert!( - analysis - .unsupported_exprs - .iter() - .any(|expr| expr.contains("reserved internal name")), - "global aggregate output should not collide with the internal join key: {:?}", - analysis.unsupported_exprs - ); + for sql in testcases { + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, false) + .await + .unwrap(); + let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap(); + assert!( + analysis + .unsupported_exprs + .iter() + .any(|expr| expr.contains("reserved internal name")), + "global aggregate output should not collide with the internal join key for SQL {sql}: {:?}", + analysis.unsupported_exprs + ); + } } #[tokio::test] @@ -1894,6 +1912,24 @@ mod test { ); } + #[tokio::test] + async fn test_analyze_incremental_aggregate_plan_rejects_same_aggregate_multiple_aliases() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let sql = "SELECT sum(number) AS a, sum(number) AS b, ts FROM numbers_with_ts GROUP BY ts"; + let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap(); + + let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap(); + assert!( + analysis + .unsupported_exprs + .iter() + .any(|expr| expr.contains("unsupported output field: b")), + "same aggregate with multiple aliases should be unsupported until explicit reproduction is implemented: {:?}", + analysis.unsupported_exprs + ); + } + #[test] fn test_qualified_col_preserves_non_identifier_field_name() { let expr = qualified_col("__flow_delta", "max(numbers_with_ts.number)");