more per review

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-19 21:17:10 +08:00
parent 6d1a3b0e0f
commit d4e9ec264b

View File

@@ -576,14 +576,12 @@ pub async fn rewrite_incremental_aggregate_with_sink_merge(
merge_col,
)?);
} else {
ensure!(
false,
InvalidQuerySnafu {
reason: format!(
"UNSUPPORTED_INCREMENTAL_AGG: output field {output_field_name} is not covered by group keys, literals, or merge columns"
)
}
);
return InvalidQuerySnafu {
reason: format!(
"UNSUPPORTED_INCREMENTAL_AGG: output field {output_field_name} is not covered by group keys, literals, or merge columns"
),
}
.fail();
}
}
@@ -1801,6 +1799,22 @@ mod test {
assert_eq!(analysis.output_field_names[2], "ts");
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_allows_string_literal_output() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = "SELECT max(number) AS number, 'hello' AS label FROM numbers_with_ts";
let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(analysis.unsupported_exprs.is_empty());
assert_eq!(analysis.literal_columns, vec!["label".to_string()]);
assert_eq!(
analysis.output_field_names,
vec!["number".to_string(), "label".to_string()]
);
}
#[tokio::test]
async fn test_rewrite_incremental_aggregate_preserves_non_identifier_aliases() {
let query_engine = create_test_query_engine();
@@ -1844,21 +1858,25 @@ mod test {
async fn test_analyze_incremental_aggregate_plan_rejects_reserved_global_join_key_output() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql =
format!("SELECT max(number) AS \"{GLOBAL_AGGREGATE_JOIN_KEY}\" FROM numbers_with_ts");
let plan = sql_to_df_plan(ctx, query_engine, &sql, false)
.await
.unwrap();
let testcases = [
format!("SELECT max(number) AS \"{GLOBAL_AGGREGATE_JOIN_KEY}\" FROM numbers_with_ts"),
format!("SELECT max(number) AS {GLOBAL_AGGREGATE_JOIN_KEY} FROM numbers_with_ts"),
];
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(
analysis
.unsupported_exprs
.iter()
.any(|expr| expr.contains("reserved internal name")),
"global aggregate output should not collide with the internal join key: {:?}",
analysis.unsupported_exprs
);
for sql in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, false)
.await
.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(
analysis
.unsupported_exprs
.iter()
.any(|expr| expr.contains("reserved internal name")),
"global aggregate output should not collide with the internal join key for SQL {sql}: {:?}",
analysis.unsupported_exprs
);
}
}
#[tokio::test]
@@ -1894,6 +1912,24 @@ mod test {
);
}
#[tokio::test]
async fn test_analyze_incremental_aggregate_plan_rejects_same_aggregate_multiple_aliases() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let sql = "SELECT sum(number) AS a, sum(number) AS b, ts FROM numbers_with_ts GROUP BY ts";
let plan = sql_to_df_plan(ctx, query_engine, sql, false).await.unwrap();
let analysis = analyze_incremental_aggregate_plan(&plan).unwrap().unwrap();
assert!(
analysis
.unsupported_exprs
.iter()
.any(|expr| expr.contains("unsupported output field: b")),
"same aggregate with multiple aliases should be unsupported until explicit reproduction is implemented: {:?}",
analysis.unsupported_exprs
);
}
#[test]
fn test_qualified_col_preserves_non_identifier_field_name() {
let expr = qualified_col("__flow_delta", "max(numbers_with_ts.number)");