diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index eb712ec920..840cf16160 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -23,6 +23,7 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{DataFusionError, Result}; use store_api::region_engine::PartitionRange; +use store_api::storage::TimeSeriesDistribution; use table::table::scan::RegionScanExec; #[derive(Debug)] @@ -61,13 +62,27 @@ impl ParallelizeScan { } else if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { + let expected_partition_num = config.execution.target_partitions; if region_scan_exec.is_partition_set() { return Ok(Transformed::no(plan)); } + // don't parallelize if we want per series distribution + if matches!( + region_scan_exec.distribution(), + Some(TimeSeriesDistribution::PerSeries) + ) { + let partition_range = region_scan_exec.get_partition_ranges(); + let mut new_partitions = vec![vec![]; expected_partition_num]; + new_partitions[0] = partition_range; + let new_plan = region_scan_exec + .with_new_partitions(new_partitions, expected_partition_num) + .map_err(|e| DataFusionError::External(e.into_inner()))?; + return Ok(Transformed::yes(Arc::new(new_plan))); + } + let ranges = region_scan_exec.get_partition_ranges(); let total_range_num = ranges.len(); - let expected_partition_num = config.execution.target_partitions; // assign ranges to each partition let mut partition_ranges =