From e46ce7c6daf3093d0ee0793f31e5356be4b733ca Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sat, 11 Oct 2025 14:17:25 +0800 Subject: [PATCH] feat: divide subtasks from old/new partition rules (#7003) * feat: divide subtasks from old/new partition rules Signed-off-by: Ruihang Xia * fix format Signed-off-by: Ruihang Xia * change copyright year Signed-off-by: Ruihang Xia * simplify filter Signed-off-by: Ruihang Xia * naming Signed-off-by: Ruihang Xia * Update src/partition/src/subtask.rs Co-authored-by: Zhenchi Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: Zhenchi --- src/partition/src/lib.rs | 2 + src/partition/src/overlap.rs | 334 +++++++++++++++++++++++ src/partition/src/subtask.rs | 306 +++++++++++++++++++++ src/query/src/dist_plan/region_pruner.rs | 209 +------------- 4 files changed, 649 insertions(+), 202 deletions(-) create mode 100644 src/partition/src/overlap.rs create mode 100644 src/partition/src/subtask.rs diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index 41aa55a785..9ce8786276 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -22,7 +22,9 @@ pub mod error; pub mod expr; pub mod manager; pub mod multi_dim; +pub mod overlap; pub mod partition; pub mod splitter; +pub mod subtask; pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/overlap.rs b/src/partition/src/overlap.rs new file mode 100644 index 0000000000..5d9af3cabf --- /dev/null +++ b/src/partition/src/overlap.rs @@ -0,0 +1,334 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Rule overlap and association utilities. +//! +//! This module provides pure functions to determine overlap relationships between +//! partition expressions and to associate rule sets. + +use std::cmp::Ordering; +use std::ops::Bound; + +use datatypes::value::OrderedF64; + +use crate::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr}; +use crate::error::Result; +use crate::expr::PartitionExpr; + +/// Check if two atomic expressions can be both satisfied, i.e., whether they +/// overlap on all common columns. +pub fn atomic_exprs_overlap(lhs: &AtomicExpr, rhs: &AtomicExpr) -> bool { + // Merge-walk over columns since nucleons are sorted by (column, op, value) + let mut lhs_index = 0; + let mut rhs_index = 0; + + while lhs_index < lhs.nucleons.len() && rhs_index < rhs.nucleons.len() { + let lhs_col = lhs.nucleons[lhs_index].column(); + let rhs_col = rhs.nucleons[rhs_index].column(); + + match lhs_col.cmp(rhs_col) { + Ordering::Equal => { + // advance to the next column boundaries in both atomics + let mut lhs_next = lhs_index; + let mut rhs_next = rhs_index; + while lhs_next < lhs.nucleons.len() && lhs.nucleons[lhs_next].column() == lhs_col { + lhs_next += 1; + } + while rhs_next < rhs.nucleons.len() && rhs.nucleons[rhs_next].column() == rhs_col { + rhs_next += 1; + } + + let lhs_range = nucleons_to_range(&lhs.nucleons[lhs_index..lhs_next]); + let rhs_range = nucleons_to_range(&rhs.nucleons[rhs_index..rhs_next]); + + if !lhs_range.overlaps_with(&rhs_range) { + return false; + } + + lhs_index = lhs_next; + rhs_index = rhs_next; + } + 
Ordering::Less => { + // column appears only in `lhs`, skip all nucleons for this column + let col = lhs_col; + while lhs_index < lhs.nucleons.len() && lhs.nucleons[lhs_index].column() == col { + lhs_index += 1; + } + } + Ordering::Greater => { + // column appears only in `rhs`, skip all nucleons for this column + let col = rhs_col; + while rhs_index < rhs.nucleons.len() && rhs.nucleons[rhs_index].column() == col { + rhs_index += 1; + } + } + } + } + + true +} + +/// Pairwise overlap check between two expression lists. +/// +/// Returns true if two [`PartitionExpr`]s are overlapping (any pair of atomics overlaps). +fn expr_pair_overlap(lhs: &PartitionExpr, rhs: &PartitionExpr) -> Result { + let binding = [lhs.clone(), rhs.clone()]; + let collider = Collider::new(&binding)?; + // Split atomic exprs by source index + let mut lhs_atoms = Vec::new(); + let mut rhs_atoms = Vec::new(); + for atomic in collider.atomic_exprs.iter() { + if atomic.source_expr_index == 0 { + lhs_atoms.push(atomic); + } else { + rhs_atoms.push(atomic); + } + } + for lhs_atomic in &lhs_atoms { + for rhs_atomic in &rhs_atoms { + if atomic_exprs_overlap(lhs_atomic, rhs_atomic) { + return Ok(true); + } + } + } + Ok(false) +} + +/// Associates each expression in `from_exprs` with indices of overlapping expressions in `to_exprs`. +/// +/// Output vector length equals `from_exprs.len()`, and each inner vector contains indices into +/// `to_exprs` that overlap with the corresponding `from_exprs[i]`. +pub fn associate_from_to( + from_exprs: &[PartitionExpr], + to_exprs: &[PartitionExpr], +) -> Result>> { + let mut result = Vec::with_capacity(from_exprs.len()); + for from in from_exprs.iter() { + let mut targets = Vec::new(); + for (i, to) in to_exprs.iter().enumerate() { + if expr_pair_overlap(from, to)? 
{ + targets.push(i); + } + } + result.push(targets); + } + Ok(result) +} + +/// Represents a value range derived from a group of nucleons for the same column +#[derive(Debug, Clone)] +struct ValueRange { + lower: Bound, + upper: Bound, +} + +impl ValueRange { + fn new() -> Self { + Self { + lower: Bound::Unbounded, + upper: Bound::Unbounded, + } + } + + fn update_lower(&mut self, new_lower: Bound) { + match (&self.lower, &new_lower) { + (Bound::Unbounded, _) => self.lower = new_lower, + (_, Bound::Unbounded) => {} + (Bound::Included(cur), Bound::Included(new)) + | (Bound::Excluded(cur), Bound::Included(new)) + | (Bound::Included(cur), Bound::Excluded(new)) + | (Bound::Excluded(cur), Bound::Excluded(new)) => { + if new > cur { + self.lower = new_lower; + } + } + } + } + + fn update_upper(&mut self, new_upper: Bound) { + match (&self.upper, &new_upper) { + (Bound::Unbounded, _) => self.upper = new_upper, + (_, Bound::Unbounded) => {} + (Bound::Included(cur), Bound::Included(new)) + | (Bound::Excluded(cur), Bound::Included(new)) + | (Bound::Included(cur), Bound::Excluded(new)) + | (Bound::Excluded(cur), Bound::Excluded(new)) => { + if new < cur { + self.upper = new_upper; + } + } + } + } + + fn overlaps_with(&self, other: &Self) -> bool { + fn no_overlap(upper: &Bound, lower: &Bound) -> bool { + match (upper, lower) { + (Bound::Unbounded, _) | (_, Bound::Unbounded) => false, + // u], [l + (Bound::Included(u), Bound::Included(l)) => u < l, + // u], (l) or u), [l or u), (l) + (Bound::Included(u), Bound::Excluded(l)) + | (Bound::Excluded(u), Bound::Included(l)) + | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l, + } + } + + if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) { + return false; + } + true + } +} + +/// Convert nucleons for the same column into a ValueRange +fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange { + use GluonOp::*; + + let mut range = ValueRange::new(); + for n in nucleons { + let v = n.value(); + 
match n.op() { + Eq => { + range.lower = Bound::Included(v); + range.upper = Bound::Included(v); + break; + } + Lt => range.update_upper(Bound::Excluded(v)), + LtEq => range.update_upper(Bound::Included(v)), + Gt => range.update_lower(Bound::Excluded(v)), + GtEq => range.update_lower(Bound::Included(v)), + NotEq => continue, // handled elsewhere as separate atomics + } + } + range +} + +#[cfg(test)] +mod tests { + use datatypes::value::Value; + + use super::*; + use crate::expr::{Operand, PartitionExpr, RestrictedOp, col}; + + #[test] + fn test_pair_overlap_simple() { + let a = col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))); + let b = col("user_id").eq(Value::Int64(50)); + assert!(expr_pair_overlap(&a, &b).unwrap()); + + let c = col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))); + assert!(!expr_pair_overlap(&a, &c).unwrap()); + } + + #[test] + fn test_associate_from_to() { + // from: [ [0,100), [100,200) ] + let from = vec![ + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(100))), + col("user_id") + .gt_eq(Value::Int64(100)) + .and(col("user_id").lt(Value::Int64(200))), + ]; + // to: [ [0,150), [150,300) ] + let to = vec![ + col("user_id") + .gt_eq(Value::Int64(0)) + .and(col("user_id").lt(Value::Int64(150))), + col("user_id") + .gt_eq(Value::Int64(150)) + .and(col("user_id").lt(Value::Int64(300))), + ]; + let assoc = associate_from_to(&from, &to).unwrap(); + assert_eq!(assoc.len(), 2); + // [0,100) overlaps only with [0,150) + assert_eq!(assoc[0], vec![0]); + // [100,200) overlaps both [0,150) and [150,300) + assert_eq!(assoc[1], vec![0, 1]); + } + + #[test] + fn test_expr_with_or() { + // a: (user_id = 10 OR user_id = 20) + let a = PartitionExpr::new( + Operand::Expr(col("user_id").eq(Value::Int64(10))), + RestrictedOp::Or, + Operand::Expr(col("user_id").eq(Value::Int64(20))), + ); + let b = col("user_id") + .gt_eq(Value::Int64(15)) + 
.and(col("user_id").lt_eq(Value::Int64(25))); + assert!(expr_pair_overlap(&a, &b).unwrap()); + } + + #[test] + fn test_adjacent_ranges_non_overlap() { + // [0, 100) vs [100, 200) -> no overlap + let from = vec![ + col("k") + .gt_eq(Value::Int64(0)) + .and(col("k").lt(Value::Int64(100))), + ]; + let to = vec![ + col("k") + .gt_eq(Value::Int64(100)) + .and(col("k").lt(Value::Int64(200))), + ]; + let assoc = associate_from_to(&from, &to).unwrap(); + assert_eq!(assoc[0], Vec::::new()); + } + + #[test] + fn test_multi_column_conflict_no_overlap() { + // Left: a in [0,10) AND b >= 5 + let left = col("a") + .gt_eq(Value::Int64(0)) + .and(col("a").lt(Value::Int64(10))) + .and(col("b").gt_eq(Value::Int64(5))); + // Right: a = 9 AND b < 5 -> conflict on b + let right = col("a") + .eq(Value::Int64(9)) + .and(col("b").lt(Value::Int64(5))); + assert!(!expr_pair_overlap(&left, &right).unwrap()); + } + + #[test] + fn test_disjoint_columns_overlap() { + // Different columns don't constrain each other => satisfiable together + let from = vec![col("a").eq(Value::Int64(1))]; + let to = vec![col("b").eq(Value::Int64(2))]; + let assoc = associate_from_to(&from, &to).unwrap(); + assert_eq!(assoc[0], vec![0]); + } + + #[test] + fn test_boundary_inclusive_exclusive() { + // Left: a <= 10 AND a >= 10 => a = 10 + let left = col("a") + .lt_eq(Value::Int64(10)) + .and(col("a").gt_eq(Value::Int64(10))); + // Right: a = 10 -> overlap + let right_eq = col("a").eq(Value::Int64(10)); + assert!(expr_pair_overlap(&left, &right_eq).unwrap()); + + // Left: a < 10, Right: a = 10 -> no overlap + let left_lt = col("a").lt(Value::Int64(10)); + assert!(!expr_pair_overlap(&left_lt, &right_eq).unwrap()); + } +} diff --git a/src/partition/src/subtask.rs b/src/partition/src/subtask.rs new file mode 100644 index 0000000000..e74e9872f5 --- /dev/null +++ b/src/partition/src/subtask.rs @@ -0,0 +1,306 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you 
may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::VecDeque; + +use crate::error::Result; +use crate::expr::PartitionExpr; +use crate::overlap::associate_from_to; + +/// Indices are into the original input arrays (array of [`PartitionExpr`]). A connected component. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RepartitionSubtask { + pub from_expr_indices: Vec, + pub to_expr_indices: Vec, + /// For each `from_expr_indices[k]`, the corresponding vector contains global + /// `to_expr_indices` that overlap with it (indices into the original `to_exprs`). + pub transition_map: Vec>, +} + +/// Create independent subtasks out of given FROM/TO partition expressions. +pub fn create_subtasks( + from_exprs: &[PartitionExpr], + to_exprs: &[PartitionExpr], +) -> Result> { + // FROM -> TO + let assoc = associate_from_to(from_exprs, to_exprs)?; + if !assoc.iter().any(|v| !v.is_empty()) { + return Ok(vec![]); + } + + // TO -> FROM + let mut rev = vec![Vec::new(); to_exprs.len()]; + for (li, rights) in assoc.iter().enumerate() { + for &r in rights { + rev[r].push(li); + } + } + + // FROM(left), TO(right). 
Undirected + let mut visited_left = vec![false; from_exprs.len()]; + let mut visited_right = vec![false; to_exprs.len()]; + let mut subtasks = Vec::new(); + + for li in 0..from_exprs.len() { + if assoc[li].is_empty() || visited_left[li] { + continue; + } + + #[derive(Copy, Clone)] + enum Node { + Left(usize), + Right(usize), + } + let mut left_set = Vec::new(); + let mut right_set = Vec::new(); + let mut queue = VecDeque::new(); + + visited_left[li] = true; + queue.push_back(Node::Left(li)); + + while let Some(node) = queue.pop_front() { + match node { + Node::Left(left) => { + left_set.push(left); + for &r in &assoc[left] { + if !visited_right[r] { + visited_right[r] = true; + queue.push_back(Node::Right(r)); + } + } + } + Node::Right(right) => { + right_set.push(right); + for &l in &rev[right] { + if !visited_left[l] { + visited_left[l] = true; + queue.push_back(Node::Left(l)); + } + } + } + } + } + + left_set.sort_unstable(); + right_set.sort_unstable(); + + let transition_map = left_set + .iter() + .map(|&i| assoc[i].clone()) + .collect::>(); + + subtasks.push(RepartitionSubtask { + from_expr_indices: left_set, + to_expr_indices: right_set, + transition_map, + }); + } + + Ok(subtasks) +} + +#[cfg(test)] +mod tests { + use datatypes::value::Value; + + use super::*; + use crate::expr::col; + #[test] + fn test_split_one_to_two() { + // Left: [0, 20) + let from = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(20))), + ]; + + // Right: [0, 10), [10, 20) + let to = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(10))), + col("u") + .gt_eq(Value::Int64(10)) + .and(col("u").lt(Value::Int64(20))), + ]; + + let subtasks = create_subtasks(&from, &to).unwrap(); + assert_eq!(subtasks.len(), 1); + assert_eq!(subtasks[0].from_expr_indices, vec![0]); + assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]); + assert_eq!(subtasks[0].transition_map[0], vec![0, 1]); + } + + #[test] + fn test_merge_two_to_one() { + // Left: [0, 
10), [10, 20) + let from = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(10))), + col("u") + .gt_eq(Value::Int64(10)) + .and(col("u").lt(Value::Int64(20))), + ]; + // Right: [0, 20) + let to = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(20))), + ]; + + let subtasks = create_subtasks(&from, &to).unwrap(); + assert_eq!(subtasks.len(), 1); + assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]); + assert_eq!(subtasks[0].to_expr_indices, vec![0]); + assert_eq!(subtasks[0].transition_map[0], vec![0]); + assert_eq!(subtasks[0].transition_map[1], vec![0]); + } + + #[test] + fn test_create_subtasks_disconnected() { + // Left: A:[0,10), B:[20,30) + let from = vec![ + col("x") + .gt_eq(Value::Int64(0)) + .and(col("x").lt(Value::Int64(10))), + col("x") + .gt_eq(Value::Int64(20)) + .and(col("x").lt(Value::Int64(30))), + ]; + // Right: C:[5,15), D:[40,50) + let to = vec![ + col("x") + .gt_eq(Value::Int64(5)) + .and(col("x").lt(Value::Int64(15))), + col("x") + .gt_eq(Value::Int64(40)) + .and(col("x").lt(Value::Int64(50))), + ]; + + let subtasks = create_subtasks(&from, &to).unwrap(); + + // Expect a single component {A,C}; B and D have no edges, so they are filtered out + // Note: nodes with no edges are excluded by construction + assert_eq!(subtasks.len(), 1); + assert_eq!(subtasks[0].from_expr_indices, vec![0]); + assert_eq!(subtasks[0].to_expr_indices, vec![0]); + assert_eq!(subtasks[0].transition_map, vec![vec![0]]); + } + + #[test] + fn test_create_subtasks_multi() { + // Left: [0,100), [100,200) + let from = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(100))), + col("u") + .gt_eq(Value::Int64(100)) + .and(col("u").lt(Value::Int64(200))), + ]; + // Right: [0,50), [50,150), [150,250) + let to = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(50))), + col("u") + .gt_eq(Value::Int64(50)) + .and(col("u").lt(Value::Int64(150))), + col("u") + .gt_eq(Value::Int64(150)) + 
.and(col("u").lt(Value::Int64(250))), + ]; + + let subtasks = create_subtasks(&from, &to).unwrap(); + // All connected into a single component + assert_eq!(subtasks.len(), 1); + assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]); + assert_eq!(subtasks[0].to_expr_indices, vec![0, 1, 2]); + // [0,100) -> [0,50), [50,150) + // [100,200) -> [50,150), [150,250) + assert_eq!(subtasks[0].transition_map[0], vec![0, 1]); + assert_eq!(subtasks[0].transition_map[1], vec![1, 2]); + } + + #[test] + fn test_two_components() { + // Left: A:[0,10), B:[20,30) + let from = vec![ + col("x") + .gt_eq(Value::Int64(0)) + .and(col("x").lt(Value::Int64(10))), + col("x") + .gt_eq(Value::Int64(20)) + .and(col("x").lt(Value::Int64(30))), + ]; + // Right: C:[5,7), D:[22,28) + let to = vec![ + col("x") + .gt_eq(Value::Int64(5)) + .and(col("x").lt(Value::Int64(7))), + col("x") + .gt_eq(Value::Int64(22)) + .and(col("x").lt(Value::Int64(28))), + ]; + let mut subtasks = create_subtasks(&from, &to).unwrap(); + // Deterministic order: left indices sorted, so components may appear in order of discovery. 
+ assert_eq!(subtasks.len(), 2); + // Sort for stable assertion by smallest left index + subtasks.sort_by_key(|s| s.from_expr_indices[0]); + assert_eq!(subtasks[0].from_expr_indices, vec![0]); + assert_eq!(subtasks[0].to_expr_indices, vec![0]); + assert_eq!(subtasks[0].transition_map, vec![vec![0]]); + assert_eq!(subtasks[1].from_expr_indices, vec![1]); + assert_eq!(subtasks[1].to_expr_indices, vec![1]); + assert_eq!(subtasks[1].transition_map, vec![vec![1]]); + } + + #[test] + fn test_bridge_single_component() { + // Left: [0,10), [10,20) + let from = vec![ + col("u") + .gt_eq(Value::Int64(0)) + .and(col("u").lt(Value::Int64(10))), + col("u") + .gt_eq(Value::Int64(10)) + .and(col("u").lt(Value::Int64(20))), + ]; + // Right: [5,15), [15,25) + let to = vec![ + col("u") + .gt_eq(Value::Int64(5)) + .and(col("u").lt(Value::Int64(15))), + col("u") + .gt_eq(Value::Int64(15)) + .and(col("u").lt(Value::Int64(25))), + ]; + let subtasks = create_subtasks(&from, &to).unwrap(); + assert_eq!(subtasks.len(), 1); + assert_eq!(subtasks[0].from_expr_indices, vec![0, 1]); + assert_eq!(subtasks[0].to_expr_indices, vec![0, 1]); + assert_eq!(subtasks[0].transition_map[0], vec![0]); + assert_eq!(subtasks[0].transition_map[1], vec![0, 1]); + } + + #[test] + fn test_all_isolated_no_subtasks() { + // No edges at all + let from = vec![col("k").lt(Value::Int64(10))]; + let to = vec![col("k").gt_eq(Value::Int64(10))]; + let subtasks = create_subtasks(&from, &to).unwrap(); + assert!(subtasks.is_empty()); + } +} diff --git a/src/query/src/dist_plan/region_pruner.rs b/src/query/src/dist_plan/region_pruner.rs index 80a4a78998..3304bf23df 100644 --- a/src/query/src/dist_plan/region_pruner.rs +++ b/src/query/src/dist_plan/region_pruner.rs @@ -14,17 +14,14 @@ //! [`ConstraintPruner`] prunes partition info based on given expressions. 
-use std::cmp::Ordering; -use std::ops::Bound; - -use GluonOp::*; use ahash::{HashMap, HashSet}; use common_telemetry::debug; use datatypes::prelude::ConcreteDataType; -use datatypes::value::{OrderedF64, OrderedFloat, Value}; -use partition::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr}; +use datatypes::value::{OrderedFloat, Value}; +use partition::collider::{AtomicExpr, Collider}; use partition::expr::{Operand, PartitionExpr}; use partition::manager::PartitionInfo; +use partition::overlap::atomic_exprs_overlap; use store_api::storage::RegionId; use crate::error::Result; @@ -109,13 +106,9 @@ impl ConstraintPruner { } fn atomic_sets_overlap(query_atomics: &[&AtomicExpr], partition_atomic: &AtomicExpr) -> bool { - for query_atomic in query_atomics { - if Self::atomic_constraint_satisfied(query_atomic, partition_atomic) { - return true; - } - } - - false + query_atomics + .iter() + .any(|qa| atomic_exprs_overlap(qa, partition_atomic)) } fn normalize_datatype( @@ -213,196 +206,8 @@ impl ConstraintPruner { _ => false, } } - - /// Check if a single atomic constraint can be satisfied - fn atomic_constraint_satisfied( - query_atomic: &AtomicExpr, - partition_atomic: &AtomicExpr, - ) -> bool { - let mut query_index = 0; - let mut partition_index = 0; - - while query_index < query_atomic.nucleons.len() - && partition_index < partition_atomic.nucleons.len() - { - let query_col = query_atomic.nucleons[query_index].column(); - let partition_col = partition_atomic.nucleons[partition_index].column(); - - match query_col.cmp(partition_col) { - Ordering::Equal => { - let mut query_index_for_next_col = query_index; - let mut partition_index_for_next_col = partition_index; - - while query_index_for_next_col < query_atomic.nucleons.len() - && query_atomic.nucleons[query_index_for_next_col].column() == query_col - { - query_index_for_next_col += 1; - } - while partition_index_for_next_col < partition_atomic.nucleons.len() - && 
partition_atomic.nucleons[partition_index_for_next_col].column() - == partition_col - { - partition_index_for_next_col += 1; - } - - let query_range = Self::nucleons_to_range( - &query_atomic.nucleons[query_index..query_index_for_next_col], - ); - let partition_range = Self::nucleons_to_range( - &partition_atomic.nucleons[partition_index..partition_index_for_next_col], - ); - - debug!("Comparing two ranges, {query_range:?} and {partition_range:?}"); - - query_index = query_index_for_next_col; - partition_index = partition_index_for_next_col; - - if !query_range.overlaps_with(&partition_range) { - return false; - } - } - Ordering::Less => { - // Query column comes before partition column - skip query column - while query_index < query_atomic.nucleons.len() - && query_atomic.nucleons[query_index].column() == query_col - { - query_index += 1; - } - } - Ordering::Greater => { - // Partition column comes before query column - skip partition column - while partition_index < partition_atomic.nucleons.len() - && partition_atomic.nucleons[partition_index].column() == partition_col - { - partition_index += 1; - } - } - } - } - - true - } - - /// Convert a slice of nucleons (all for the same column) into a ValueRange - fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange { - let mut range = ValueRange::new(); - - for nucleon in nucleons { - let value = nucleon.value(); - match nucleon.op() { - Eq => { - range.lower = Bound::Included(value); - range.upper = Bound::Included(value); - break; // exact value, most restrictive - } - Lt => { - // upper < value - range.update_upper(Bound::Excluded(value)); - } - LtEq => { - range.update_upper(Bound::Included(value)); - } - Gt => { - range.update_lower(Bound::Excluded(value)); - } - GtEq => { - range.update_lower(Bound::Included(value)); - } - NotEq => { - // handled as two separate atomic exprs elsewhere - continue; - } - } - } - - range - } -} - -/// Represents a value range derived from a group of nucleons for the same 
column -#[derive(Debug, Clone)] -struct ValueRange { - // lower and upper bounds using standard library Bound semantics - lower: Bound, - upper: Bound, -} - -impl ValueRange { - fn new() -> Self { - Self { - lower: Bound::Unbounded, - upper: Bound::Unbounded, - } - } - - // Update lower bound choosing the more restrictive one - fn update_lower(&mut self, new_lower: Bound) { - match (&self.lower, &new_lower) { - (Bound::Unbounded, _) => self.lower = new_lower, - (_, Bound::Unbounded) => { /* keep existing */ } - (Bound::Included(cur), Bound::Included(new)) - | (Bound::Excluded(cur), Bound::Included(new)) - | (Bound::Included(cur), Bound::Excluded(new)) - | (Bound::Excluded(cur), Bound::Excluded(new)) => { - if new > cur { - self.lower = new_lower; - } else if new == cur { - // prefer Excluded over Included for the same value (more restrictive) - if matches!(new_lower, Bound::Excluded(_)) - && matches!(self.lower, Bound::Included(_)) - { - self.lower = new_lower; - } - } - } - } - } - - // Update upper bound choosing the more restrictive one - fn update_upper(&mut self, new_upper: Bound) { - match (&self.upper, &new_upper) { - (Bound::Unbounded, _) => self.upper = new_upper, - (_, Bound::Unbounded) => { /* keep existing */ } - (Bound::Included(cur), Bound::Included(new)) - | (Bound::Excluded(cur), Bound::Included(new)) - | (Bound::Included(cur), Bound::Excluded(new)) - | (Bound::Excluded(cur), Bound::Excluded(new)) => { - if new < cur { - self.upper = new_upper; - } else if new == cur { - // prefer Excluded over Included for the same value (more restrictive) - if matches!(new_upper, Bound::Excluded(_)) - && matches!(self.upper, Bound::Included(_)) - { - self.upper = new_upper; - } - } - } - } - } - - /// Check if this range overlaps with another range - fn overlaps_with(&self, other: &ValueRange) -> bool { - fn no_overlap(upper: &Bound, lower: &Bound) -> bool { - match (upper, lower) { - (Bound::Unbounded, _) | (_, Bound::Unbounded) => false, - // u], [l - 
(Bound::Included(u), Bound::Included(l)) => u < l, - // u], (l - (Bound::Included(u), Bound::Excluded(l)) - // u), [l - | (Bound::Excluded(u), Bound::Included(l)) - // u), (l - | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l, - } - } - - if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) { - return false; - } - true - } } +// Value range and atomic overlap logic now lives in `partition::overlap`. #[cfg(test)] mod tests {