From 474a6893098046e553c4b0e25810536e6fa9faae Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 20 Aug 2025 11:47:38 -0700 Subject: [PATCH] feat: region prune part 2 (#6752) * skeleton Signed-off-by: Ruihang Xia * get rule set Signed-off-by: Ruihang Xia * adjust style Signed-off-by: Ruihang Xia * adjust params Signed-off-by: Ruihang Xia * reuse collider Signed-off-by: Ruihang Xia * canonize Signed-off-by: Ruihang Xia * more robust predicate extractor Signed-off-by: Ruihang Xia * simplify predicate extractor's test and impl Signed-off-by: Ruihang Xia * unify import Signed-off-by: Ruihang Xia * simplification, remove unnecessary interfaces Signed-off-by: Ruihang Xia * handle partial referenced exprs Signed-off-by: Ruihang Xia * finalize predicate extractor Signed-off-by: Ruihang Xia * document region pruner Signed-off-by: Ruihang Xia * chore: reduce diff Signed-off-by: Ruihang Xia * simplify checker Signed-off-by: Ruihang Xia * refine overlapping check method Signed-off-by: Ruihang Xia * reduce diff Signed-off-by: Ruihang Xia * coerce types Signed-off-by: Ruihang Xia * remove unused errors Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * apply review comment Signed-off-by: Ruihang Xia * refactor use Bound Signed-off-by: Ruihang Xia * simplify hashmap Signed-off-by: Ruihang Xia * Apply suggestions from code review Co-authored-by: Yingwen * sqlness tests Signed-off-by: Ruihang Xia * redact region id Signed-off-by: Ruihang Xia * test: update sqlness result after updating datafusion Signed-off-by: discord9 --------- Signed-off-by: Ruihang Xia Signed-off-by: discord9 Co-authored-by: Yingwen Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com> Co-authored-by: discord9 --- src/datanode/src/datanode.rs | 1 + src/flow/src/server.rs | 1 + src/frontend/src/instance/builder.rs | 3 +- src/partition/src/collider.rs | 37 +- src/partition/src/expr.rs | 9 +- src/query/src/dist_plan.rs | 4 +- src/query/src/dist_plan/planner.rs | 111 ++- src/query/src/dist_plan/region_pruner.rs | 755 ++++++++++++++++++ src/query/src/error.rs | 10 +- src/query/src/promql/planner.rs | 11 +- src/query/src/query_engine.rs | 4 + src/query/src/query_engine/context.rs | 1 + src/query/src/query_engine/state.rs | 9 +- src/query/src/tests/query_engine_test.rs | 1 + .../cases/standalone/common/partition.result | 209 +++++ tests/cases/standalone/common/partition.sql | 59 ++ 16 files changed, 1203 insertions(+), 22 deletions(-) create mode 100644 src/query/src/dist_plan/region_pruner.rs diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3c94bc9f3c..181b18d937 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -384,6 +384,7 @@ impl DatanodeBuilder { None, None, None, + None, false, self.plugins.clone(), opts.query.clone(), diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index c756dc2349..77e27bd90f 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -346,6 +346,7 @@ impl FlownodeBuilder { None, None, None, + None, false, Default::default(), self.opts.query.clone(), diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 021a40d8be..bc4aaee4b5 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -145,7 +145,7 @@ impl FrontendBuilder { )); let requester = Arc::new(Requester::new( self.catalog_manager.clone(), - partition_manager, + partition_manager.clone(), node_manager.clone(), )); let table_mutation_handler =
Arc::new(TableMutationOperator::new( @@ -165,6 +165,7 @@ impl FrontendBuilder { let query_engine = QueryEngineFactory::new_with_plugins( self.catalog_manager.clone(), + Some(partition_manager.clone()), Some(region_query_handler.clone()), Some(table_mutation_handler), Some(procedure_service_handler), diff --git a/src/partition/src/collider.rs b/src/partition/src/collider.rs index e77afc345d..67bf89b806 100644 --- a/src/partition/src/collider.rs +++ b/src/partition/src/collider.rs @@ -46,12 +46,12 @@ pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64); /// Represents an "atomic" Expression, which isn't composed (OR-ed) of other expressions. #[allow(unused)] #[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct AtomicExpr { +pub struct AtomicExpr { /// An (ordered) list of simplified expressions. They are [`RestrictedOp::And`]'ed together. - pub(crate) nucleons: Vec<NucleonExpr>, + pub nucleons: Vec<NucleonExpr>, /// Index to reference the [`PartitionExpr`] that this [`AtomicExpr`] is derived from. /// This index is used with `exprs` field in [`MultiDimPartitionRule`](crate::multi_dim::MultiDimPartitionRule). - pub(crate) source_expr_index: usize, + pub source_expr_index: usize, } impl AtomicExpr { @@ -78,7 +78,7 @@ impl PartialOrd for AtomicExpr { /// /// This struct is used to compose [`AtomicExpr`], hence "nucleon". #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub(crate) struct NucleonExpr { +pub struct NucleonExpr { column: String, op: GluonOp, /// Normalized [`Value`]. @@ -93,6 +93,29 @@ impl NucleonExpr { lit(*self.value.as_ref()), )) } + + /// Get the column name + pub fn column(&self) -> &str { + &self.column + } + + /// Get the normalized value + pub fn value(&self) -> OrderedF64 { + self.value + } + + /// Get the operation + pub fn op(&self) -> &GluonOp { + &self.op + } + + pub fn new(column: impl Into<String>, op: GluonOp, value: OrderedF64) -> Self { + Self { + column: column.into(), + op, + value, + } + } } /// Further restricted operation set. @@ -100,7 +123,7 @@ impl NucleonExpr { /// Conjunction operations are removed from [`RestrictedOp`]. /// This enumeration is used to bind elements in [`NucleonExpr`], hence "gluon". #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -enum GluonOp { +pub enum GluonOp { Eq, NotEq, Lt, @@ -129,11 +152,11 @@ impl GluonOp { pub struct Collider<'a> { source_exprs: &'a [PartitionExpr], - pub(crate) atomic_exprs: Vec<AtomicExpr>, + pub atomic_exprs: Vec<AtomicExpr>, /// A map of column name to a list of `(value, normalized value)` pairs. /// /// The normalized value is used for comparison. The normalization process keeps the order of the values.
- pub(crate) normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>, + pub normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>, } impl<'a> Collider<'a> { diff --git a/src/partition/src/expr.rs b/src/partition/src/expr.rs index 460c789008..cb73bad690 100644 --- a/src/partition/src/expr.rs +++ b/src/partition/src/expr.rs @@ -38,9 +38,9 @@ use crate::partition::PartitionBound; /// [Expr]: sqlparser::ast::Expr #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct PartitionExpr { - pub(crate) lhs: Box<Operand>, - pub(crate) op: RestrictedOp, - pub(crate) rhs: Box<Operand>, + pub lhs: Box<Operand>, + pub op: RestrictedOp, + pub rhs: Box<Operand>, } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] @@ -252,14 +252,17 @@ impl PartitionExpr { Ok(expr) } + /// Get the left-hand side operand pub fn lhs(&self) -> &Operand { &self.lhs } + /// Get the right-hand side operand pub fn rhs(&self) -> &Operand { &self.rhs } + /// Get the operation pub fn op(&self) -> &RestrictedOp { &self.op } diff --git a/src/query/src/dist_plan.rs b/src/query/src/dist_plan.rs index c15f4b0e54..2a6ba0dfbb 100644 --- a/src/query/src/dist_plan.rs +++ b/src/query/src/dist_plan.rs @@ -17,9 +17,11 @@ mod commutativity; mod merge_scan; mod merge_sort; mod planner; -#[allow(dead_code)] mod predicate_extractor; +mod region_pruner; pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions}; pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan}; pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner}; +pub use predicate_extractor::PredicateExtractor; +pub use region_pruner::ConstraintPruner; diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 1d79cc204a..1f95c6fe71 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -16,6 +16,7 @@ use std::sync::Arc; +use ahash::HashMap; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -27,6 +28,7 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{DataFusionError, TableReference}; use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; +use partition::manager::PartitionRuleManagerRef; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -36,6 +38,8 @@ use table::table_name::TableName; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; use crate::dist_plan::merge_sort::MergeSortLogicalPlan; +use crate::dist_plan::region_pruner::ConstraintPruner; +use crate::dist_plan::PredicateExtractor; use crate::error::{CatalogSnafu, TableNotFoundSnafu}; use crate::region_query::RegionQueryHandlerRef; @@ -101,16 +105,19 @@ impl ExtensionPlanner for MergeSortExtensionPlanner { pub struct DistExtensionPlanner { catalog_manager: CatalogManagerRef, + partition_rule_manager: PartitionRuleManagerRef, region_query_handler: RegionQueryHandlerRef, } impl DistExtensionPlanner { pub fn new( catalog_manager: CatalogManagerRef, + partition_rule_manager: PartitionRuleManagerRef, region_query_handler: RegionQueryHandlerRef, ) -> Self { Self { catalog_manager, + partition_rule_manager, region_query_handler, } } @@ -150,7 +157,7 @@ impl ExtensionPlanner for DistExtensionPlanner { return fallback(optimized_plan).await; }; - let Ok(regions) = self.get_regions(&table_name).await else { + let Ok(regions) = self.get_regions(&table_name, input_plan).await else { // no
peers found, going to execute them locally return fallback(optimized_plan).await; }; @@ -184,7 +191,11 @@ impl DistExtensionPlanner { Ok(extractor.table_name) } - async fn get_regions(&self, table_name: &TableName) -> Result<Vec<RegionId>> { + async fn get_regions( + &self, + table_name: &TableName, + logical_plan: &LogicalPlan, + ) -> Result<Vec<RegionId>> { let table = self .catalog_manager .table( @@ -198,7 +209,101 @@ impl DistExtensionPlanner { .with_context(|| TableNotFoundSnafu { table: table_name.to_string(), })?; - Ok(table.table_info().region_ids()) + + let table_info = table.table_info(); + let all_regions = table_info.region_ids(); + + // Extract partition columns + let partition_columns: Vec<String> = table_info + .meta + .partition_column_names() + .map(|s| s.to_string()) + .collect(); + if partition_columns.is_empty() { + return Ok(all_regions); + } + let partition_column_types = partition_columns + .iter() + .map(|col_name| { + let data_type = table_info + .meta + .schema + .column_schema_by_name(col_name) + // Safety: names are retrieved above from the same table + .unwrap() + .data_type + .clone(); + (col_name.clone(), data_type) + }) + .collect::<HashMap<_, _>>(); + + // Extract predicates from logical plan + let partition_expressions = match PredicateExtractor::extract_partition_expressions( + logical_plan, + &partition_columns, + ) { + Ok(expressions) => expressions, + Err(err) => { + common_telemetry::debug!( + "Failed to extract partition expressions for table {} (id: {}), using all regions: {:?}", + table_name, + table.table_info().table_id(), + err + ); + return Ok(all_regions); + } + }; + + if partition_expressions.is_empty() { + return Ok(all_regions); + } + + // Get partition information for the table if partition rule manager is available + let partitions = match self + .partition_rule_manager + .find_table_partitions(table.table_info().table_id()) + .await + { + Ok(partitions) => partitions, + Err(err) => { + common_telemetry::debug!( + "Failed to get partition information for table {}, using all regions: {:?}", + table_name, + err + ); + return Ok(all_regions); + } + }; + if partitions.is_empty() { + return Ok(all_regions); + } + + // Apply region pruning based on partition rules + let pruned_regions = match ConstraintPruner::prune_regions( + &partition_expressions, + &partitions, + partition_column_types, + ) { + Ok(regions) => regions, + Err(err) => { + common_telemetry::debug!( + "Failed to prune regions for table {}, using all regions: {:?}", + table_name, + err + ); + return Ok(all_regions); + } + }; + + common_telemetry::debug!( + "Region pruning for table {}: {} partition expressions applied, pruned from {} to {} regions", + table_name, + partition_expressions.len(), + all_regions.len(), + pruned_regions.len() + ); + + Ok(pruned_regions) } /// Input logical plan is analyzed. Thus only call logical optimizer to optimize it. diff --git a/src/query/src/dist_plan/region_pruner.rs b/src/query/src/dist_plan/region_pruner.rs new file mode 100644 index 0000000000..fd5c6a51ea --- /dev/null +++ b/src/query/src/dist_plan/region_pruner.rs @@ -0,0 +1,755 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! [`ConstraintPruner`] prunes partition info based on given expressions. + +use std::cmp::Ordering; +use std::ops::Bound; + +use ahash::{HashMap, HashSet}; +use common_telemetry::debug; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::{OrderedF64, OrderedFloat, Value}; +use partition::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr}; +use partition::expr::{Operand, PartitionExpr}; +use partition::manager::PartitionInfo; +use store_api::storage::RegionId; +use GluonOp::*; + +use crate::error::Result; + +pub struct ConstraintPruner; + +impl ConstraintPruner { + /// Prune regions using constraint satisfaction approach + /// + /// Takes query expressions and partition info, returns matching region IDs + pub fn prune_regions( + query_expressions: &[PartitionExpr], + partitions: &[PartitionInfo], + column_datatypes: HashMap<String, ConcreteDataType>, + ) -> Result<Vec<RegionId>> { + let start = std::time::Instant::now(); + if query_expressions.is_empty() || partitions.is_empty() { + // No constraints, return all regions + return Ok(partitions.iter().map(|p| p.id).collect()); + } + + // Collect all partition expressions for unified normalization + let mut expression_to_partition = Vec::with_capacity(partitions.len()); + let mut all_partition_expressions = Vec::with_capacity(partitions.len()); + for partition in partitions { + if let Some(expr) = &partition.partition_expr { + expression_to_partition.push(partition.id); + all_partition_expressions.push(expr.clone()); + } + } + if all_partition_expressions.is_empty() { + return Ok(partitions.iter().map(|p| p.id).collect()); + } + + // Create unified collider with both query and partition expressions for consistent normalization + let mut all_expressions = query_expressions.to_vec(); + all_expressions.extend(all_partition_expressions.iter().cloned()); + if !Self::normalize_datatype(&mut all_expressions, &column_datatypes) { + return Ok(partitions.iter().map(|p| p.id).collect()); + } + + let collider = match Collider::new(&all_expressions) { + Ok(collider) => collider, + Err(err) => { + debug!( + "Failed to create unified collider: {}, returning all regions conservatively", + err + ); + return Ok(partitions.iter().map(|p| p.id).collect()); + } + }; + + // Extract query atomic expressions (first N expressions in the collider) + let query_atomics: Vec<&AtomicExpr> = collider + .atomic_exprs + .iter() + .filter(|atomic| atomic.source_expr_index < query_expressions.len()) + .collect(); + + let mut candidate_regions = HashSet::default(); + + for region_atomics in collider + .atomic_exprs + .iter() + .filter(|atomic| atomic.source_expr_index >= query_expressions.len()) + { + if Self::atomic_sets_overlap(&query_atomics, region_atomics) { + let partition_expr_index = + region_atomics.source_expr_index - query_expressions.len(); + candidate_regions.insert(expression_to_partition[partition_expr_index]); + } + } + + debug!( + "Constraint pruning (cost {}ms): {} -> {} regions", + start.elapsed().as_millis(), + partitions.len(), + candidate_regions.len() + ); + + Ok(candidate_regions.into_iter().collect()) + } + + fn atomic_sets_overlap(query_atomics:
&[&AtomicExpr], partition_atomic: &AtomicExpr) -> bool { + for query_atomic in query_atomics { + if Self::atomic_constraint_satisfied(query_atomic, partition_atomic) { + return true; + } + } + + false + } + + fn normalize_datatype( + all_expressions: &mut Vec<PartitionExpr>, + column_datatypes: &HashMap<String, ConcreteDataType>, + ) -> bool { + for expr in all_expressions { + if !Self::normalize_expr_datatype(&mut expr.lhs, &mut expr.rhs, column_datatypes) { + return false; + } + } + true + } + + fn normalize_expr_datatype( + lhs: &mut Operand, + rhs: &mut Operand, + column_datatypes: &HashMap<String, ConcreteDataType>, + ) -> bool { + match (lhs, rhs) { + (Operand::Expr(lhs_expr), Operand::Expr(rhs_expr)) => { + Self::normalize_expr_datatype( + &mut lhs_expr.lhs, + &mut lhs_expr.rhs, + column_datatypes, + ) && Self::normalize_expr_datatype( + &mut rhs_expr.lhs, + &mut rhs_expr.rhs, + column_datatypes, + ) + } + (Operand::Column(col_name), Operand::Value(val)) + | (Operand::Value(val), Operand::Column(col_name)) => { + let Some(datatype) = column_datatypes.get(col_name) else { + debug!("Column {} not found from type set, skip pruning", col_name); + return false; + }; + + match datatype { + ConcreteDataType::Int8(_) + | ConcreteDataType::Int16(_) + | ConcreteDataType::Int32(_) + | ConcreteDataType::Int64(_) => { + let Some(new_lit) = val.as_i64() else { + debug!("Value {:?} cannot be converted to i64", val); + return false; + }; + *val = Value::Int64(new_lit); + } + + ConcreteDataType::UInt8(_) + | ConcreteDataType::UInt16(_) + | ConcreteDataType::UInt32(_) + | ConcreteDataType::UInt64(_) => { + let Some(new_lit) = val.as_u64() else { + debug!("Value {:?} cannot be converted to u64", val); + return false; + }; + *val = Value::UInt64(new_lit); + } + + ConcreteDataType::Float32(_) | ConcreteDataType::Float64(_) => { + let Some(new_lit) = val.as_f64_lossy() else { + debug!("Value {:?} cannot be converted to f64", val); + return false; + }; + + *val = Value::Float64(OrderedFloat(new_lit)); + } + + ConcreteDataType::String(_) | ConcreteDataType::Boolean(_) => { + // no operation needed + } + + ConcreteDataType::Decimal128(_) + | ConcreteDataType::Binary(_) + | ConcreteDataType::Date(_) + | ConcreteDataType::Timestamp(_) + | ConcreteDataType::Time(_) + | ConcreteDataType::Duration(_) + | ConcreteDataType::Interval(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) + | ConcreteDataType::Struct(_) + | ConcreteDataType::Json(_) + | ConcreteDataType::Null(_) + | ConcreteDataType::Vector(_) => { + debug!("Unsupported data type {datatype}"); + return false; + } + } + + true + } + _ => false, + } + } + + /// Check if a single atomic constraint can be satisfied + fn atomic_constraint_satisfied( + query_atomic: &AtomicExpr, + partition_atomic: &AtomicExpr, + ) -> bool { + let mut query_index = 0; + let mut partition_index = 0; + + while query_index < query_atomic.nucleons.len() + && partition_index < partition_atomic.nucleons.len() + { + let query_col = query_atomic.nucleons[query_index].column(); + let partition_col = partition_atomic.nucleons[partition_index].column(); + + match query_col.cmp(partition_col) { + Ordering::Equal => { + let mut query_index_for_next_col = query_index; + let mut partition_index_for_next_col = partition_index; + + while query_index_for_next_col < query_atomic.nucleons.len() + && query_atomic.nucleons[query_index_for_next_col].column() == query_col + { + query_index_for_next_col += 1; + } + while partition_index_for_next_col < partition_atomic.nucleons.len() + &&
partition_atomic.nucleons[partition_index_for_next_col].column() + == partition_col + { + partition_index_for_next_col += 1; + } + + let query_range = Self::nucleons_to_range( + &query_atomic.nucleons[query_index..query_index_for_next_col], + ); + let partition_range = Self::nucleons_to_range( + &partition_atomic.nucleons[partition_index..partition_index_for_next_col], + ); + + debug!("Comparing two ranges, {query_range:?} and {partition_range:?}"); + + query_index = query_index_for_next_col; + partition_index = partition_index_for_next_col; + + if !query_range.overlaps_with(&partition_range) { + return false; + } + } + Ordering::Less => { + // Query column comes before partition column - skip query column + while query_index < query_atomic.nucleons.len() + && query_atomic.nucleons[query_index].column() == query_col + { + query_index += 1; + } + } + Ordering::Greater => { + // Partition column comes before query column - skip partition column + while partition_index < partition_atomic.nucleons.len() + && partition_atomic.nucleons[partition_index].column() == partition_col + { + partition_index += 1; + } + } + } + } + + true + } + + /// Convert a slice of nucleons (all for the same column) into a ValueRange + fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange { + let mut range = ValueRange::new(); + + for nucleon in nucleons { + let value = nucleon.value(); + match nucleon.op() { + Eq => { + range.lower = Bound::Included(value); + range.upper = Bound::Included(value); + break; // exact value, most restrictive + } + Lt => { + // upper < value + range.update_upper(Bound::Excluded(value)); + } + LtEq => { + range.update_upper(Bound::Included(value)); + } + Gt => { + range.update_lower(Bound::Excluded(value)); + } + GtEq => { + range.update_lower(Bound::Included(value)); + } + NotEq => { + // handled as two separate atomic exprs elsewhere + continue; + } + } + } + + range + } +} + +/// Represents a value range derived from a group of nucleons for the same column +#[derive(Debug, Clone)] +struct ValueRange { + // lower and upper bounds using standard library Bound semantics + lower: Bound<OrderedF64>, + upper: Bound<OrderedF64>, +} + +impl ValueRange { + fn new() -> Self { + Self { + lower: Bound::Unbounded, + upper: Bound::Unbounded, + } + } + + // Update lower bound choosing the more restrictive one + fn update_lower(&mut self, new_lower: Bound<OrderedF64>) { + match (&self.lower, &new_lower) { + (Bound::Unbounded, _) => self.lower = new_lower, + (_, Bound::Unbounded) => { /* keep existing */ } + (Bound::Included(cur), Bound::Included(new)) + | (Bound::Excluded(cur), Bound::Included(new)) + | (Bound::Included(cur), Bound::Excluded(new)) + | (Bound::Excluded(cur), Bound::Excluded(new)) => { + if new > cur { + self.lower = new_lower; + } else if new == cur { + // prefer Excluded over Included for the same value (more restrictive) + if matches!(new_lower, Bound::Excluded(_)) + && matches!(self.lower, Bound::Included(_)) + { + self.lower = new_lower; + } + } + } + } + } + + // Update upper bound choosing the more restrictive one + fn update_upper(&mut self, new_upper: Bound<OrderedF64>) { + match (&self.upper, &new_upper) { + (Bound::Unbounded, _) => self.upper = new_upper, + (_, Bound::Unbounded) => { /* keep existing */ } + (Bound::Included(cur), Bound::Included(new)) + | (Bound::Excluded(cur), Bound::Included(new)) + | (Bound::Included(cur), Bound::Excluded(new)) + | (Bound::Excluded(cur), Bound::Excluded(new)) => { + if new < cur { + self.upper = new_upper; + } else if new == cur { + // prefer Excluded over Included for the same
value (more restrictive) + if matches!(new_upper, Bound::Excluded(_)) + && matches!(self.upper, Bound::Included(_)) + { + self.upper = new_upper; + } + } + } + } + } + + /// Check if this range overlaps with another range + fn overlaps_with(&self, other: &ValueRange) -> bool { + fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool { + match (upper, lower) { + (Bound::Unbounded, _) | (_, Bound::Unbounded) => false, + // u], [l + (Bound::Included(u), Bound::Included(l)) => u < l, + // u], (l + (Bound::Included(u), Bound::Excluded(l)) + // u), [l + | (Bound::Excluded(u), Bound::Included(l)) + // u), (l + | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l, + } + } + + if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) { + return false; + } + true + } +} + +#[cfg(test)] +mod tests { + use datatypes::value::Value; + use partition::expr::{col, Operand, PartitionExpr, RestrictedOp}; + use store_api::storage::RegionId; + + use super::*; + + fn create_test_partition_info(region_id: u64, expr: Option<PartitionExpr>) -> PartitionInfo { + PartitionInfo { + id: RegionId::new(1, region_id as u32), + partition_expr: expr, + } + } + + #[test] + fn test_constraint_pruning_equality() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + // Region 3: user_id >= 200 AND user_id < 300 + create_test_partition_info( + 3, + Some( + col("user_id") + .gt_eq(Value::Int64(200)) + .and(col("user_id").lt(Value::Int64(300))), + ), + ), + ]; + + // Query: user_id = 150 (should only match Region 2) + let query_exprs = vec![col("user_id").eq(Value::Int64(150))]; + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap(); + + // Should include Region 2, and potentially others due to conservative approach + assert!(pruned.contains(&RegionId::new(1, 2))); + } + + #[test] + fn test_constraint_pruning_in_list() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + // Region 3: user_id >= 200 AND user_id < 300 + create_test_partition_info( + 3, + Some( + col("user_id") + .gt_eq(Value::Int64(200)) + .and(col("user_id").lt(Value::Int64(300))), + ), + ), + ]; + + // Query: user_id IN (50, 150, 250) - should match all regions + let query_exprs = vec![PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Expr(col("user_id").eq(Value::Int64(50))), + RestrictedOp::Or, + Operand::Expr(col("user_id").eq(Value::Int64(150))), + )), + RestrictedOp::Or, + Operand::Expr(col("user_id").eq(Value::Int64(250))), + )]; + + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions,
column_datatypes).unwrap(); + + // Should include regions that can satisfy any of the values + assert!(!pruned.is_empty()); + } + + #[test] + fn test_constraint_pruning_range() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + // Region 3: user_id >= 200 AND user_id < 300 + create_test_partition_info( + 3, + Some( + col("user_id") + .gt_eq(Value::Int64(200)) + .and(col("user_id").lt(Value::Int64(300))), + ), + ), + ]; + + // Query: user_id >= 150 (should include regions that can satisfy this constraint) + let query_exprs = vec![col("user_id").gt_eq(Value::Int64(150))]; + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap(); + + // With constraint-based approach: + // Region 1: [0, 100) - user_id >= 150 is not satisfiable + // Region 2: [100, 200) - user_id >= 150 is satisfiable in range [150, 200) + // Region 3: [200, 300) - user_id >= 150 is satisfiable (all values >= 200 satisfy user_id >= 150) + // Conservative approach may include more regions, but should at least include regions 2 and 3 + assert!(pruned.len() >= 2); + assert!(pruned.contains(&RegionId::new(1, 2))); // Region 2 should be included + assert!(pruned.contains(&RegionId::new(1, 3))); // Region 3 should be included + } + + #[test] + fn test_prune_regions_no_constraints() { + let partitions = vec![ + create_test_partition_info(1, None), + create_test_partition_info(2, None), + ]; + + let constraints = vec![]; + let column_datatypes = HashMap::default(); + let pruned = + ConstraintPruner::prune_regions(&constraints, &partitions, column_datatypes).unwrap(); + + // No constraints should return all regions + assert_eq!(pruned.len(), 2); + } + + #[test] + fn test_prune_regions_with_simple_equality() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + // Region 3: user_id >= 200 AND user_id < 300 + create_test_partition_info( + 3, + Some( + col("user_id") + .gt_eq(Value::Int64(200)) + .and(col("user_id").lt(Value::Int64(300))), + ), + ), + ]; + + // Query: user_id = 150 (should only match Region 2 which contains values [100, 200)) + let query_exprs = vec![col("user_id").eq(Value::Int64(150))]; + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap(); + + // user_id = 150 should match Region 2 ([100, 200)) and potentially others due to conservative approach + assert!(pruned.contains(&RegionId::new(1, 2))); + } + + #[test] + fn test_prune_regions_with_or_constraint() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + 
Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + // Region 3: user_id >= 200 AND user_id < 300 + create_test_partition_info( + 3, + Some( + col("user_id") + .gt_eq(Value::Int64(200)) + .and(col("user_id").lt(Value::Int64(300))), + ), + ), + ]; + + // Query: user_id = 50 OR user_id = 150 OR user_id = 250 - should match all 3 regions + let expr1 = col("user_id").eq(Value::Int64(50)); + let expr2 = col("user_id").eq(Value::Int64(150)); + let expr3 = col("user_id").eq(Value::Int64(250)); + + let or_expr = PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Expr(expr1), + RestrictedOp::Or, + Operand::Expr(expr2), + )), + RestrictedOp::Or, + Operand::Expr(expr3), + ); + + let query_exprs = vec![or_expr]; + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap(); + + // Should match all 3 regions: 50 matches Region 1, 150 matches Region 2, 250 matches Region 3 + assert_eq!(pruned.len(), 3); + assert!(pruned.contains(&RegionId::new(1, 1))); + assert!(pruned.contains(&RegionId::new(1, 2))); + assert!(pruned.contains(&RegionId::new(1, 3))); + } + + #[test] + fn test_constraint_pruning_no_match() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + ]; + + // Query: user_id = 300 (should match no regions) + let query_exprs = vec![col("user_id").eq(Value::Int64(300))]; + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap(); + + // Should match no regions since 300 is outside both partition ranges + assert_eq!(pruned.len(), 0); + } + + #[test] + fn test_constraint_pruning_partial_match() { + let partitions = vec![ + // Region 1: user_id >= 0 AND user_id < 100 + create_test_partition_info( + 1, + Some( + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + ), + ), + // Region 2: user_id >= 100 AND user_id < 200 + create_test_partition_info( + 2, + Some( + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ), + ), + ]; + + // Query: user_id >= 50 (should match both regions partially) + let query_exprs = vec![col("user_id").gt_eq(Value::Int64(50))]; + let mut column_datatypes = HashMap::default(); + column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype()); + let pruned = + ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap(); + + // Region 1: [0,100) intersects with [50,∞) -> includes [50,100) + // Region 2: [100,200) is fully contained in [50,∞) + assert_eq!(pruned.len(), 2); + assert!(pruned.contains(&RegionId::new(1, 1))); + assert!(pruned.contains(&RegionId::new(1, 2))); + } +} diff --git 
a/src/query/src/error.rs b/src/query/src/error.rs index 1efc4bf470..d90151207e 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -337,6 +337,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to convert value for region pruning"))] + ConvertValue { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -360,7 +367,8 @@ impl ErrorExt for Error { | ColumnSchemaIncompatible { .. } | UnsupportedVariable { .. } | ColumnSchemaNoDefault { .. } - | CteColumnSchemaMismatch { .. } => StatusCode::InvalidArguments, + | CteColumnSchemaMismatch { .. } + | ConvertValue { .. } => StatusCode::InvalidArguments, BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 232ed2abf4..ef3890307b 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -3364,11 +3364,12 @@ mod test { fn build_query_engine_state() -> QueryEngineState { QueryEngineState::new( new_memory_catalog_manager().unwrap(), - None, // region_query_handler - None, // table_mutation_handler - None, // procedure_service_handler - None, // flow_service_handler - false, // with_dist_planner + None, + None, + None, + None, + None, + false, Plugins::default(), QueryOptions::default(), ) diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index e413854b75..db9ab7140a 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -31,6 +31,7 @@ use common_query::Output; use datafusion_expr::{AggregateUDF, LogicalPlan}; use datatypes::schema::Schema; pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer}; +use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use table::TableRef; @@ -110,6 +111,7 @@ impl QueryEngineFactory { ) -> Self { Self::new_with_plugins( catalog_manager, + None, region_query_handler, table_mutation_handler, procedure_service_handler, @@ -123,6 +125,7 @@ impl QueryEngineFactory { #[allow(clippy::too_many_arguments)] pub fn new_with_plugins( catalog_manager: CatalogManagerRef, + partition_rule_manager: Option<PartitionRuleManagerRef>, region_query_handler: Option<RegionQueryHandlerRef>, table_mutation_handler: Option<TableMutationHandlerRef>, procedure_service_handler: Option<ProcedureServiceHandlerRef>, @@ -133,6 +136,7 @@ ) -> Self { let state = Arc::new(QueryEngineState::new( catalog_manager, + partition_rule_manager, region_query_handler, table_mutation_handler, procedure_service_handler, diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs index df20a70a42..c967fc38bd 100644 --- a/src/query/src/query_engine/context.rs +++ b/src/query/src/query_engine/context.rs @@ -84,6 +84,7 @@ impl QueryEngineContext { None, None, None, + None, false, Plugins::default(), QueryOptions::default(), diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index c1a6730473..e7218b50af 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -38,6 +38,7 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}; use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan}; use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; +use partition::manager::PartitionRuleManagerRef; use promql::extension_plan::PromExtensionPlanner; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; @@ -87,6 +88,7 @@
impl QueryEngineState { #[allow(clippy::too_many_arguments)] pub fn new( catalog_list: CatalogManagerRef, + partition_rule_manager: Option<PartitionRuleManagerRef>, region_query_handler: Option<RegionQueryHandlerRef>, table_mutation_handler: Option<TableMutationHandlerRef>, procedure_service_handler: Option<ProcedureServiceHandlerRef>, @@ -171,6 +173,7 @@ impl QueryEngineState { .with_serializer_registry(Arc::new(DefaultSerializer)) .with_query_planner(Arc::new(DfQueryPlanner::new( catalog_list.clone(), + partition_rule_manager, region_query_handler, ))) .with_optimizer_rules(optimizer.rules) @@ -354,13 +357,17 @@ impl QueryPlanner for DfQueryPlanner { impl DfQueryPlanner { fn new( catalog_manager: CatalogManagerRef, + partition_rule_manager: Option<PartitionRuleManagerRef>, region_query_handler: Option<RegionQueryHandlerRef>, ) -> Self { let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> = vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)]; - if let Some(region_query_handler) = region_query_handler { + if let (Some(region_query_handler), Some(partition_rule_manager)) = + (region_query_handler, partition_rule_manager) + { planners.push(Arc::new(DistExtensionPlanner::new( catalog_manager, + partition_rule_manager, region_query_handler, ))); planners.push(Arc::new(MergeSortExtensionPlanner {})); diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs index 07bac1363a..ede5285c26 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -137,6 +137,7 @@ async fn test_query_validate() -> Result<()> { None, None, None, + None, false, plugins, QueryOptionsNew::default(), diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result index 9fa432aeaf..236da83a59 100644 --- a/tests/cases/standalone/common/partition.result +++ b/tests/cases/standalone/common/partition.result @@ -226,6 +226,215 @@ PARTITION ON COLUMNS (a, b) ( Affected Rows: 0 +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 2_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 3_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 4_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a < 10; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED
+|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 AND b= 'z'; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 OR b= 'z'; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 2_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 3_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 4_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 OR ts > 1; + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 
REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 2_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 3_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 4_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 OR (ts > 1 AND b ='h'); + ++-+-+-+ +| stage | node | plan_| ++-+-+-+ +| 0_| 0_|_CooperativeExec REDACTED +|_|_|_MergeScanExec: REDACTED +|_|_|_| +| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 2_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 3_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +| 1_| 4_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED +|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED +|_|_|_CooperativeExec REDACTED +|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED +|_|_|_| +|_|_| Total rows: 0_| ++-+-+-+ + DROP TABLE valid_rule; Affected Rows: 0 diff --git a/tests/cases/standalone/common/partition.sql b/tests/cases/standalone/common/partition.sql index dc305e99dd..89c4258be9 100644 --- a/tests/cases/standalone/common/partition.sql +++ b/tests/cases/standalone/common/partition.sql @@ -121,6 +121,65 @@ PARTITION ON COLUMNS (a, b) ( a > 10, ); +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED 
+-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule; + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a < 10; + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 AND b= 'z'; + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 OR b= 'z'; + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 OR ts > 1; + +-- SQLNESS REPLACE (metrics.*) REDACTED +-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (Hash.*) REDACTED +-- SQLNESS REPLACE (-+) - +-- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED +-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED +EXPLAIN ANALYZE SELECT * FROM valid_rule +WHERE a = 10 OR (ts > 1 AND b ='h'); + DROP TABLE valid_rule; -- Issue https://github.com/GreptimeTeam/greptimedb/issues/4247
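Reviewer sketches. The standalone Rust models below are not lines from the diff; they restate the pruning pipeline this patch adds so each step can be run and checked in isolation. First, type coercion: before the collider runs, `normalize_datatype` widens every literal compared against a partition column to a canonical type (i64, u64, or f64) and gives up, keeping all regions, on unsupported column types. A minimal model, with toy `Lit` and `ColType` enums standing in for the real `Value` and `ConcreteDataType`:

// Toy stand-ins for `Value` and `ConcreteDataType`; the mapping below mirrors
// the patch's coercion rules, not the real conversion APIs.
#[derive(Debug, PartialEq)]
enum Lit {
    I64(i64),
    U64(u64),
    F64(f64),
    Str(String),
}

enum ColType {
    Int,
    UInt,
    Float,
    Str,
}

// Integer columns normalize literals to i64, unsigned to u64, float to f64;
// strings pass through; None means "cannot coerce", and the caller keeps all
// regions conservatively, as `normalize_expr_datatype` does.
fn normalize(lit: Lit, ty: &ColType) -> Option<Lit> {
    match (ty, lit) {
        (ColType::Int, Lit::I64(v)) => Some(Lit::I64(v)),
        (ColType::Int, Lit::U64(v)) => i64::try_from(v).ok().map(Lit::I64),
        (ColType::UInt, Lit::U64(v)) => Some(Lit::U64(v)),
        (ColType::UInt, Lit::I64(v)) => u64::try_from(v).ok().map(Lit::U64),
        (ColType::Float, Lit::F64(v)) => Some(Lit::F64(v)),
        (ColType::Float, Lit::I64(v)) => Some(Lit::F64(v as f64)),
        (ColType::Str, l @ Lit::Str(_)) => Some(l),
        _ => None,
    }
}

fn main() {
    // A u64 literal compared against an Int64 column becomes Int64(150).
    assert_eq!(normalize(Lit::U64(150), &ColType::Int), Some(Lit::I64(150)));
    // -1 can never be a u64, so coercion fails and pruning is skipped.
    assert_eq!(normalize(Lit::I64(-1), &ColType::UInt), None);
}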
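Next, how a run of AND-ed comparisons on one column collapses into a single interval. This models `nucleons_to_range` together with the `update_lower`/`update_upper` tie-breaking, where `Excluded` beats `Included` at the same endpoint; plain `f64` stands in for `OrderedF64`, and the Option-based comparison is a restructuring of the patch's match, intended to be behavior-equivalent:

use std::ops::Bound;

#[derive(Clone, Copy)]
enum Op {
    Eq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}

fn endpoint(b: &Bound<f64>) -> Option<f64> {
    match b {
        Bound::Included(v) | Bound::Excluded(v) => Some(*v),
        Bound::Unbounded => None,
    }
}

// Keep the more restrictive upper bound; on a tie, Excluded wins.
fn tighten_upper(cur: Bound<f64>, new: Bound<f64>) -> Bound<f64> {
    match (endpoint(&cur), endpoint(&new)) {
        (None, _) => new,
        (_, None) => cur,
        (Some(c), Some(n)) if n < c => new,
        (Some(c), Some(n)) if n == c && matches!(new, Bound::Excluded(_)) => new,
        _ => cur,
    }
}

// Keep the more restrictive lower bound; on a tie, Excluded wins.
fn tighten_lower(cur: Bound<f64>, new: Bound<f64>) -> Bound<f64> {
    match (endpoint(&cur), endpoint(&new)) {
        (None, _) => new,
        (_, None) => cur,
        (Some(c), Some(n)) if n > c => new,
        (Some(c), Some(n)) if n == c && matches!(new, Bound::Excluded(_)) => new,
        _ => cur,
    }
}

// Fold one column's AND-ed comparisons into an interval. `Eq` pins both ends
// and short-circuits, like the `break` in `nucleons_to_range`.
fn to_range(preds: &[(Op, f64)]) -> (Bound<f64>, Bound<f64>) {
    let (mut lower, mut upper) = (Bound::Unbounded, Bound::Unbounded);
    for &(op, v) in preds {
        match op {
            Op::Eq => return (Bound::Included(v), Bound::Included(v)),
            Op::Lt => upper = tighten_upper(upper, Bound::Excluded(v)),
            Op::LtEq => upper = tighten_upper(upper, Bound::Included(v)),
            Op::Gt => lower = tighten_lower(lower, Bound::Excluded(v)),
            Op::GtEq => lower = tighten_lower(lower, Bound::Included(v)),
        }
    }
    (lower, upper)
}

fn main() {
    // x > 100 AND x <= 150 AND x < 200  collapses to  (100, 150]
    let r = to_range(&[(Op::Gt, 100.0), (Op::LtEq, 150.0), (Op::Lt, 200.0)]);
    assert!(matches!(r, (Bound::Excluded(l), Bound::Included(u)) if l == 100.0 && u == 150.0));
}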
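The keep-or-prune decision then reduces to interval overlap. The `no_overlap` helper below is a direct restatement of the one inside `ValueRange::overlaps_with`: two ranges are disjoint exactly when one side's upper bound closes at or before the other side's lower bound opens, with `Excluded` making touching endpoints disjoint:

use std::ops::Bound;

// Restatement of `no_overlap` from `ValueRange::overlaps_with`, over f64.
fn no_overlap(upper: &Bound<f64>, lower: &Bound<f64>) -> bool {
    match (upper, lower) {
        (Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
        (Bound::Included(u), Bound::Included(l)) => u < l,
        (Bound::Included(u), Bound::Excluded(l))
        | (Bound::Excluded(u), Bound::Included(l))
        | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
    }
}

fn overlaps(a: &(Bound<f64>, Bound<f64>), b: &(Bound<f64>, Bound<f64>)) -> bool {
    !no_overlap(&a.1, &b.0) && !no_overlap(&b.1, &a.0)
}

fn main() {
    // Partition [100, 200) vs query point {150}: overlap, keep the region.
    let region = (Bound::Included(100.0), Bound::Excluded(200.0));
    let point = (Bound::Included(150.0), Bound::Included(150.0));
    assert!(overlaps(&region, &point));

    // Partitions [0, 100) and [100, 200): the shared endpoint 100 is excluded
    // on the left side, so the ranges are disjoint.
    let left = (Bound::Included(0.0), Bound::Excluded(100.0));
    let right = (Bound::Included(100.0), Bound::Excluded(200.0));
    assert!(!overlaps(&left, &right));
}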
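`atomic_constraint_satisfied` depends on both nucleon lists being sorted by column name, so one merge-walk pairs up shared columns, and a column constrained on only one side can never disqualify a region. A simplified model that assumes closed `f64` intervals and one entry per column (the real code groups runs of nucleons and uses `Bound`):

use std::cmp::Ordering;

// Closed intervals are enough to show the walk; the real code uses `Bound`.
type Range = (f64, f64);

fn overlaps(a: Range, b: Range) -> bool {
    a.0 <= b.1 && b.0 <= a.1
}

// Both conjunct lists are sorted by column, so a single merge-walk pairs the
// shared columns; a region is pruned only when some shared column's ranges
// are provably disjoint.
fn satisfiable(query: &[(&str, Range)], partition: &[(&str, Range)]) -> bool {
    let (mut qi, mut pi) = (0, 0);
    while qi < query.len() && pi < partition.len() {
        match query[qi].0.cmp(partition[pi].0) {
            Ordering::Equal => {
                if !overlaps(query[qi].1, partition[pi].1) {
                    return false; // shared column, disjoint ranges: prune
                }
                qi += 1;
                pi += 1;
            }
            Ordering::Less => qi += 1,    // column only constrained by the query
            Ordering::Greater => pi += 1, // column only constrained by the rule
        }
    }
    true // no shared column contradicts, keep the region
}

fn main() {
    // Query `a = 10 AND b = 5` vs rule `a in [0, 99]` (b unconstrained): keep.
    let partition = [("a", (0.0, 99.0))];
    assert!(satisfiable(&[("a", (10.0, 10.0)), ("b", (5.0, 5.0))], &partition));

    // Query `a = 300` vs the same rule: disjoint on `a`, prune.
    assert!(!satisfiable(&[("a", (300.0, 300.0))], &partition));
}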
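Finally, the entry point end to end, assembled from this patch's own unit tests. The `query::dist_plan::ConstraintPruner` import path and the two-field `PartitionInfo` literal are taken from the diff; whether they are reachable exactly like this from outside the crate is an assumption. OR-ed predicates (an IN list, say) split into separate atomic expressions, and a region survives if any of them overlaps its rule:

use ahash::HashMap;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use partition::expr::col;
use partition::manager::PartitionInfo;
use query::dist_plan::ConstraintPruner; // assumed crate path, per dist_plan.rs re-export
use store_api::storage::RegionId;

fn main() {
    let partitions = vec![
        // Region 1: user_id >= 0 AND user_id < 100
        PartitionInfo {
            id: RegionId::new(1, 1),
            partition_expr: Some(
                col("user_id")
                    .gt_eq(Value::Int64(0))
                    .and(col("user_id").lt(Value::Int64(100))),
            ),
        },
        // Region 2: user_id >= 100 AND user_id < 200
        PartitionInfo {
            id: RegionId::new(1, 2),
            partition_expr: Some(
                col("user_id")
                    .gt_eq(Value::Int64(100))
                    .and(col("user_id").lt(Value::Int64(200))),
            ),
        },
    ];

    // WHERE user_id = 150: only region 2's range can contain the value.
    let query_exprs = vec![col("user_id").eq(Value::Int64(150))];
    let mut column_datatypes = HashMap::default();
    column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());

    let pruned =
        ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
    // Only positive membership is asserted; like the patch's own tests, the
    // pruner is allowed to return a conservative superset of regions.
    assert!(pruned.contains(&RegionId::new(1, 2)));
}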