handle partition and ordering

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-27 15:38:57 +08:00
parent 2799d67212
commit 5b78d76fc5
5 changed files with 89 additions and 19 deletions

View File

@@ -67,17 +67,27 @@ impl ParallelizeScan {
}
// don't parallelize if we want per series distribution
if matches!(
region_scan_exec.distribution(),
Some(TimeSeriesDistribution::PerSeries)
) {
return Ok(Transformed::no(plan));
}
// if matches!(
// region_scan_exec.distribution(),
// Some(TimeSeriesDistribution::PerSeries)
// ) {
// return Ok(Transformed::no(plan));
// }
common_telemetry::info!(
"[DEBUG] ParallelizeScan: parallelize scan, distribution: {:?}",
region_scan_exec.distribution()
);
let ranges = region_scan_exec.get_partition_ranges();
let total_range_num = ranges.len();
let expected_partition_num = config.execution.target_partitions;
common_telemetry::info!(
"[DEBUG] ParallelizeScan: expected partition num: {expected_partition_num}, total range num: {total_range_num}"
);
// assign ranges to each partition
let mut partition_ranges =
Self::assign_partition_range(ranges, expected_partition_num);
@@ -131,11 +141,13 @@ impl ParallelizeScan {
) -> Vec<Vec<PartitionRange>> {
if ranges.is_empty() {
// Returns a single partition with no range.
return vec![vec![]];
return vec![vec![]; expected_partition_num];
}
if ranges.len() == 1 {
return vec![ranges];
let mut vec = vec![vec![]; expected_partition_num];
vec[0] = ranges;
return vec;
}
// Sort ranges by number of rows in descending order.
@@ -149,7 +161,8 @@ impl ParallelizeScan {
} else {
ranges.len()
};
let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
// let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
let actual_partition_num = expected_partition_num;
let mut partition_ranges = vec![vec![]; actual_partition_num];
#[derive(Eq, PartialEq)]

View File

@@ -20,6 +20,8 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result as DfResult;
use datafusion_physical_expr::Distribution;
use store_api::storage::TimeSeriesDistribution;
use table::table::scan::RegionScanExec;
use crate::dist_plan::MergeScanExec;
@@ -71,7 +73,17 @@ impl PassDistribution {
&& let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone())
{
Ok(Transformed::yes(Arc::new(new_plan) as _))
} else {
} else if let Some(region_scan) = plan.as_any().downcast_ref::<RegionScanExec>()
&& let Some(TimeSeriesDistribution::PerSeries) = region_scan.distribution() {
common_telemetry::info!(
"[DEBUG] PassDistribution: pass distribution to RegionScanExec, distribution: PerSeries"
);
common_telemetry::info!(
"[DEBUG] PassDistribution: existing distribution: {:?}",
region_scan.properties().partitioning
);
Ok(Transformed::no(plan))
} else{
Ok(Transformed::no(plan))
}
})?;

View File

@@ -31,6 +31,7 @@ use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
@@ -133,6 +134,9 @@ impl QueryEngineState {
physical_optimizer
.rules
.insert(1, Arc::new(PassDistribution));
physical_optimizer
.rules
.insert(2, Arc::new(EnforceSorting {}));
// Add rule for windowed sort
physical_optimizer
.rules

View File

@@ -14,6 +14,7 @@
#![feature(assert_matches)]
#![feature(try_blocks)]
#![feature(let_chains)]
pub mod dist_table;
pub mod error;

View File

@@ -82,11 +82,17 @@ impl RegionScanExec {
if scanner.properties().is_logical_region() {
pk_names.sort_unstable();
}
let mut pk_columns: Vec<PhysicalSortExpr> = pk_names
.into_iter()
let pk_columns = pk_names
.iter()
.filter_map(
|col| Some(Arc::new(Column::new_with_schema(col, &arrow_schema).ok()?) as _),
)
.collect::<Vec<_>>();
let mut pk_sort_columns: Vec<PhysicalSortExpr> = pk_names
.iter()
.filter_map(|col| {
Some(PhysicalSortExpr::new(
Arc::new(Column::new_with_schema(&col, &arrow_schema).ok()?) as _,
Arc::new(Column::new_with_schema(col, &arrow_schema).ok()?) as _,
SortOptions {
descending: false,
nulls_first: true,
@@ -113,28 +119,46 @@ impl RegionScanExec {
let eq_props = match request.distribution {
Some(TimeSeriesDistribution::PerSeries) => {
if let Some(ts) = ts_col {
pk_columns.push(ts);
pk_sort_columns.push(ts);
}
EquivalenceProperties::new_with_orderings(
arrow_schema.clone(),
&[LexOrdering::new(pk_columns)],
&[LexOrdering::new(pk_sort_columns)],
)
}
Some(TimeSeriesDistribution::TimeWindowed) => {
if let Some(ts_col) = ts_col {
pk_columns.insert(0, ts_col);
pk_sort_columns.insert(0, ts_col);
}
EquivalenceProperties::new_with_orderings(
arrow_schema.clone(),
&[LexOrdering::new(pk_columns)],
&[LexOrdering::new(pk_sort_columns)],
)
}
None => EquivalenceProperties::new(arrow_schema.clone()),
};
common_telemetry::info!("[DEBUG] RegionScanExec: eq_props: {:?}", eq_props,);
let partitioning = match request.distribution {
Some(TimeSeriesDistribution::PerSeries) => {
Partitioning::Hash(pk_columns.clone(), num_output_partition)
}
Some(TimeSeriesDistribution::TimeWindowed) | None => {
Partitioning::UnknownPartitioning(num_output_partition)
}
};
common_telemetry::info!(
"[DEBUG] RegionScanExec: num_partitions: {}, pk_columns: {:?}, distrubution: {:?}",
num_output_partition,
pk_columns,
request.distribution,
);
let properties = PlanProperties::new(
eq_props,
Partitioning::UnknownPartitioning(num_output_partition),
partitioning,
EmissionType::Incremental,
Boundedness::Bounded,
);
@@ -190,7 +214,23 @@ impl RegionScanExec {
let num_partitions = partitions.len();
let mut properties = self.properties.clone();
properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);
let new_partitioning = match properties.partitioning {
Partitioning::Hash(ref columns, _) => {
Partitioning::Hash(columns.clone(), target_partitions)
}
_ => Partitioning::UnknownPartitioning(target_partitions),
};
properties.partitioning = new_partitioning;
common_telemetry::info!(
"[DEBUG] RegionScanExec: set new partitions: {:?}",
properties.partitioning
);
common_telemetry::info!(
"[DEBUG] RegionScanExec: prepare with {} partitions, target_partitions: {}",
partitions.len(),
target_partitions
);
{
let mut scanner = self.scanner.lock().unwrap();