assign partition_ranges

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-08 21:36:29 +08:00
committed by evenyag
parent c18c3f5839
commit 8fa1ebcc3e

View File

@@ -23,6 +23,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
use store_api::region_engine::PartitionRange;
use store_api::storage::TimeSeriesDistribution;
use table::table::scan::RegionScanExec;
#[derive(Debug)]
@@ -61,13 +62,27 @@ impl ParallelizeScan {
} else if let Some(region_scan_exec) =
plan.as_any().downcast_ref::<RegionScanExec>()
{
let expected_partition_num = config.execution.target_partitions;
if region_scan_exec.is_partition_set() {
return Ok(Transformed::no(plan));
}
// don't parallelize if we want per series distribution
if matches!(
region_scan_exec.distribution(),
Some(TimeSeriesDistribution::PerSeries)
) {
let partition_range = region_scan_exec.get_partition_ranges();
let mut new_partitions = vec![vec![]; expected_partition_num];
new_partitions[0] = partition_range;
let new_plan = region_scan_exec
.with_new_partitions(new_partitions, expected_partition_num)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
return Ok(Transformed::yes(Arc::new(new_plan)));
}
let ranges = region_scan_exec.get_partition_ranges();
let total_range_num = ranges.len();
let expected_partition_num = config.execution.target_partitions;
// assign ranges to each partition
let mut partition_ranges =