feat: region prune part 2 (#6752)

* skeleton

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* get rule set

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust style

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust params

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reuse collider

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* canonize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more robust predicate extractor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify predicate extractor's test and impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unify import

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplification, remove unnecessary interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle partial referenced exprs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* finalize predicate extractor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* document region pruner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: reduce diff

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify checker

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refine overlapping check method

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reduce diff

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* coerce types

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply review comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor use Bound

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify hashmap

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* redact region id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test: update sqlness result after updating datafusion

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: discord9 <discord9@163.com>
Commit 474a689309 by Ruihang Xia, committed by GitHub on 2025-08-20 11:47:38 -07:00
Parent: 2995eddca5
16 changed files with 1203 additions and 22 deletions


@@ -384,6 +384,7 @@ impl DatanodeBuilder {
None,
None,
None,
None,
false,
self.plugins.clone(),
opts.query.clone(),


@@ -346,6 +346,7 @@ impl FlownodeBuilder {
None,
None,
None,
None,
false,
Default::default(),
self.opts.query.clone(),


@@ -145,7 +145,7 @@ impl FrontendBuilder {
));
let requester = Arc::new(Requester::new(
self.catalog_manager.clone(),
partition_manager,
partition_manager.clone(),
node_manager.clone(),
));
let table_mutation_handler = Arc::new(TableMutationOperator::new(
@@ -165,6 +165,7 @@ impl FrontendBuilder {
let query_engine = QueryEngineFactory::new_with_plugins(
self.catalog_manager.clone(),
Some(partition_manager.clone()),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
Some(procedure_service_handler),


@@ -46,12 +46,12 @@ pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64);
/// Represents an "atomic" Expression, which isn't composed (OR-ed) of other expressions.
#[allow(unused)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct AtomicExpr {
pub struct AtomicExpr {
/// A (ordered) list of simplified expressions. They are [`RestrictedOp::And`]'ed together.
pub(crate) nucleons: Vec<NucleonExpr>,
pub nucleons: Vec<NucleonExpr>,
/// Index to reference the [`PartitionExpr`] that this [`AtomicExpr`] is derived from.
/// This index is used with `exprs` field in [`MultiDimPartitionRule`](crate::multi_dim::MultiDimPartitionRule).
pub(crate) source_expr_index: usize,
pub source_expr_index: usize,
}
impl AtomicExpr {
@@ -78,7 +78,7 @@ impl PartialOrd for AtomicExpr {
///
/// This struct is used to compose [`AtomicExpr`], hence "nucleon".
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct NucleonExpr {
pub struct NucleonExpr {
column: String,
op: GluonOp,
/// Normalized [`Value`].
@@ -93,6 +93,29 @@ impl NucleonExpr {
lit(*self.value.as_ref()),
))
}
/// Get the column name
pub fn column(&self) -> &str {
&self.column
}
/// Get the normalized value
pub fn value(&self) -> OrderedF64 {
self.value
}
/// Get the operation
pub fn op(&self) -> &GluonOp {
&self.op
}
pub fn new(column: impl Into<String>, op: GluonOp, value: OrderedF64) -> Self {
Self {
column: column.into(),
op,
value,
}
}
}
/// Further restricted operation set.
@@ -100,7 +123,7 @@ impl NucleonExpr {
/// Conjunction operations are removed from [`RestrictedOp`].
/// This enumeration is used to bind elements in [`NucleonExpr`], hence "gluon".
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum GluonOp {
pub enum GluonOp {
Eq,
NotEq,
Lt,
@@ -129,11 +152,11 @@ impl GluonOp {
pub struct Collider<'a> {
source_exprs: &'a [PartitionExpr],
pub(crate) atomic_exprs: Vec<AtomicExpr>,
pub atomic_exprs: Vec<AtomicExpr>,
/// A map of column name to a list of `(value, normalized value)` pairs.
///
/// The normalized value is used for comparison. The normalization process keeps the order of the values.
pub(crate) normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
pub normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
}
impl<'a> Collider<'a> {
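For orientation, a minimal sketch of how the newly public collider types can be consumed outside the `partition` crate. The standalone function and the literal values are illustrative assumptions, not part of this diff:

use datatypes::value::OrderedFloat;
use partition::collider::{GluonOp, NucleonExpr};

fn describe_nucleons() {
    // `a = 10 AND b >= 2` flattened into the nucleons of one atomic expression.
    let nucleons = vec![
        NucleonExpr::new("a", GluonOp::Eq, OrderedFloat(10.0)),
        NucleonExpr::new("b", GluonOp::GtEq, OrderedFloat(2.0)),
    ];
    for n in &nucleons {
        // The new accessors replace direct access to the formerly private fields.
        println!("{} {:?} {:?}", n.column(), n.op(), n.value());
    }
}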


@@ -38,9 +38,9 @@ use crate::partition::PartitionBound;
/// [Expr]: sqlparser::ast::Expr
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
pub(crate) lhs: Box<Operand>,
pub(crate) op: RestrictedOp,
pub(crate) rhs: Box<Operand>,
pub lhs: Box<Operand>,
pub op: RestrictedOp,
pub rhs: Box<Operand>,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
@@ -252,14 +252,17 @@ impl PartitionExpr {
Ok(expr)
}
/// Get the left-hand side operand
pub fn lhs(&self) -> &Operand {
&self.lhs
}
/// Get the right-hand side operand
pub fn rhs(&self) -> &Operand {
&self.rhs
}
/// Get the operation
pub fn op(&self) -> &RestrictedOp {
&self.op
}
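As a quick illustration, the new accessors let callers walk a `PartitionExpr` without reaching into its fields. A sketch built on the helpers this crate's tests use (`col`, `gt_eq`, `lt`, `and`); the values are illustrative:

use datatypes::value::Value;
use partition::expr::{col, Operand, RestrictedOp};

fn walk_expr() {
    // user_id >= 0 AND user_id < 100
    let expr = col("user_id")
        .gt_eq(Value::Int64(0))
        .and(col("user_id").lt(Value::Int64(100)));
    assert!(matches!(expr.op(), RestrictedOp::And));
    if let (Operand::Expr(lhs), Operand::Expr(rhs)) = (expr.lhs(), expr.rhs()) {
        // Each side is itself a comparison expression.
        assert!(matches!(lhs.op(), RestrictedOp::GtEq));
        assert!(matches!(rhs.op(), RestrictedOp::Lt));
    }
}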


@@ -17,9 +17,11 @@ mod commutativity;
mod merge_scan;
mod merge_sort;
mod planner;
#[allow(dead_code)]
mod predicate_extractor;
mod region_pruner;
pub use analyzer::{DistPlannerAnalyzer, DistPlannerOptions};
pub use merge_scan::{MergeScanExec, MergeScanLogicalPlan};
pub use planner::{DistExtensionPlanner, MergeSortExtensionPlanner};
pub use predicate_extractor::PredicateExtractor;
pub use region_pruner::ConstraintPruner;


@@ -16,6 +16,7 @@
use std::sync::Arc;
use ahash::HashMap;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -27,6 +28,7 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{DataFusionError, TableReference};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -36,6 +38,8 @@ use table::table_name::TableName;
use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
use crate::dist_plan::merge_sort::MergeSortLogicalPlan;
use crate::dist_plan::region_pruner::ConstraintPruner;
use crate::dist_plan::PredicateExtractor;
use crate::error::{CatalogSnafu, TableNotFoundSnafu};
use crate::region_query::RegionQueryHandlerRef;
@@ -101,16 +105,19 @@ impl ExtensionPlanner for MergeSortExtensionPlanner {
pub struct DistExtensionPlanner {
catalog_manager: CatalogManagerRef,
partition_rule_manager: PartitionRuleManagerRef,
region_query_handler: RegionQueryHandlerRef,
}
impl DistExtensionPlanner {
pub fn new(
catalog_manager: CatalogManagerRef,
partition_rule_manager: PartitionRuleManagerRef,
region_query_handler: RegionQueryHandlerRef,
) -> Self {
Self {
catalog_manager,
partition_rule_manager,
region_query_handler,
}
}
@@ -150,7 +157,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
return fallback(optimized_plan).await;
};
let Ok(regions) = self.get_regions(&table_name).await else {
let Ok(regions) = self.get_regions(&table_name, input_plan).await else {
// no peers found, going to execute them locally
return fallback(optimized_plan).await;
};
@@ -184,7 +191,11 @@ impl DistExtensionPlanner {
Ok(extractor.table_name)
}
async fn get_regions(&self, table_name: &TableName) -> Result<Vec<RegionId>> {
async fn get_regions(
&self,
table_name: &TableName,
logical_plan: &LogicalPlan,
) -> Result<Vec<RegionId>> {
let table = self
.catalog_manager
.table(
@@ -198,7 +209,101 @@ impl DistExtensionPlanner {
.with_context(|| TableNotFoundSnafu {
table: table_name.to_string(),
})?;
Ok(table.table_info().region_ids())
let table_info = table.table_info();
let all_regions = table_info.region_ids();
// Extract partition columns
let partition_columns: Vec<String> = table_info
.meta
.partition_column_names()
.map(|s| s.to_string())
.collect();
if partition_columns.is_empty() {
return Ok(all_regions);
}
let partition_column_types = partition_columns
.iter()
.map(|col_name| {
let data_type = table_info
.meta
.schema
.column_schema_by_name(col_name)
// Safety: names are retrieved above from the same table
.unwrap()
.data_type
.clone();
(col_name.clone(), data_type)
})
.collect::<HashMap<_, _>>();
// Extract predicates from logical plan
let partition_expressions = match PredicateExtractor::extract_partition_expressions(
logical_plan,
&partition_columns,
) {
Ok(expressions) => expressions,
Err(err) => {
common_telemetry::debug!(
"Failed to extract partition expressions for table {} (id: {}), using all regions: {:?}",
table_name,
table.table_info().table_id(),
err
);
return Ok(all_regions);
}
};
if partition_expressions.is_empty() {
return Ok(all_regions);
}
// Get partition information for the table
let partitions = match self
.partition_rule_manager
.find_table_partitions(table.table_info().table_id())
.await
{
Ok(partitions) => partitions,
Err(err) => {
common_telemetry::debug!(
"Failed to get partition information for table {}, using all regions: {:?}",
table_name,
err
);
return Ok(all_regions);
}
};
if partitions.is_empty() {
return Ok(all_regions);
}
// Apply region pruning based on partition rules
let pruned_regions = match ConstraintPruner::prune_regions(
&partition_expressions,
&partitions,
partition_column_types,
) {
Ok(regions) => regions,
Err(err) => {
common_telemetry::debug!(
"Failed to prune regions for table {}, using all regions: {:?}",
table_name,
err
);
return Ok(all_regions);
}
};
common_telemetry::debug!(
"Region pruning for table {}: {} partition expressions applied, pruned from {} to {} regions",
table_name,
partition_expressions.len(),
all_regions.len(),
pruned_regions.len()
);
Ok(pruned_regions)
}
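// Summary of the control flow above: pruning is strictly conservative. If the
// table has no partition columns, if predicate extraction fails or yields
// nothing, if partition metadata is unavailable, or if pruning itself errors,
// `get_regions` falls back to returning every region, so a query can never
// silently skip a region it might need.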
/// The input logical plan is already analyzed, so only the logical optimizer is invoked on it.


@@ -0,0 +1,755 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! [`ConstraintPruner`] prunes partition info based on given expressions.
use std::cmp::Ordering;
use std::ops::Bound;
use ahash::{HashMap, HashSet};
use common_telemetry::debug;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{OrderedF64, OrderedFloat, Value};
use partition::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
use partition::expr::{Operand, PartitionExpr};
use partition::manager::PartitionInfo;
use store_api::storage::RegionId;
use GluonOp::*;
use crate::error::Result;
pub struct ConstraintPruner;
impl ConstraintPruner {
/// Prune regions using a constraint-satisfaction approach.
///
/// Takes query expressions and partition info, and returns the matching region IDs.
pub fn prune_regions(
query_expressions: &[PartitionExpr],
partitions: &[PartitionInfo],
column_datatypes: HashMap<String, ConcreteDataType>,
) -> Result<Vec<RegionId>> {
let start = std::time::Instant::now();
if query_expressions.is_empty() || partitions.is_empty() {
// No constraints, return all regions
return Ok(partitions.iter().map(|p| p.id).collect());
}
// Collect all partition expressions for unified normalization
let mut expression_to_partition = Vec::with_capacity(partitions.len());
let mut all_partition_expressions = Vec::with_capacity(partitions.len());
for partition in partitions {
if let Some(expr) = &partition.partition_expr {
expression_to_partition.push(partition.id);
all_partition_expressions.push(expr.clone());
}
}
if all_partition_expressions.is_empty() {
return Ok(partitions.iter().map(|p| p.id).collect());
}
// Create unified collider with both query and partition expressions for consistent normalization
let mut all_expressions = query_expressions.to_vec();
all_expressions.extend(all_partition_expressions.iter().cloned());
if !Self::normalize_datatype(&mut all_expressions, &column_datatypes) {
return Ok(partitions.iter().map(|p| p.id).collect());
}
let collider = match Collider::new(&all_expressions) {
Ok(collider) => collider,
Err(err) => {
debug!(
"Failed to create unified collider: {}, returning all regions conservatively",
err
);
return Ok(partitions.iter().map(|p| p.id).collect());
}
};
// Extract query atomic expressions (first N expressions in the collider)
let query_atomics: Vec<&AtomicExpr> = collider
.atomic_exprs
.iter()
.filter(|atomic| atomic.source_expr_index < query_expressions.len())
.collect();
let mut candidate_regions = HashSet::default();
for region_atomics in collider
.atomic_exprs
.iter()
.filter(|atomic| atomic.source_expr_index >= query_expressions.len())
{
if Self::atomic_sets_overlap(&query_atomics, region_atomics) {
let partition_expr_index =
region_atomics.source_expr_index - query_expressions.len();
candidate_regions.insert(expression_to_partition[partition_expr_index]);
}
}
debug!(
"Constraint pruning (cost {}ms): {} -> {} regions",
start.elapsed().as_millis(),
partitions.len(),
candidate_regions.len()
);
Ok(candidate_regions.into_iter().collect())
}
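// Note on the unified collider above: query and partition expressions are
// normalized together so that each column's values map to consistent
// OrderedF64 codes on both sides; normalizing the two sets separately would
// make the resulting ranges incomparable.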
fn atomic_sets_overlap(query_atomics: &[&AtomicExpr], partition_atomic: &AtomicExpr) -> bool {
for query_atomic in query_atomics {
if Self::atomic_constraint_satisfied(query_atomic, partition_atomic) {
return true;
}
}
false
}
fn normalize_datatype(
all_expressions: &mut Vec<PartitionExpr>,
column_datatypes: &HashMap<String, ConcreteDataType>,
) -> bool {
for expr in all_expressions {
if !Self::normalize_expr_datatype(&mut expr.lhs, &mut expr.rhs, column_datatypes) {
return false;
}
}
true
}
fn normalize_expr_datatype(
lhs: &mut Operand,
rhs: &mut Operand,
column_datatypes: &HashMap<String, ConcreteDataType>,
) -> bool {
match (lhs, rhs) {
(Operand::Expr(lhs_expr), Operand::Expr(rhs_expr)) => {
Self::normalize_expr_datatype(
&mut lhs_expr.lhs,
&mut lhs_expr.rhs,
column_datatypes,
) && Self::normalize_expr_datatype(
&mut rhs_expr.lhs,
&mut rhs_expr.rhs,
column_datatypes,
)
}
(Operand::Column(col_name), Operand::Value(val))
| (Operand::Value(val), Operand::Column(col_name)) => {
let Some(datatype) = column_datatypes.get(col_name) else {
debug!("Column {} not found from type set, skip pruning", col_name);
return false;
};
match datatype {
ConcreteDataType::Int8(_)
| ConcreteDataType::Int16(_)
| ConcreteDataType::Int32(_)
| ConcreteDataType::Int64(_) => {
let Some(new_lit) = val.as_i64() else {
debug!("Value {:?} cannot be converted to i64", val);
return false;
};
*val = Value::Int64(new_lit);
}
ConcreteDataType::UInt8(_)
| ConcreteDataType::UInt16(_)
| ConcreteDataType::UInt32(_)
| ConcreteDataType::UInt64(_) => {
let Some(new_lit) = val.as_u64() else {
debug!("Value {:?} cannot be converted to u64", val);
return false;
};
*val = Value::UInt64(new_lit);
}
ConcreteDataType::Float32(_) | ConcreteDataType::Float64(_) => {
let Some(new_lit) = val.as_f64_lossy() else {
debug!("Value {:?} cannot be converted to f64", val);
return false;
};
*val = Value::Float64(OrderedFloat(new_lit));
}
ConcreteDataType::String(_) | ConcreteDataType::Boolean(_) => {
// no operation needed
}
ConcreteDataType::Decimal128(_)
| ConcreteDataType::Binary(_)
| ConcreteDataType::Date(_)
| ConcreteDataType::Timestamp(_)
| ConcreteDataType::Time(_)
| ConcreteDataType::Duration(_)
| ConcreteDataType::Interval(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_)
| ConcreteDataType::Struct(_)
| ConcreteDataType::Json(_)
| ConcreteDataType::Null(_)
| ConcreteDataType::Vector(_) => {
debug!("Unsupported data type {datatype}");
return false;
}
}
true
}
_ => false,
}
}
/// Check if a single atomic constraint can be satisfied
fn atomic_constraint_satisfied(
query_atomic: &AtomicExpr,
partition_atomic: &AtomicExpr,
) -> bool {
let mut query_index = 0;
let mut partition_index = 0;
while query_index < query_atomic.nucleons.len()
&& partition_index < partition_atomic.nucleons.len()
{
let query_col = query_atomic.nucleons[query_index].column();
let partition_col = partition_atomic.nucleons[partition_index].column();
match query_col.cmp(partition_col) {
Ordering::Equal => {
let mut query_index_for_next_col = query_index;
let mut partition_index_for_next_col = partition_index;
while query_index_for_next_col < query_atomic.nucleons.len()
&& query_atomic.nucleons[query_index_for_next_col].column() == query_col
{
query_index_for_next_col += 1;
}
while partition_index_for_next_col < partition_atomic.nucleons.len()
&& partition_atomic.nucleons[partition_index_for_next_col].column()
== partition_col
{
partition_index_for_next_col += 1;
}
let query_range = Self::nucleons_to_range(
&query_atomic.nucleons[query_index..query_index_for_next_col],
);
let partition_range = Self::nucleons_to_range(
&partition_atomic.nucleons[partition_index..partition_index_for_next_col],
);
debug!("Comparing two ranges, {query_range:?} and {partition_range:?}");
query_index = query_index_for_next_col;
partition_index = partition_index_for_next_col;
if !query_range.overlaps_with(&partition_range) {
return false;
}
}
Ordering::Less => {
// Query column comes before partition column - skip query column
while query_index < query_atomic.nucleons.len()
&& query_atomic.nucleons[query_index].column() == query_col
{
query_index += 1;
}
}
Ordering::Greater => {
// Partition column comes before query column - skip partition column
while partition_index < partition_atomic.nucleons.len()
&& partition_atomic.nucleons[partition_index].column() == partition_col
{
partition_index += 1;
}
}
}
}
true
}
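// Worked example (illustrative): query nucleons [a = 10] vs. partition
// nucleons [a >= 0, a < 100, b = 'x']. Columns are visited in sorted order:
// `b` exists only on the partition side and is skipped, while the shared
// column `a` is folded into ranges ([10, 10] vs. [0, 100)) and checked for
// overlap -- they overlap, so the constraint is satisfiable.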
/// Convert a slice of nucleons (all for the same column) into a ValueRange
fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
let mut range = ValueRange::new();
for nucleon in nucleons {
let value = nucleon.value();
match nucleon.op() {
Eq => {
range.lower = Bound::Included(value);
range.upper = Bound::Included(value);
break; // exact value, most restrictive
}
Lt => {
// upper < value
range.update_upper(Bound::Excluded(value));
}
LtEq => {
range.update_upper(Bound::Included(value));
}
Gt => {
range.update_lower(Bound::Excluded(value));
}
GtEq => {
range.update_lower(Bound::Included(value));
}
NotEq => {
// handled as two separate atomic exprs elsewhere
continue;
}
}
}
range
}
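// For example, nucleons [GtEq 100, Lt 200] fold into lower = Included(100)
// and upper = Excluded(200), while an Eq nucleon short-circuits to the point
// range [value, value].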
}
/// Represents a value range derived from a group of nucleons for the same column
#[derive(Debug, Clone)]
struct ValueRange {
// lower and upper bounds using standard library Bound semantics
lower: Bound<OrderedF64>,
upper: Bound<OrderedF64>,
}
impl ValueRange {
fn new() -> Self {
Self {
lower: Bound::Unbounded,
upper: Bound::Unbounded,
}
}
// Update lower bound choosing the more restrictive one
fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
match (&self.lower, &new_lower) {
(Bound::Unbounded, _) => self.lower = new_lower,
(_, Bound::Unbounded) => { /* keep existing */ }
(Bound::Included(cur), Bound::Included(new))
| (Bound::Excluded(cur), Bound::Included(new))
| (Bound::Included(cur), Bound::Excluded(new))
| (Bound::Excluded(cur), Bound::Excluded(new)) => {
if new > cur {
self.lower = new_lower;
} else if new == cur {
// prefer Excluded over Included for the same value (more restrictive)
if matches!(new_lower, Bound::Excluded(_))
&& matches!(self.lower, Bound::Included(_))
{
self.lower = new_lower;
}
}
}
}
}
// Update upper bound choosing the more restrictive one
fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
match (&self.upper, &new_upper) {
(Bound::Unbounded, _) => self.upper = new_upper,
(_, Bound::Unbounded) => { /* keep existing */ }
(Bound::Included(cur), Bound::Included(new))
| (Bound::Excluded(cur), Bound::Included(new))
| (Bound::Included(cur), Bound::Excluded(new))
| (Bound::Excluded(cur), Bound::Excluded(new)) => {
if new < cur {
self.upper = new_upper;
} else if new == cur {
// prefer Excluded over Included for the same value (more restrictive)
if matches!(new_upper, Bound::Excluded(_))
&& matches!(self.upper, Bound::Included(_))
{
self.upper = new_upper;
}
}
}
}
}
/// Check if this range overlaps with another range
fn overlaps_with(&self, other: &ValueRange) -> bool {
fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
match (upper, lower) {
(Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
// u], [l
(Bound::Included(u), Bound::Included(l)) => u < l,
// u], (l
(Bound::Included(u), Bound::Excluded(l))
// u), [l
| (Bound::Excluded(u), Bound::Included(l))
// u), (l
| (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
}
}
if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
return false;
}
true
}
}
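// Worked example of the bound semantics (values illustrative): a query range
// [150, +inf) overlaps a partition range [100, 200) on [150, 200), so
// `overlaps_with` returns true; against a partition range [0, 100), the
// query's lower bound Included(150) lies at or above the partition's upper
// bound Excluded(100), so there is no overlap and that region is pruned.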
#[cfg(test)]
mod tests {
use datatypes::value::Value;
use partition::expr::{col, Operand, PartitionExpr, RestrictedOp};
use store_api::storage::RegionId;
use super::*;
fn create_test_partition_info(region_id: u64, expr: Option<PartitionExpr>) -> PartitionInfo {
PartitionInfo {
id: RegionId::new(1, region_id as u32),
partition_expr: expr,
}
}
#[test]
fn test_constraint_pruning_equality() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
// Region 3: user_id >= 200 AND user_id < 300
create_test_partition_info(
3,
Some(
col("user_id")
.gt_eq(Value::Int64(200))
.and(col("user_id").lt(Value::Int64(300))),
),
),
];
// Query: user_id = 150 (should only match Region 2)
let query_exprs = vec![col("user_id").eq(Value::Int64(150))];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// Should include Region 2, and potentially others due to the conservative approach
assert!(pruned.contains(&RegionId::new(1, 2)));
}
#[test]
fn test_constraint_pruning_in_list() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
// Region 3: user_id >= 200 AND user_id < 300
create_test_partition_info(
3,
Some(
col("user_id")
.gt_eq(Value::Int64(200))
.and(col("user_id").lt(Value::Int64(300))),
),
),
];
// Query: user_id IN (50, 150, 250) - should match all regions
let query_exprs = vec![PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Expr(col("user_id").eq(Value::Int64(50))),
RestrictedOp::Or,
Operand::Expr(col("user_id").eq(Value::Int64(150))),
)),
RestrictedOp::Or,
Operand::Expr(col("user_id").eq(Value::Int64(250))),
)];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// Should include regions that can satisfy any of the values
assert!(!pruned.is_empty());
}
#[test]
fn test_constraint_pruning_range() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
// Region 3: user_id >= 200 AND user_id < 300
create_test_partition_info(
3,
Some(
col("user_id")
.gt_eq(Value::Int64(200))
.and(col("user_id").lt(Value::Int64(300))),
),
),
];
// Query: user_id >= 150 (should include regions that can satisfy this constraint)
let query_exprs = vec![col("user_id").gt_eq(Value::Int64(150))];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// With constraint-based approach:
// Region 1: [0, 100) - user_id >= 150 is not satisfiable
// Region 2: [100, 200) - user_id >= 150 is satisfiable in range [150, 200)
// Region 3: [200, 300) - user_id >= 150 is satisfiable (all values >= 200 satisfy user_id >= 150)
// The conservative approach may include more regions, but should at least include regions 2 and 3
assert!(pruned.len() >= 2);
assert!(pruned.contains(&RegionId::new(1, 2))); // Region 2 should be included
assert!(pruned.contains(&RegionId::new(1, 3))); // Region 3 should be included
}
#[test]
fn test_prune_regions_no_constraints() {
let partitions = vec![
create_test_partition_info(1, None),
create_test_partition_info(2, None),
];
let constraints = vec![];
let column_datatypes = HashMap::default();
let pruned =
ConstraintPruner::prune_regions(&constraints, &partitions, column_datatypes).unwrap();
// No constraints should return all regions
assert_eq!(pruned.len(), 2);
}
#[test]
fn test_prune_regions_with_simple_equality() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
// Region 3: user_id >= 200 AND user_id < 300
create_test_partition_info(
3,
Some(
col("user_id")
.gt_eq(Value::Int64(200))
.and(col("user_id").lt(Value::Int64(300))),
),
),
];
// Query: user_id = 150 (should only match Region 2 which contains values [100, 200))
let query_exprs = vec![col("user_id").eq(Value::Int64(150))];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// user_id = 150 should match Region 2 ([100, 200)) and potentially others due to the conservative approach
assert!(pruned.contains(&RegionId::new(1, 2)));
}
#[test]
fn test_prune_regions_with_or_constraint() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
// Region 3: user_id >= 200 AND user_id < 300
create_test_partition_info(
3,
Some(
col("user_id")
.gt_eq(Value::Int64(200))
.and(col("user_id").lt(Value::Int64(300))),
),
),
];
// Query: user_id = 50 OR user_id = 150 OR user_id = 250 - should match all 3 regions
let expr1 = col("user_id").eq(Value::Int64(50));
let expr2 = col("user_id").eq(Value::Int64(150));
let expr3 = col("user_id").eq(Value::Int64(250));
let or_expr = PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Expr(expr1),
RestrictedOp::Or,
Operand::Expr(expr2),
)),
RestrictedOp::Or,
Operand::Expr(expr3),
);
let query_exprs = vec![or_expr];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// Should match all 3 regions: 50 matches Region 1, 150 matches Region 2, 250 matches Region 3
assert_eq!(pruned.len(), 3);
assert!(pruned.contains(&RegionId::new(1, 1)));
assert!(pruned.contains(&RegionId::new(1, 2)));
assert!(pruned.contains(&RegionId::new(1, 3)));
}
#[test]
fn test_constraint_pruning_no_match() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
];
// Query: user_id = 300 (should match no regions)
let query_exprs = vec![col("user_id").eq(Value::Int64(300))];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// Should match no regions since 300 is outside both partition ranges
assert_eq!(pruned.len(), 0);
}
#[test]
fn test_constraint_pruning_partial_match() {
let partitions = vec![
// Region 1: user_id >= 0 AND user_id < 100
create_test_partition_info(
1,
Some(
col("user_id")
.gt_eq(Value::Int64(0))
.and(col("user_id").lt(Value::Int64(100))),
),
),
// Region 2: user_id >= 100 AND user_id < 200
create_test_partition_info(
2,
Some(
col("user_id")
.gt_eq(Value::Int64(100))
.and(col("user_id").lt(Value::Int64(200))),
),
),
];
// Query: user_id >= 50 (should match both regions partially)
let query_exprs = vec![col("user_id").gt_eq(Value::Int64(50))];
let mut column_datatypes = HashMap::default();
column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
let pruned =
ConstraintPruner::prune_regions(&query_exprs, &partitions, column_datatypes).unwrap();
// Region 1: [0,100) intersects with [50,∞) -> includes [50,100)
// Region 2: [100,200) is fully contained in [50,∞)
assert_eq!(pruned.len(), 2);
assert!(pruned.contains(&RegionId::new(1, 1)));
assert!(pruned.contains(&RegionId::new(1, 2)));
}
}


@@ -337,6 +337,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert value for region pruning"))]
ConvertValue {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -360,7 +367,8 @@ impl ErrorExt for Error {
| ColumnSchemaIncompatible { .. }
| UnsupportedVariable { .. }
| ColumnSchemaNoDefault { .. }
| CteColumnSchemaMismatch { .. } => StatusCode::InvalidArguments,
| CteColumnSchemaMismatch { .. }
| ConvertValue { .. } => StatusCode::InvalidArguments,
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,


@@ -3364,11 +3364,12 @@ mod test {
fn build_query_engine_state() -> QueryEngineState {
QueryEngineState::new(
new_memory_catalog_manager().unwrap(),
None, // region_query_handler
None, // table_mutation_handler
None, // procedure_service_handler
None, // flow_service_handler
false, // with_dist_planner
None,
None,
None,
None,
None,
false,
Plugins::default(),
QueryOptions::default(),
)


@@ -31,6 +31,7 @@ use common_query::Output;
use datafusion_expr::{AggregateUDF, LogicalPlan};
use datatypes::schema::Schema;
pub use default_serializer::{DefaultPlanDecoder, DefaultSerializer};
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use table::TableRef;
@@ -110,6 +111,7 @@ impl QueryEngineFactory {
) -> Self {
Self::new_with_plugins(
catalog_manager,
None,
region_query_handler,
table_mutation_handler,
procedure_service_handler,
@@ -123,6 +125,7 @@ impl QueryEngineFactory {
#[allow(clippy::too_many_arguments)]
pub fn new_with_plugins(
catalog_manager: CatalogManagerRef,
partition_rule_manager: Option<PartitionRuleManagerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
@@ -133,6 +136,7 @@ impl QueryEngineFactory {
) -> Self {
let state = Arc::new(QueryEngineState::new(
catalog_manager,
partition_rule_manager,
region_query_handler,
table_mutation_handler,
procedure_service_handler,


@@ -84,6 +84,7 @@ impl QueryEngineContext {
None,
None,
None,
None,
false,
Plugins::default(),
QueryOptions::default(),


@@ -38,6 +38,7 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy
use datafusion_expr::{AggregateUDF, LogicalPlan as DfLogicalPlan};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use partition::manager::PartitionRuleManagerRef;
use promql::extension_plan::PromExtensionPlanner;
use table::table::adapter::DfTableProviderAdapter;
use table::TableRef;
@@ -87,6 +88,7 @@ impl QueryEngineState {
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog_list: CatalogManagerRef,
partition_rule_manager: Option<PartitionRuleManagerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
@@ -171,6 +173,7 @@ impl QueryEngineState {
.with_serializer_registry(Arc::new(DefaultSerializer))
.with_query_planner(Arc::new(DfQueryPlanner::new(
catalog_list.clone(),
partition_rule_manager,
region_query_handler,
)))
.with_optimizer_rules(optimizer.rules)
@@ -354,13 +357,17 @@ impl QueryPlanner for DfQueryPlanner {
impl DfQueryPlanner {
fn new(
catalog_manager: CatalogManagerRef,
partition_rule_manager: Option<PartitionRuleManagerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
) -> Self {
let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
if let Some(region_query_handler) = region_query_handler {
if let (Some(region_query_handler), Some(partition_rule_manager)) =
(region_query_handler, partition_rule_manager)
{
planners.push(Arc::new(DistExtensionPlanner::new(
catalog_manager,
partition_rule_manager,
region_query_handler,
)));
planners.push(Arc::new(MergeSortExtensionPlanner {}));
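// Note: the distributed planner is now registered only when both a region
// query handler and a partition rule manager are supplied; if either is
// missing, planning proceeds without DistExtensionPlanner (and therefore
// without region pruning).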


@@ -137,6 +137,7 @@ async fn test_query_validate() -> Result<()> {
None,
None,
None,
None,
false,
plugins,
QueryOptionsNew::default(),


@@ -226,6 +226,215 @@ PARTITION ON COLUMNS (a, b) (
Affected Rows: 0
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 2_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 3_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 4_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a < 10;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 AND b= 'z';
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 OR b= 'z';
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 2_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 3_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 4_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR b@1 = z REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 OR ts > 1;
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 2_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 3_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 4_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 OR (ts > 1 AND b ='h');
+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CooperativeExec REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 1_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 2_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 3_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
| 1_| 4_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: a@0 = 10 OR ts@2 > 1 AND b@1 = h REDACTED
|_|_|_CooperativeExec REDACTED
|_|_|_SeqScan: region=REDACTED, "partition_count":{"count":0, "mem_ranges":0, "files":0, "file_ranges":0} REDACTED
|_|_|_|
|_|_| Total rows: 0_|
+-+-+-+
DROP TABLE valid_rule;
Affected Rows: 0


@@ -121,6 +121,65 @@ PARTITION ON COLUMNS (a, b) (
a > 10,
);
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a < 10;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 AND b= 'z';
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 OR b= 'z';
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 OR ts > 1;
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
-- SQLNESS REPLACE (Hash.*) REDACTED
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT * FROM valid_rule
WHERE a = 10 OR (ts > 1 AND b ='h');
DROP TABLE valid_rule;
-- Issue https://github.com/GreptimeTeam/greptimedb/issues/4247