diff --git a/src/query/src/dist_plan/analyzer.rs b/src/query/src/dist_plan/analyzer.rs index e9dd698354..e1c68c94e8 100644 --- a/src/query/src/dist_plan/analyzer.rs +++ b/src/query/src/dist_plan/analyzer.rs @@ -647,6 +647,8 @@ struct EnforceDistRequirementRewriter { /// when on `Projection` node, we don't need to apply the column requirements of `Aggregate` node /// because the `Projection` node is not in the scope of the `Aggregate` node cur_level: usize, + /// The base level where the rewriter is created + base_level: usize, plan_per_level: BTreeMap, } @@ -659,6 +661,7 @@ impl EnforceDistRequirementRewriter { Self { column_requirements, cur_level, + base_level: cur_level, plan_per_level: BTreeMap::new(), } } @@ -789,8 +792,10 @@ impl TreeNodeRewriter for EnforceDistRequirementRewriter { // still need to continue for next projection if applicable return Ok(Transformed::yes(new_node)); - } else if let LogicalPlan::Aggregate(_) = node { - // something is wrong, we shouldn't add column requirements for aggregate node + } else if let LogicalPlan::Aggregate(_) = node + && self.cur_level > self.base_level + { + // something is wrong, we shouldn't add column requirements for aggregate node that isn't part of a step aggregate // because aggregate node will change the schema and may drop certain columns rightfully // need to return a error with enough debug info for debugging let applicable_column_requirements =