diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs
index b6d22f7de5..0c7cf762f4 100644
--- a/src/query/src/optimizer/parallelize_scan.rs
+++ b/src/query/src/optimizer/parallelize_scan.rs
@@ -67,17 +67,27 @@ impl ParallelizeScan {
             }
 
             // don't parallelize if we want per series distribution
-            if matches!(
-                region_scan_exec.distribution(),
-                Some(TimeSeriesDistribution::PerSeries)
-            ) {
-                return Ok(Transformed::no(plan));
-            }
+            // if matches!(
+            //     region_scan_exec.distribution(),
+            //     Some(TimeSeriesDistribution::PerSeries)
+            // ) {
+            //     return Ok(Transformed::no(plan));
+            // }
+
+            common_telemetry::info!(
+                "[DEBUG] ParallelizeScan: parallelize scan, distribution: {:?}",
+                region_scan_exec.distribution()
+            );
 
             let ranges = region_scan_exec.get_partition_ranges();
             let total_range_num = ranges.len();
             let expected_partition_num = config.execution.target_partitions;
+
+            common_telemetry::info!(
+                "[DEBUG] ParallelizeScan: expected partition num: {expected_partition_num}, total range num: {total_range_num}"
+            );
+
             // assign ranges to each partition
             let mut partition_ranges =
                 Self::assign_partition_range(ranges, expected_partition_num);
 
@@ -131,11 +141,13 @@ impl ParallelizeScan {
     ) -> Vec<Vec<PartitionRange>> {
         if ranges.is_empty() {
             // Returns a single partition with no range.
-            return vec![vec![]];
+            return vec![vec![]; expected_partition_num];
         }
 
         if ranges.len() == 1 {
-            return vec![ranges];
+            let mut vec = vec![vec![]; expected_partition_num];
+            vec[0] = ranges;
+            return vec;
         }
 
         // Sort ranges by number of rows in descending order.
@@ -149,7 +161,8 @@ impl ParallelizeScan {
         } else {
             ranges.len()
         };
-        let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
+        // let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
+        let actual_partition_num = expected_partition_num;
         let mut partition_ranges = vec![vec![]; actual_partition_num];
 
         #[derive(Eq, PartialEq)]
diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs
index 8a096ab780..f89b799d9b 100644
--- a/src/query/src/optimizer/pass_distribution.rs
+++ b/src/query/src/optimizer/pass_distribution.rs
@@ -20,6 +20,8 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::Result as DfResult;
 use datafusion_physical_expr::Distribution;
+use store_api::storage::TimeSeriesDistribution;
+use table::table::scan::RegionScanExec;
 
 use crate::dist_plan::MergeScanExec;
 
@@ -71,7 +73,17 @@ impl PassDistribution {
                 && let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone())
             {
                 Ok(Transformed::yes(Arc::new(new_plan) as _))
-            } else {
+            } else if let Some(region_scan) = plan.as_any().downcast_ref::<RegionScanExec>()
+                && let Some(TimeSeriesDistribution::PerSeries) = region_scan.distribution() {
+                common_telemetry::info!(
+                    "[DEBUG] PassDistribution: pass distribution to RegionScanExec, distribution: PerSeries"
+                );
+                common_telemetry::info!(
+                    "[DEBUG] PassDistribution: existing distribution: {:?}",
+                    region_scan.properties().partitioning
+                );
+                Ok(Transformed::no(plan))
+            } else {
                 Ok(Transformed::no(plan))
             }
         })?;
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index 812fc2c2af..bf86a88fe8 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -31,6 +31,7 @@ use datafusion::error::Result as DfResult;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::SessionStateBuilder;
+use datafusion::physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
 use datafusion::physical_optimizer::PhysicalOptimizerRule;
@@ -133,6 +134,9 @@ impl QueryEngineState {
         physical_optimizer
             .rules
             .insert(1, Arc::new(PassDistribution));
+        physical_optimizer
+            .rules
+            .insert(2, Arc::new(EnforceSorting {}));
         // Add rule for windowed sort
         physical_optimizer
             .rules
diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs
index 64e72029b8..5fb800ab3c 100644
--- a/src/table/src/lib.rs
+++ b/src/table/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![feature(assert_matches)]
 #![feature(try_blocks)]
+#![feature(let_chains)]
 
 pub mod dist_table;
 pub mod error;
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index cd69b22b5a..ad56094094 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -82,11 +82,17 @@ impl RegionScanExec {
         if scanner.properties().is_logical_region() {
             pk_names.sort_unstable();
         }
-        let mut pk_columns: Vec<PhysicalSortExpr> = pk_names
-            .into_iter()
+        let pk_columns = pk_names
+            .iter()
+            .filter_map(
+                |col| Some(Arc::new(Column::new_with_schema(col, &arrow_schema).ok()?) as _),
+            )
+            .collect::<Vec<_>>();
+        let mut pk_sort_columns: Vec<PhysicalSortExpr> = pk_names
+            .iter()
             .filter_map(|col| {
                 Some(PhysicalSortExpr::new(
-                    Arc::new(Column::new_with_schema(&col, &arrow_schema).ok()?) as _,
+                    Arc::new(Column::new_with_schema(col, &arrow_schema).ok()?) as _,
                     SortOptions {
                         descending: false,
                         nulls_first: true,
@@ -113,28 +119,46 @@ impl RegionScanExec {
         let eq_props = match request.distribution {
             Some(TimeSeriesDistribution::PerSeries) => {
                 if let Some(ts) = ts_col {
-                    pk_columns.push(ts);
+                    pk_sort_columns.push(ts);
                 }
                 EquivalenceProperties::new_with_orderings(
                     arrow_schema.clone(),
-                    &[LexOrdering::new(pk_columns)],
+                    &[LexOrdering::new(pk_sort_columns)],
                 )
             }
             Some(TimeSeriesDistribution::TimeWindowed) => {
                 if let Some(ts_col) = ts_col {
-                    pk_columns.insert(0, ts_col);
+                    pk_sort_columns.insert(0, ts_col);
                 }
                 EquivalenceProperties::new_with_orderings(
                     arrow_schema.clone(),
-                    &[LexOrdering::new(pk_columns)],
+                    &[LexOrdering::new(pk_sort_columns)],
                 )
             }
             None => EquivalenceProperties::new(arrow_schema.clone()),
         };
+        common_telemetry::info!("[DEBUG] RegionScanExec: eq_props: {:?}", eq_props,);
+
+        let partitioning = match request.distribution {
+            Some(TimeSeriesDistribution::PerSeries) => {
+                Partitioning::Hash(pk_columns.clone(), num_output_partition)
+            }
+            Some(TimeSeriesDistribution::TimeWindowed) | None => {
+                Partitioning::UnknownPartitioning(num_output_partition)
+            }
+        };
+
+        common_telemetry::info!(
+            "[DEBUG] RegionScanExec: num_partitions: {}, pk_columns: {:?}, distribution: {:?}",
+            num_output_partition,
+            pk_columns,
+            request.distribution,
+        );
+
         let properties = PlanProperties::new(
             eq_props,
-            Partitioning::UnknownPartitioning(num_output_partition),
+            partitioning,
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
@@ -190,7 +214,23 @@ impl RegionScanExec {
         let num_partitions = partitions.len();
         let mut properties = self.properties.clone();
-        properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);
+        let new_partitioning = match properties.partitioning {
+            Partitioning::Hash(ref columns, _) => {
+                Partitioning::Hash(columns.clone(), target_partitions)
+            }
+            _ => Partitioning::UnknownPartitioning(target_partitions),
+        };
+        properties.partitioning = new_partitioning;
+
+        common_telemetry::info!(
+            "[DEBUG] RegionScanExec: set new partitions: {:?}",
+            properties.partitioning
+        );
+        common_telemetry::info!(
+            "[DEBUG] RegionScanExec: prepare with {} partitions, target_partitions: {}",
+            partitions.len(),
+            target_partitions
+        );
 
         {
             let mut scanner = self.scanner.lock().unwrap();
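
Note on the parallelize_scan.rs hunks: with actual_partition_num pinned to expected_partition_num, and the empty and single-range early returns padded to that size, assign_partition_range now always yields exactly target_partitions output partitions, and some of them stay empty whenever there are fewer ranges than partitions. The sketch below is a minimal, self-contained illustration of a greedy assignment that always hands the next-largest range to the currently lightest partition; the names (assign_ranges, row counts as plain usize) are hypothetical and this is not the function in the diff, only the same idea under those assumptions.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Greedily assign ranges (represented by their row counts) to a fixed
/// number of partitions: the next-largest range always goes to the
/// partition with the fewest rows so far. Hypothetical sketch only.
fn assign_ranges(mut row_counts: Vec<usize>, partition_num: usize) -> Vec<Vec<usize>> {
    let mut partitions = vec![vec![]; partition_num];
    if row_counts.is_empty() {
        return partitions;
    }
    // Largest ranges first, so heavy ranges get spread out early.
    row_counts.sort_unstable_by(|a, b| b.cmp(a));

    // Min-heap keyed by (rows assigned so far, partition index).
    let mut heap: BinaryHeap<Reverse<(usize, usize)>> =
        (0..partition_num).map(|i| Reverse((0, i))).collect();

    for rows in row_counts {
        let Reverse((assigned, idx)) = heap.pop().expect("partition_num > 0");
        partitions[idx].push(rows);
        heap.push(Reverse((assigned + rows, idx)));
    }
    partitions
}

fn main() {
    // 3 ranges spread over 8 partitions: 5 partitions stay empty, which is
    // the trade-off of keeping the output partition count fixed.
    let assigned = assign_ranges(vec![100, 40, 60], 8);
    assert_eq!(assigned.len(), 8);
    assert_eq!(assigned.iter().filter(|p| p.is_empty()).count(), 5);
    println!("{assigned:?}");
}

Keeping the partition count fixed this way means downstream operators, and the Partitioning::Hash the scan now reports for PerSeries distribution, see a stable number of output partitions regardless of how many ranges a region actually produces.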