mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 05:12:54 +00:00
@@ -16,8 +16,8 @@ use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::array::Array;
|
||||
use arrow::datatypes::{Int32Type, Int64Type};
|
||||
use arrow_array::{ArrayRef, DictionaryArray, Int32Array, Int64Array};
|
||||
use arrow::datatypes::Int64Type;
|
||||
use arrow_array::{ArrayRef, DictionaryArray, Int64Array};
|
||||
use serde_json::Value as JsonValue;
|
||||
use snafu::ResultExt;
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::sync::Arc;
|
||||
use arrow::array::{Array, ArrayRef, StringArray};
|
||||
use arrow::compute;
|
||||
use arrow::compute::kernels::comparison;
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Int32Type, Int64Type, TimeUnit};
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Int64Type, TimeUnit};
|
||||
use arrow_array::DictionaryArray;
|
||||
use arrow_schema::IntervalUnit;
|
||||
use datafusion_common::ScalarValue;
|
||||
|
||||
@@ -19,9 +19,6 @@ use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
|
||||
use promql::extension_plan::{
|
||||
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
|
||||
};
|
||||
use promql::functions::{
|
||||
Delta, HoltWinters, Increase, PredictLinear, QuantileOverTime, Rate, Round, QUANTILE_NAME,
|
||||
};
|
||||
|
||||
use crate::dist_plan::merge_sort::{merge_sort_transformer, MergeSortLogicalPlan};
|
||||
use crate::dist_plan::MergeScanLogicalPlan;
|
||||
@@ -130,7 +127,7 @@ impl Categorizer {
|
||||
match plan.name() {
|
||||
name if name == SeriesDivide::name() => {
|
||||
let series_divide = plan.as_any().downcast_ref::<SeriesDivide>().unwrap();
|
||||
let tags = series_divide.tags().into_iter().collect::<HashSet<_>>();
|
||||
let tags = series_divide.tags().iter().collect::<HashSet<_>>();
|
||||
for partition_col in partition_cols {
|
||||
if !tags.contains(partition_col) {
|
||||
return Commutativity::NonCommutative;
|
||||
@@ -173,23 +170,8 @@ impl Categorizer {
|
||||
| Expr::Between(_)
|
||||
| Expr::Exists(_)
|
||||
| Expr::InList(_) => Commutativity::Commutative,
|
||||
Expr::ScalarFunction(udf) => match udf.name() {
|
||||
name if name == Delta::name()
|
||||
|| name == Rate::name()
|
||||
|| name == Increase::name()
|
||||
|| name == QuantileOverTime::name()
|
||||
|| name == PredictLinear::name()
|
||||
|| name == HoltWinters::name()
|
||||
|| name == Round::name() =>
|
||||
{
|
||||
Commutativity::Unimplemented
|
||||
}
|
||||
_ => Commutativity::Commutative,
|
||||
},
|
||||
Expr::AggregateFunction(udaf) => match udaf.func.name() {
|
||||
name if name == QUANTILE_NAME => Commutativity::Unimplemented,
|
||||
_ => Commutativity::Commutative,
|
||||
},
|
||||
Expr::ScalarFunction(_udf) => Commutativity::Commutative,
|
||||
Expr::AggregateFunction(_udaf) => Commutativity::Commutative,
|
||||
|
||||
Expr::Like(_)
|
||||
| Expr::SimilarTo(_)
|
||||
|
||||
@@ -23,7 +23,6 @@ use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion_common::{DataFusionError, Result};
|
||||
use store_api::region_engine::PartitionRange;
|
||||
use store_api::storage::TimeSeriesDistribution;
|
||||
use table::table::scan::RegionScanExec;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -66,28 +65,10 @@ impl ParallelizeScan {
|
||||
return Ok(Transformed::no(plan));
|
||||
}
|
||||
|
||||
// don't parallelize if we want per series distribution
|
||||
// if matches!(
|
||||
// region_scan_exec.distribution(),
|
||||
// Some(TimeSeriesDistribution::PerSeries)
|
||||
// ) {
|
||||
// return Ok(Transformed::no(plan));
|
||||
// }
|
||||
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] ParallelizeScan: parallelize scan, distribution: {:?}",
|
||||
region_scan_exec.distribution()
|
||||
);
|
||||
|
||||
let ranges = region_scan_exec.get_partition_ranges();
|
||||
let total_range_num = ranges.len();
|
||||
let expected_partition_num = config.execution.target_partitions;
|
||||
|
||||
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] ParallelizeScan: expected partition num: {expected_partition_num}, total range num: {total_range_num}"
|
||||
);
|
||||
|
||||
// assign ranges to each partition
|
||||
let mut partition_ranges =
|
||||
Self::assign_partition_range(ranges, expected_partition_num);
|
||||
@@ -152,18 +133,7 @@ impl ParallelizeScan {
|
||||
|
||||
// Sort ranges by number of rows in descending order.
|
||||
ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
|
||||
// Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available.
|
||||
let max_rows = ranges[0].num_rows;
|
||||
let total_rows = ranges.iter().map(|range| range.num_rows).sum::<usize>();
|
||||
// Computes the partition num by the max row number. This eliminates the unbalance of the partitions.
|
||||
let balanced_partition_num = if max_rows > 0 {
|
||||
total_rows.div_ceil(max_rows)
|
||||
} else {
|
||||
ranges.len()
|
||||
};
|
||||
// let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
|
||||
let actual_partition_num = expected_partition_num;
|
||||
let mut partition_ranges = vec![vec![]; actual_partition_num];
|
||||
let mut partition_ranges = vec![vec![]; expected_partition_num];
|
||||
|
||||
#[derive(Eq, PartialEq)]
|
||||
struct HeapNode {
|
||||
@@ -185,7 +155,7 @@ impl ParallelizeScan {
|
||||
}
|
||||
|
||||
let mut part_heap =
|
||||
BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode {
|
||||
BinaryHeap::from_iter((0..expected_partition_num).map(|partition_idx| HeapNode {
|
||||
num_rows: 0,
|
||||
partition_idx,
|
||||
}));
|
||||
|
||||
@@ -20,8 +20,6 @@ use datafusion::physical_plan::ExecutionPlan;
|
||||
use datafusion_common::tree_node::{Transformed, TreeNode};
|
||||
use datafusion_common::Result as DfResult;
|
||||
use datafusion_physical_expr::Distribution;
|
||||
use store_api::storage::TimeSeriesDistribution;
|
||||
use table::table::scan::RegionScanExec;
|
||||
|
||||
use crate::dist_plan::MergeScanExec;
|
||||
|
||||
@@ -73,17 +71,7 @@ impl PassDistribution {
|
||||
&& let Some(new_plan) = merge_scan.try_with_new_distribution(distribution.clone())
|
||||
{
|
||||
Ok(Transformed::yes(Arc::new(new_plan) as _))
|
||||
} else if let Some(region_scan) = plan.as_any().downcast_ref::<RegionScanExec>()
|
||||
&& let Some(TimeSeriesDistribution::PerSeries) = region_scan.distribution() {
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] PassDistribution: pass distribution to RegionScanExec, distribution: PerSeries"
|
||||
);
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] PassDistribution: existing distribution: {:?}",
|
||||
region_scan.properties().partitioning
|
||||
);
|
||||
Ok(Transformed::no(plan))
|
||||
} else{
|
||||
} else {
|
||||
Ok(Transformed::no(plan))
|
||||
}
|
||||
})?;
|
||||
|
||||
@@ -138,8 +138,6 @@ impl RegionScanExec {
|
||||
None => EquivalenceProperties::new(arrow_schema.clone()),
|
||||
};
|
||||
|
||||
common_telemetry::info!("[DEBUG] RegionScanExec: eq_props: {:?}", eq_props,);
|
||||
|
||||
let partitioning = match request.distribution {
|
||||
Some(TimeSeriesDistribution::PerSeries) => {
|
||||
Partitioning::Hash(pk_columns.clone(), num_output_partition)
|
||||
@@ -149,13 +147,6 @@ impl RegionScanExec {
|
||||
}
|
||||
};
|
||||
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] RegionScanExec: num_partitions: {}, pk_columns: {:?}, distrubution: {:?}",
|
||||
num_output_partition,
|
||||
pk_columns,
|
||||
request.distribution,
|
||||
);
|
||||
|
||||
let properties = PlanProperties::new(
|
||||
eq_props,
|
||||
partitioning,
|
||||
@@ -212,7 +203,6 @@ impl RegionScanExec {
|
||||
warn!("Setting partition ranges more than once for RegionScanExec");
|
||||
}
|
||||
|
||||
let num_partitions = partitions.len();
|
||||
let mut properties = self.properties.clone();
|
||||
let new_partitioning = match properties.partitioning {
|
||||
Partitioning::Hash(ref columns, _) => {
|
||||
@@ -222,16 +212,6 @@ impl RegionScanExec {
|
||||
};
|
||||
properties.partitioning = new_partitioning;
|
||||
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] RegionScanExec: set new partitions: {:?}",
|
||||
properties.partitioning
|
||||
);
|
||||
common_telemetry::info!(
|
||||
"[DEBUG] RegionScanExec: prepare with {} partitions, target_partitions: {}",
|
||||
partitions.len(),
|
||||
target_partitions
|
||||
);
|
||||
|
||||
{
|
||||
let mut scanner = self.scanner.lock().unwrap();
|
||||
scanner.prepare(
|
||||
|
||||
Reference in New Issue
Block a user