chore: rm unwrap

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-14 13:08:32 +08:00
parent efd98df923
commit 3094a7868d

View File

@@ -305,12 +305,13 @@ pub async fn rewrite_incremental_aggregate_with_sink_merge(
.iter()
.map(|c| col(format!("{delta_alias}.{c}")).alias(c.clone()))
.collect::<Vec<_>>();
projection_exprs.extend(
analysis
.merge_columns
.iter()
.map(|merge_col| build_left_join_merge_expr(delta_alias, sink_alias, merge_col)),
);
for merge_col in &analysis.merge_columns {
projection_exprs.push(build_left_join_merge_expr(
delta_alias,
sink_alias,
merge_col,
)?);
}
LogicalPlanBuilder::from(joined)
.project(projection_exprs)
@@ -327,44 +328,60 @@ fn build_left_join_merge_expr(
delta_alias: &str,
sink_alias: &str,
merge_col: &IncrementalAggregateMergeColumn,
) -> Expr {
) -> Result<Expr, Error> {
let left = col(format!("{delta_alias}.{}", merge_col.output_field_name));
let right = col(format!("{sink_alias}.{}", merge_col.output_field_name));
let merged = match merge_col.merge_op {
IncrementalAggregateMergeOp::Sum => when(is_null(left.clone()), right.clone())
.when(is_null(right.clone()), left.clone())
.otherwise(binary_expr(left.clone(), Operator::Plus, right.clone()))
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build SUM merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::Min => when(is_null(right.clone()), left.clone())
.when(left.clone().lt_eq(right.clone()), left.clone())
.otherwise(right.clone())
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build MIN merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::Max => when(is_null(right.clone()), left.clone())
.when(left.clone().gt_eq(right.clone()), left.clone())
.otherwise(right.clone())
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build MAX merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::BoolAnd => when(is_null(left.clone()), right.clone())
.when(is_null(right.clone()), left.clone())
.otherwise(and(left.clone(), right.clone()))
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build BOOL_AND merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::BoolOr => when(is_null(left.clone()), right.clone())
.when(is_null(right.clone()), left.clone())
.otherwise(or(left.clone(), right.clone()))
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build BOOL_OR merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::BitAnd => when(is_null(left.clone()), right.clone())
.when(is_null(right.clone()), left.clone())
.otherwise(bitwise_and(left.clone(), right.clone()))
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build BIT_AND merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::BitOr => when(is_null(left.clone()), right.clone())
.when(is_null(right.clone()), left.clone())
.otherwise(bitwise_or(left.clone(), right.clone()))
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build BIT_OR merge expression".to_string(),
})?,
IncrementalAggregateMergeOp::BitXor => when(is_null(left.clone()), right.clone())
.when(is_null(right.clone()), left.clone())
.otherwise(bitwise_xor(left.clone(), right.clone()))
.unwrap(),
.with_context(|_| DatafusionSnafu {
context: "Failed to build BIT_XOR merge expression".to_string(),
})?,
};
merged.alias(merge_col.output_field_name.clone())
Ok(merged.alias(merge_col.output_field_name.clone()))
}
pub async fn get_table_info_df_schema(