From 3094a7868dd080eb111aa292d7bdeddd89494544 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 14 May 2026 13:08:32 +0800 Subject: [PATCH] chore: rm unwrap Signed-off-by: discord9 --- src/flow/src/batching_mode/utils.rs | 49 +++++++++++++++++++---------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index 70d737732a..78ee05ad9a 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -305,12 +305,13 @@ pub async fn rewrite_incremental_aggregate_with_sink_merge( .iter() .map(|c| col(format!("{delta_alias}.{c}")).alias(c.clone())) .collect::>(); - projection_exprs.extend( - analysis - .merge_columns - .iter() - .map(|merge_col| build_left_join_merge_expr(delta_alias, sink_alias, merge_col)), - ); + for merge_col in &analysis.merge_columns { + projection_exprs.push(build_left_join_merge_expr( + delta_alias, + sink_alias, + merge_col, + )?); + } LogicalPlanBuilder::from(joined) .project(projection_exprs) @@ -327,44 +328,60 @@ fn build_left_join_merge_expr( delta_alias: &str, sink_alias: &str, merge_col: &IncrementalAggregateMergeColumn, -) -> Expr { +) -> Result { let left = col(format!("{delta_alias}.{}", merge_col.output_field_name)); let right = col(format!("{sink_alias}.{}", merge_col.output_field_name)); let merged = match merge_col.merge_op { IncrementalAggregateMergeOp::Sum => when(is_null(left.clone()), right.clone()) .when(is_null(right.clone()), left.clone()) .otherwise(binary_expr(left.clone(), Operator::Plus, right.clone())) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build SUM merge expression".to_string(), + })?, IncrementalAggregateMergeOp::Min => when(is_null(right.clone()), left.clone()) .when(left.clone().lt_eq(right.clone()), left.clone()) .otherwise(right.clone()) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build MIN merge expression".to_string(), + })?, IncrementalAggregateMergeOp::Max => when(is_null(right.clone()), left.clone()) .when(left.clone().gt_eq(right.clone()), left.clone()) .otherwise(right.clone()) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build MAX merge expression".to_string(), + })?, IncrementalAggregateMergeOp::BoolAnd => when(is_null(left.clone()), right.clone()) .when(is_null(right.clone()), left.clone()) .otherwise(and(left.clone(), right.clone())) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build BOOL_AND merge expression".to_string(), + })?, IncrementalAggregateMergeOp::BoolOr => when(is_null(left.clone()), right.clone()) .when(is_null(right.clone()), left.clone()) .otherwise(or(left.clone(), right.clone())) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build BOOL_OR merge expression".to_string(), + })?, IncrementalAggregateMergeOp::BitAnd => when(is_null(left.clone()), right.clone()) .when(is_null(right.clone()), left.clone()) .otherwise(bitwise_and(left.clone(), right.clone())) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build BIT_AND merge expression".to_string(), + })?, IncrementalAggregateMergeOp::BitOr => when(is_null(left.clone()), right.clone()) .when(is_null(right.clone()), left.clone()) .otherwise(bitwise_or(left.clone(), right.clone())) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build BIT_OR merge expression".to_string(), + })?, IncrementalAggregateMergeOp::BitXor => when(is_null(left.clone()), right.clone()) .when(is_null(right.clone()), left.clone()) .otherwise(bitwise_xor(left.clone(), right.clone())) - .unwrap(), + .with_context(|_| DatafusionSnafu { + context: "Failed to build BIT_XOR merge expression".to_string(), + })?, }; - merged.alias(merge_col.output_field_name.clone()) + Ok(merged.alias(merge_col.output_field_name.clone())) } pub async fn get_table_info_df_schema(