diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index a81e62db1c..df241d64b9 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -298,6 +298,10 @@ impl TreeNodeRewriter for PlanRewriter { // TODO(ruihang): does this work for nodes with multiple children? // replace the current node with expanded one let mut node = MergeScanLogicalPlan::new(node, false).into_logical_plan(); + + let merge_scan_schema = node.schema(); + common_telemetry::info!("[DEBUG] merge scan schema: {:?}", merge_scan_schema); + // expand stages for new_stage in self.stage.drain(..) { node = new_stage.with_new_exprs(new_stage.expressions(), vec![node.clone()])? diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 4649980391..5c0b9269ad 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -182,6 +182,8 @@ impl Categorizer { let transformer = Arc::new(move |plan: &LogicalPlan| { if let LogicalPlan::Aggregate(aggr) = plan { let mut new_plan = aggr.clone(); + + // transform aggr exprs for (expr, transformer) in new_plan.aggr_expr.iter_mut().zip(&expr_transformer) { @@ -190,6 +192,17 @@ impl Categorizer { *expr = new_expr; } } + + // transform group exprs + for expr in new_plan.group_expr.iter_mut() { + // if let Some(transformer) = transformer { + // let new_expr = transformer(expr)?; + // *expr = new_expr; + // } + let expr_name = expr.name_for_alias().expect("not a sort expr"); + *expr = Expr::Column(expr_name.into()); + } + common_telemetry::info!( "[DEBUG] new plan aggr expr: {:?}, group expr: {:?}", new_plan.aggr_expr,