diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 4f96966b46..194e2d3b9d 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -158,8 +158,9 @@ impl Categorizer { | Expr::ScalarSubquery(_) | Expr::Wildcard { .. } => Commutativity::Unimplemented, - Expr::Alias(_) - | Expr::Unnest(_) + Expr::Alias(alias) => Self::check_expr(&alias.expr), + + Expr::Unnest(_) | Expr::GroupingSet(_) | Expr::Placeholder(_) | Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented, diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 2ce78d38b9..8751211c2d 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -88,7 +90,7 @@ impl WindowedSortPhysicalRule { .expr .as_any() .downcast_ref::() - && column_expr.name() == scanner_info.time_index + && scanner_info.time_index.contains(column_expr.name()) { } else { return Ok(Transformed::no(plan)); @@ -148,13 +150,13 @@ impl WindowedSortPhysicalRule { #[derive(Debug)] struct ScannerInfo { partition_ranges: Vec>, - time_index: String, + time_index: HashSet, tag_columns: Vec, } fn fetch_partition_range(input: Arc) -> DataFusionResult> { let mut partition_ranges = None; - let mut time_index = None; + let mut time_index = HashSet::new(); let mut tag_columns = None; let mut is_batch_coalesced = false; @@ -172,9 +174,21 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { + for (expr, output_name) in projection.expr() { + if let Some(column_expr) = expr.as_any().downcast_ref::() { + if time_index.contains(column_expr.name()) { + time_index.insert(output_name.clone()); + } + } + } + } + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges()); - time_index = Some(region_scan_exec.time_index()); + // Reset time index column. + time_index = HashSet::from([region_scan_exec.time_index()]); tag_columns = Some(region_scan_exec.tag_columns()); // set distinguish_partition_ranges to true, this is an incorrect workaround @@ -189,7 +203,7 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult