From aee72ab36380a36350afa8d71032062c427fd83c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 27 Apr 2025 17:35:29 +0800 Subject: [PATCH] fix clippy Signed-off-by: Ruihang Xia --- src/datatypes/src/vectors/dictionary.rs | 4 +-- src/datatypes/src/vectors/helper.rs | 2 +- src/query/src/dist_plan/commutativity.rs | 24 ++------------ src/query/src/optimizer/parallelize_scan.rs | 34 ++------------------ src/query/src/optimizer/pass_distribution.rs | 14 +------- src/table/src/table/scan.rs | 20 ------------ 6 files changed, 9 insertions(+), 89 deletions(-) diff --git a/src/datatypes/src/vectors/dictionary.rs b/src/datatypes/src/vectors/dictionary.rs index 590d7c99b6..60b06949ac 100644 --- a/src/datatypes/src/vectors/dictionary.rs +++ b/src/datatypes/src/vectors/dictionary.rs @@ -16,8 +16,8 @@ use std::any::Any; use std::sync::Arc; use arrow::array::Array; -use arrow::datatypes::{Int32Type, Int64Type}; -use arrow_array::{ArrayRef, DictionaryArray, Int32Array, Int64Array}; +use arrow::datatypes::Int64Type; +use arrow_array::{ArrayRef, DictionaryArray, Int64Array}; use serde_json::Value as JsonValue; use snafu::ResultExt; diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index f327daba7a..4e23d56809 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, StringArray}; use arrow::compute; use arrow::compute::kernels::comparison; -use arrow::datatypes::{DataType as ArrowDataType, Int32Type, Int64Type, TimeUnit}; +use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit}; use arrow_array::DictionaryArray; use arrow_schema::IntervalUnit; use datafusion_common::ScalarValue; diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs index 7f8fca5af9..59871e991c 100644 --- a/src/query/src/dist_plan/commutativity.rs +++ b/src/query/src/dist_plan/commutativity.rs @@ -19,9 +19,6 @@ use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode}; use promql::extension_plan::{ EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize, }; -use promql::functions::{ - Delta, HoltWinters, Increase, PredictLinear, QuantileOverTime, Rate, Round, QUANTILE_NAME, -}; use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan}; use crate::dist_plan::MergeScanLogicalPlan; @@ -130,7 +127,7 @@ impl Categorizer { match plan.name() { name if name == SeriesDivide::name() => { let series_divide = plan.as_any().downcast_ref::().unwrap(); - let tags = series_divide.tags().into_iter().collect::>(); + let tags = series_divide.tags().iter().collect::>(); for partition_col in partition_cols { if !tags.contains(partition_col) { return Commutativity::NonCommutative; @@ -173,23 +170,8 @@ impl Categorizer { | Expr::Between(_) | Expr::Exists(_) | Expr::InList(_) => Commutativity::Commutative, - Expr::ScalarFunction(udf) => match udf.name() { - name if name == Delta::name() - || name == Rate::name() - || name == Increase::name() - || name == QuantileOverTime::name() - || name == PredictLinear::name() - || name == HoltWinters::name() - || name == Round::name() => - { - Commutativity::Unimplemented - } - _ => Commutativity::Commutative, - }, - Expr::AggregateFunction(udaf) => match udaf.func.name() { - name if name == QUANTILE_NAME => Commutativity::Unimplemented, - _ => Commutativity::Commutative, - }, + Expr::ScalarFunction(_udf) => Commutativity::Commutative, + Expr::AggregateFunction(_udaf) => Commutativity::Commutative, Expr::Like(_) | Expr::SimilarTo(_) diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index 0c7cf762f4..fbcaa45fde 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -23,7 +23,6 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{DataFusionError, Result}; use store_api::region_engine::PartitionRange; -use store_api::storage::TimeSeriesDistribution; use table::table::scan::RegionScanExec; #[derive(Debug)] @@ -66,28 +65,10 @@ impl ParallelizeScan { return Ok(Transformed::no(plan)); } - // don't parallelize if we want per series distribution - // if matches!( - // region_scan_exec.distribution(), - // Some(TimeSeriesDistribution::PerSeries) - // ) { - // return Ok(Transformed::no(plan)); - // } - - common_telemetry::info!( - "[DEBUG] ParallelizeScan: parallelize scan, distribution: {:?}", - region_scan_exec.distribution() - ); - let ranges = region_scan_exec.get_partition_ranges(); let total_range_num = ranges.len(); let expected_partition_num = config.execution.target_partitions; - - common_telemetry::info!( - "[DEBUG] ParallelizeScan: expected partition num: {expected_partition_num}, total range num: {total_range_num}" - ); - // assign ranges to each partition let mut partition_ranges = Self::assign_partition_range(ranges, expected_partition_num); @@ -152,18 +133,7 @@ impl ParallelizeScan { // Sort ranges by number of rows in descending order. ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows)); - // Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available. - let max_rows = ranges[0].num_rows; - let total_rows = ranges.iter().map(|range| range.num_rows).sum::(); - // Computes the partition num by the max row number. This eliminates the unbalance of the partitions. - let balanced_partition_num = if max_rows > 0 { - total_rows.div_ceil(max_rows) - } else { - ranges.len() - }; - // let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1); - let actual_partition_num = expected_partition_num; - let mut partition_ranges = vec![vec![]; actual_partition_num]; + let mut partition_ranges = vec![vec![]; expected_partition_num]; #[derive(Eq, PartialEq)] struct HeapNode { @@ -185,7 +155,7 @@ impl ParallelizeScan { } let mut part_heap = - BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode { + BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode { num_rows: 0, partition_idx, })); diff --git a/src/query/src/optimizer/pass_distribution.rs b/src/query/src/optimizer/pass_distribution.rs index f89b799d9b..8a096ab780 100644 --- a/src/query/src/optimizer/pass_distribution.rs +++ b/src/query/src/optimizer/pass_distribution.rs @@ -20,8 +20,6 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::Result as DfResult; use datafusion_physical_expr::Distribution; -use store_api::storage::TimeSeriesDistribution; -use table::table::scan::RegionScanExec; use crate::dist_plan::MergeScanExec; @@ -73,17 +71,7 @@ impl PassDistribution { && let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone()) { Ok(Transformed::yes(Arc::new(new_plan) as _)) - } else if let Some(region_scan) = plan.as_any().downcast_ref::() - && let Some(TimeSeriesDistribution::PerSeries) = region_scan.distribution() { - common_telemetry::info!( - "[DEBUG] PassDistribution: pass distribution to RegionScanExec, distribution: PerSeries" - ); - common_telemetry::info!( - "[DEBUG] PassDistribution: existing distribution: {:?}", - region_scan.properties().partitioning - ); - Ok(Transformed::no(plan)) - } else{ + } else { Ok(Transformed::no(plan)) } })?; diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index ad56094094..53690e84eb 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -138,8 +138,6 @@ impl RegionScanExec { None => EquivalenceProperties::new(arrow_schema.clone()), }; - common_telemetry::info!("[DEBUG] RegionScanExec: eq_props: {:?}", eq_props,); - let partitioning = match request.distribution { Some(TimeSeriesDistribution::PerSeries) => { Partitioning::Hash(pk_columns.clone(), num_output_partition) @@ -149,13 +147,6 @@ impl RegionScanExec { } }; - common_telemetry::info!( - "[DEBUG] RegionScanExec: num_partitions: {}, pk_columns: {:?}, distrubution: {:?}", - num_output_partition, - pk_columns, - request.distribution, - ); - let properties = PlanProperties::new( eq_props, partitioning, @@ -212,7 +203,6 @@ impl RegionScanExec { warn!("Setting partition ranges more than once for RegionScanExec"); } - let num_partitions = partitions.len(); let mut properties = self.properties.clone(); let new_partitioning = match properties.partitioning { Partitioning::Hash(ref columns, _) => { @@ -222,16 +212,6 @@ impl RegionScanExec { }; properties.partitioning = new_partitioning; - common_telemetry::info!( - "[DEBUG] RegionScanExec: set new partitions: {:?}", - properties.partitioning - ); - common_telemetry::info!( - "[DEBUG] RegionScanExec: prepare with {} partitions, target_partitions: {}", - partitions.len(), - target_partitions - ); - { let mut scanner = self.scanner.lock().unwrap(); scanner.prepare(