From 6f2ec120598f5ed5eca436d6497968ef7a49453c Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 27 Mar 2026 15:22:02 +0800 Subject: [PATCH] feat(partition): add expression split utility (#7822) * feat(partition): add expression split utility Implement MVP split logic with checker-safe degrade paths and move module under utils/split with aligned split naming and tests. Signed-off-by: WenyXu * refactor: minor Signed-off-by: WenyXu * chore: header Signed-off-by: WenyXu * chore: styling Signed-off-by: WenyXu * fix(partition): degrade split when branch becomes unsatisfiable Detect empty conjunction branches after split and return EmptyBranch instead of silently succeeding. This keeps split behavior aligned with expected partition semantics and adds regression tests for contradictory cuts. Signed-off-by: WenyXu * fix(partition): tighten empty-branch split detection Handle Eq/NotEq contradictions and discrete-gap unsatisfiable ranges in split empty-branch checks. Add regression tests for equality conflicts and impossible int/date intervals. 
Signed-off-by: WenyXu * fix(partition): degrade singleton and uint impossible split branches Signed-off-by: WenyXu * fix(partition): enforce finite float bounds in split degradation Signed-off-by: WenyXu * fix(partition): drop date and timestamp support from expr split Signed-off-by: WenyXu * fix(partition): reject nan and infinity in expr split Signed-off-by: WenyXu * refactor(partition): reuse conjunction bound collection in expr split Signed-off-by: WenyXu * chore: fmt Signed-off-by: WenyXu * chore: add comments Signed-off-by: WenyXu * fix(partition): respect null-first semantics in empty branch checks Signed-off-by: WenyXu * refactor(partition): restrict expr split to range-only shapes Signed-off-by: WenyXu * docs(partition): clarify split helper scope and test names Signed-off-by: WenyXu * chore: add comments Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/partition/src/lib.rs | 1 + src/partition/src/utils.rs | 15 + src/partition/src/utils/split.rs | 1263 ++++++++++++++++++++++++++++++ 3 files changed, 1279 insertions(+) create mode 100644 src/partition/src/utils.rs create mode 100644 src/partition/src/utils/split.rs diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index c9257e8ee5..647210d1d5 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -27,5 +27,6 @@ pub mod partition; pub mod simplify; pub mod splitter; pub mod subtask; +pub mod utils; pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/utils.rs b/src/partition/src/utils.rs new file mode 100644 index 0000000000..de212b5589 --- /dev/null +++ b/src/partition/src/utils.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod split; diff --git a/src/partition/src/utils/split.rs b/src/partition/src/utils/split.rs new file mode 100644 index 0000000000..4b1980e34e --- /dev/null +++ b/src/partition/src/utils/split.rs @@ -0,0 +1,1263 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Expression split utilities for partition rules. +//! +//! This module provides a conservative way to split one partition expression `R` +//! by a split expression `S` into: +//! - `left = R AND S` +//! - `right = R AND NOT(S)` +//! +//! The implementation intentionally reuses existing partition components +//! (`Collider`, `simplify`, `PartitionChecker`) and degrades to no-split when an +//! unsupported shape/type is encountered. 
+ +use std::collections::{BTreeMap, HashSet}; + +use datatypes::value::Value; +use snafu::ensure; + +use crate::collider::Collider; +use crate::error::{self, Result}; +use crate::expr::{Operand, PartitionExpr, RestrictedOp}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExprSplitDegradeReason { + UnsupportedType, + UnsupportedNotExpansion, + ColliderRejected, + EmptyBranch, +} + +/// Splits one partition expression with a split predicate. +/// +/// Returns `(left, right)` on success, where: +/// - `left = R AND S` +/// - `right = R AND NOT(S)` +/// +/// Supported shape: +/// - `split_expr` must be a single atomic range predicate (`<`, `<=`, `>`, `>=`). +/// - `base_expr` must be a pure `AND` tree of atomic range predicates, possibly +/// across unrelated columns. +/// +/// Returns [`ExprSplitDegradeReason`] when this cannot safely process the shape/type. +pub fn split_partition_expr( + base_expr: PartitionExpr, + split_expr: PartitionExpr, +) -> std::result::Result<(PartitionExpr, PartitionExpr), ExprSplitDegradeReason> { + let base = base_expr.canonicalize(); + let split = split_expr.canonicalize(); + + if validate_supported_expr(&base).is_err() || validate_supported_expr(&split).is_err() { + return Err(ExprSplitDegradeReason::UnsupportedType); + } + + if !validate_base_expr_shape(&base) || !validate_split_expr_shape(&split) { + return Err(ExprSplitDegradeReason::UnsupportedType); + } + + let not_split = match negate_split_expr(&split) { + Ok(expr) => expr, + Err(_) => { + return Err(ExprSplitDegradeReason::UnsupportedNotExpansion); + } + }; + + let left_raw = base.clone().and(split); + let right_raw = base.clone().and(not_split); + + if Collider::new(std::slice::from_ref(&left_raw)).is_err() + || Collider::new(std::slice::from_ref(&right_raw)).is_err() + { + return Err(ExprSplitDegradeReason::ColliderRejected); + } + + let left_expr = simplify_and_bounds(left_raw); + let right_expr = simplify_and_bounds(right_raw); + + if 
is_empty_and_conjunction(&left_expr) || is_empty_and_conjunction(&right_expr) { + return Err(ExprSplitDegradeReason::EmptyBranch); + } + + Ok((left_expr, right_expr)) +} + +/// Detects whether a pure conjunction expression is definitely unsatisfiable. +/// +/// Scope and intent: +/// - This checker is intentionally conservative. +/// - It only analyzes expressions that can be flattened into: +/// `atom1 AND atom2 AND ...` +/// - If any `OR` is present, it returns `false` (unknown / not handled here). +/// +/// Strategy: +/// - For each column, keep only the tightest lower bound (`>` / `>=`) and +/// tightest upper bound (`<` / `<=`). +/// - `=` is treated as both lower and upper bound at the same value. +/// - `!=` is tracked per column to catch direct conflicts with `=`. +/// - After bounds are collected, the conjunction is empty iff for any column: +/// - lower value is greater than upper value, or +/// - lower value equals upper value but at least one bound is exclusive. +/// - For discrete domains (`Int*`, `UInt*`), adjacent open bounds with no +/// representable value in between are also treated as empty. +/// +/// Notes: +/// - This is still a conservative fast path focused on conjunction emptiness +/// detection for split degradation. +/// - `split_partition_expr` currently restricts its main path to range-only +/// conjunctions, but this helper remains slightly more general so shared +/// bound collection and direct conflict checks stay reusable. 
+fn is_empty_and_conjunction(expr: &PartitionExpr) -> bool { + let Some(collected) = collect_conjunction_bounds(expr) else { + return false; + }; + + if collected.has_conflict { + return true; + } + + let CollectedConjunction { + lowers, + uppers, + not_equals, + passthrough: _, + has_conflict: _, + } = collected; + + if lowers + .iter() + .any(|(col, lower)| !uppers.contains_key(col) && is_strictly_greater_than_domain_max(lower)) + { + return true; + } + + // Check for contradiction between collected lower/upper bounds per column. + lowers.into_iter().any(|(col, lower)| { + let Some(upper) = uppers.get(&col) else { + return false; + }; + + match lower.value.partial_cmp(&upper.value) { + Some(std::cmp::Ordering::Greater) => true, + Some(std::cmp::Ordering::Equal) => { + if !lower.inclusive || !upper.inclusive { + true + } else { + not_equals + .get(&col) + .is_some_and(|excluded| excluded.contains(&lower.value)) + } + } + Some(std::cmp::Ordering::Less) => { + match ( + discrete_value_index(&lower.value), + discrete_value_index(&upper.value), + ) { + (Some(lower_idx), Some(upper_idx)) => { + let min_candidate = if lower.inclusive { + Some(lower_idx) + } else { + lower_idx.checked_add(1) + }; + let max_candidate = if upper.inclusive { + Some(upper_idx) + } else { + upper_idx.checked_sub(1) + }; + match (min_candidate, max_candidate) { + (Some(min_val), Some(max_val)) => min_val > max_val, + _ => true, + } + } + _ => false, + } + } + _ => false, + } + }) +} + +fn discrete_value_index(v: &Value) -> Option<i128> { + match v { + Value::Int8(x) => Some(*x as i128), + Value::Int16(x) => Some(*x as i128), + Value::Int32(x) => Some(*x as i128), + Value::Int64(x) => Some(*x as i128), + Value::UInt8(x) => Some(*x as i128), + Value::UInt16(x) => Some(*x as i128), + Value::UInt32(x) => Some(*x as i128), + Value::UInt64(x) => Some(*x as i128), + _ => None, + } +} + +fn is_strictly_greater_than_domain_max(bound: &LowerBound) -> bool { + if bound.inclusive { + return false; + } + +
is_domain_max_value(&bound.value) +} + +fn is_domain_max_value(v: &Value) -> bool { + match v { + Value::Float32(v) => v.0 == f32::MAX, + Value::Float64(v) => v.0 == f64::MAX, + Value::UInt8(v) => *v == u8::MAX, + Value::UInt16(v) => *v == u16::MAX, + Value::UInt32(v) => *v == u32::MAX, + Value::UInt64(v) => *v == u64::MAX, + Value::Int8(v) => *v == i8::MAX, + Value::Int16(v) => *v == i16::MAX, + Value::Int32(v) => *v == i32::MAX, + Value::Int64(v) => *v == i64::MAX, + _ => false, + } +} + +/// Rewrites `NOT(expr)` into an equivalent `PartitionExpr` without introducing a unary NOT node. +/// +/// Why this function exists: +/// - `PartitionExpr` only models binary operators. +/// - Cut logic needs `R AND NOT(S)`. +/// - We therefore rewrite `NOT(S)` into an equivalent binary-expression tree. +/// +/// Rewrite rules: +/// - Atomic comparisons: +/// - `=` <-> `!=` +/// - `<` <-> `>=` +/// - `<=` <-> `>` +/// - `>` <-> `<=` +/// - `>=` <-> `<` +/// - Boolean composition: +/// - `NOT(A AND B)` => `NOT(A) OR NOT(B)` +/// - `NOT(A OR B)` => `NOT(A) AND NOT(B)` +/// +/// Failure behavior: +/// - For `AND/OR`, both sides must be `Operand::Expr`; otherwise returns `NoExprOperand`. +/// - Any unsupported shape bubbles up as an error and the caller degrades to no-split. +pub fn negate_split_expr(expr: &PartitionExpr) -> Result<PartitionExpr> { + match expr.op() { + RestrictedOp::Eq + | RestrictedOp::NotEq + | RestrictedOp::Lt + | RestrictedOp::LtEq + | RestrictedOp::Gt + | RestrictedOp::GtEq => { + // Atomic negate by operator inversion.
+ let op = match expr.op() { + RestrictedOp::Eq => RestrictedOp::NotEq, + RestrictedOp::NotEq => RestrictedOp::Eq, + RestrictedOp::Lt => RestrictedOp::GtEq, + RestrictedOp::LtEq => RestrictedOp::Gt, + RestrictedOp::Gt => RestrictedOp::LtEq, + RestrictedOp::GtEq => RestrictedOp::Lt, + RestrictedOp::And | RestrictedOp::Or => unreachable!(), + }; + Ok(PartitionExpr::new( + expr.lhs().clone(), + op, + expr.rhs().clone(), + )) + } + RestrictedOp::And | RestrictedOp::Or => { + // De Morgan transform on recursive sub-expressions. + let lhs = match expr.lhs() { + Operand::Expr(lhs) => lhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + let rhs = match expr.rhs() { + Operand::Expr(rhs) => rhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + let not_lhs = negate_split_expr(lhs)?; + let not_rhs = negate_split_expr(rhs)?; + let op = match expr.op() { + // NOT(A AND B) => NOT(A) OR NOT(B) + RestrictedOp::And => RestrictedOp::Or, + // NOT(A OR B) => NOT(A) AND NOT(B) + RestrictedOp::Or => RestrictedOp::And, + _ => unreachable!(), + }; + Ok(PartitionExpr::new( + Operand::Expr(not_lhs), + op, + Operand::Expr(not_rhs), + )) + } + } +} + +pub fn validate_supported_expr(expr: &PartitionExpr) -> Result<()> { + match expr.op() { + RestrictedOp::And | RestrictedOp::Or => { + let lhs = match expr.lhs() { + Operand::Expr(lhs) => lhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + let rhs = match expr.rhs() { + Operand::Expr(rhs) => rhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + validate_supported_expr(lhs)?; + validate_supported_expr(rhs)?; + Ok(()) + } + _ => validate_atomic(expr), + } +} + +fn validate_atomic(expr: &PartitionExpr) -> Result<()> { + let (lhs, rhs) = (expr.lhs(), expr.rhs()); + match (lhs, rhs) { + (Operand::Column(_), Operand::Value(v)) | 
(Operand::Value(v), Operand::Column(_)) => { + ensure!( + is_supported_value(v), + error::InvalidExprSnafu { expr: expr.clone() } + ); + if is_nan_value(v) || is_infinite_value(v) { + return error::InvalidExprSnafu { expr: expr.clone() }.fail(); + } + Ok(()) + } + _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(), + } +} + +/// Validates that `base_expr` stays within the range-only split contract. +/// +/// Scope and intent: +/// - The split utility only handles interval-style partition predicates. +/// - `base_expr` may mention multiple columns, but it must remain a pure `AND` +/// tree of atomic range predicates. +fn validate_base_expr_shape(expr: &PartitionExpr) -> bool { + let mut atoms = Vec::new(); + if !collect_and_atoms(expr, &mut atoms) { + return false; + } + + atoms + .into_iter() + .all(|atom| is_atomic_range_expr(&atom.canonicalize())) +} + +/// Validates that `split_expr` is a single atomic range predicate. +/// +/// This restriction keeps `NOT(split_expr)` in the same range-only subset so the +/// resulting left/right branches stay within the supported contract. +fn validate_split_expr_shape(expr: &PartitionExpr) -> bool { + is_atomic_range_expr(expr) +} + +/// Returns whether `expr` is an atomic `column op value` range predicate. +/// +/// Supported operators are limited to `<`, `<=`, `>`, and `>=`. 
+fn is_atomic_range_expr(expr: &PartitionExpr) -> bool { + atom_col_op_val(expr).is_some_and(|(_, op, _)| { + matches!( + op, + RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq + ) + }) +} + +fn is_supported_value(v: &Value) -> bool { + matches!( + v, + Value::Int8(_) + | Value::Int16(_) + | Value::Int32(_) + | Value::Int64(_) + | Value::UInt8(_) + | Value::UInt16(_) + | Value::UInt32(_) + | Value::UInt64(_) + | Value::Float32(_) + | Value::Float64(_) + | Value::String(_) + ) +} + +fn is_nan_value(v: &Value) -> bool { + match v { + Value::Float32(x) => x.0.is_nan(), + Value::Float64(x) => x.0.is_nan(), + _ => false, + } +} + +fn is_infinite_value(v: &Value) -> bool { + match v { + Value::Float32(x) => x.0.is_infinite(), + Value::Float64(x) => x.0.is_infinite(), + _ => false, + } +} + +#[derive(Debug, Clone)] +struct LowerBound { + value: Value, + inclusive: bool, +} + +#[derive(Debug, Clone)] +struct UpperBound { + value: Value, + inclusive: bool, +} + +struct CollectedConjunction { + lowers: BTreeMap<String, LowerBound>, + uppers: BTreeMap<String, UpperBound>, + not_equals: BTreeMap<String, HashSet<Value>>, + passthrough: Vec<PartitionExpr>, + has_conflict: bool, +} + +/// Simplifies conjunction-only range predicates by keeping the tightest bounds per column. +/// +/// This pass is intentionally conservative and only runs when the whole expression +/// can be flattened into `atom1 AND atom2 AND ...` without any `OR` node. +/// +/// Behavior: +/// - For each column, collect all lower-bound predicates (`>` / `>=`) and keep the +/// tightest one. +/// - For each column, collect all upper-bound predicates (`<` / `<=`) and keep the +/// tightest one. +/// - Non-range predicates (for example `=` / `!=`) are preserved as-is. +/// - If the expression contains `OR`, this function returns the original expression. +/// +/// Tightness rules: +/// - Upper bound: smaller value is tighter; if equal value, exclusive (`<`) is tighter. +/// - Lower bound: larger value is tighter; if equal value, exclusive (`>`) is tighter.
+/// +/// Examples: +/// - `a <= 10 AND a < 10` => `a < 10` +/// - `a >= 10 AND a > 10` => `a > 10` +/// - `a < 10 AND a < 5` => `a < 5` +fn simplify_and_bounds(expr: PartitionExpr) -> PartitionExpr { + let Some(collected) = collect_conjunction_bounds(&expr) else { + return expr; + }; + + let CollectedConjunction { + lowers, + uppers, + not_equals: _, + passthrough, + has_conflict: _, + } = collected; + + let mut out = passthrough; + out.extend(lowers.into_iter().map(|(col, lower)| { + PartitionExpr::new( + Operand::Column(col), + if lower.inclusive { + RestrictedOp::GtEq + } else { + RestrictedOp::Gt + }, + Operand::Value(lower.value), + ) + })); + out.extend(uppers.into_iter().map(|(col, upper)| { + PartitionExpr::new( + Operand::Column(col), + if upper.inclusive { + RestrictedOp::LtEq + } else { + RestrictedOp::Lt + }, + Operand::Value(upper.value), + ) + })); + + fold_and_exprs(out).unwrap_or(expr) +} + +/// Flattens an expression into atomic terms when it is a pure conjunction tree. +/// +/// Returns `false` if any `OR` is encountered, signaling caller to skip this +/// simplification path. +fn collect_and_atoms(expr: &PartitionExpr, out: &mut Vec<PartitionExpr>) -> bool { + match expr.op() { + RestrictedOp::And => { + let lhs = match expr.lhs() { + Operand::Expr(lhs) => lhs, + _ => return false, + }; + let rhs = match expr.rhs() { + Operand::Expr(rhs) => rhs, + _ => return false, + }; + collect_and_atoms(lhs, out) && collect_and_atoms(rhs, out) + } + RestrictedOp::Or => false, + _ => { + out.push(expr.clone()); + true + } + } +} + +/// Extracts `(column, op, value)` from a canonicalized atomic expression. +fn atom_col_op_val(expr: &PartitionExpr) -> Option<(String, RestrictedOp, Value)> { + let lhs = expr.lhs(); + let rhs = expr.rhs(); + match (lhs, rhs) { + (Operand::Column(col), Operand::Value(v)) => { + Some((col.clone(), expr.op().clone(), v.clone())) + } + _ => None, + } +} + +/// Collects per-column bounds and passthrough atoms from a pure `AND` tree.
+/// +/// Scope and intent: +/// - This helper is shared by [`is_empty_and_conjunction`] and +/// [`simplify_and_bounds`] so both paths interpret conjunction atoms the same +/// way. +/// - It only handles conjunction-only expressions. If any `OR` is present, it +/// returns `None` and lets callers keep their conservative fallback behavior. +/// +/// Behavior: +/// - Tightest lower/upper bounds are recorded per column. +/// - `=` contributes both a lower and an upper bound at the same value. +/// - `!=` and non-range atoms are preserved in `passthrough` for callers that +/// need to rebuild the conjunction. +/// - `has_conflict` is set when atomic constraints already contradict each +/// other (for example `a = 1 AND a <> 1`). +/// +/// Notes: +/// - This helper is intentionally a bit more general than the current +/// `split_partition_expr` contract, which now only feeds range-only +/// conjunctions into the main split path. +fn collect_conjunction_bounds(expr: &PartitionExpr) -> Option<CollectedConjunction> { + let mut atoms = Vec::new(); + if !collect_and_atoms(expr, &mut atoms) { + return None; + } + + let mut lowers = BTreeMap::new(); + let mut uppers = BTreeMap::new(); + let mut equals = BTreeMap::new(); + let mut not_equals: BTreeMap<String, HashSet<Value>> = BTreeMap::new(); + let mut passthrough = Vec::new(); + let mut seen = HashSet::new(); + let mut has_conflict = false; + + for atom in atoms { + let atom = atom.canonicalize(); + let Some((col, op, val)) = atom_col_op_val(&atom) else { + push_unique_expr(&mut passthrough, &mut seen, atom); + continue; + }; + + match op { + RestrictedOp::Lt | RestrictedOp::LtEq => update_upper_bound( + &mut uppers, + col, + UpperBound { + value: val, + inclusive: matches!(op, RestrictedOp::LtEq), + }, + ), + RestrictedOp::Gt | RestrictedOp::GtEq => update_lower_bound( + &mut lowers, + col, + LowerBound { + value: val, + inclusive: matches!(op, RestrictedOp::GtEq), + }, + ), + RestrictedOp::Eq => { + if let Some(existing) = equals.get(&col) + && existing != &val + {
+ has_conflict = true; + } + if not_equals + .get(&col) + .is_some_and(|excluded| excluded.contains(&val)) + { + has_conflict = true; + } + equals.insert(col.clone(), val.clone()); + update_lower_bound( + &mut lowers, + col.clone(), + LowerBound { + value: val.clone(), + inclusive: true, + }, + ); + update_upper_bound( + &mut uppers, + col, + UpperBound { + value: val, + inclusive: true, + }, + ); + push_unique_expr(&mut passthrough, &mut seen, atom); + } + RestrictedOp::NotEq => { + if equals.get(&col).is_some_and(|eq| eq == &val) { + has_conflict = true; + } + not_equals.entry(col).or_default().insert(val); + push_unique_expr(&mut passthrough, &mut seen, atom); + } + RestrictedOp::And | RestrictedOp::Or => { + push_unique_expr(&mut passthrough, &mut seen, atom); + } + } + } + + Some(CollectedConjunction { + lowers, + uppers, + not_equals, + passthrough, + has_conflict, + }) +} + +fn push_unique_expr(out: &mut Vec<PartitionExpr>, seen: &mut HashSet<String>, expr: PartitionExpr) { + let key = expr.to_string(); + if seen.insert(key) { + out.push(expr); + } +} + +fn update_upper_bound( + uppers: &mut BTreeMap<String, UpperBound>, + col: String, + candidate: UpperBound, +) { + match uppers.get_mut(&col) { + Some(current) => { + if prefer_upper(&candidate, current) { + *current = candidate; + } + } + None => { + uppers.insert(col, candidate); + } + } +} + +fn update_lower_bound( + lowers: &mut BTreeMap<String, LowerBound>, + col: String, + candidate: LowerBound, +) { + match lowers.get_mut(&col) { + Some(current) => { + if prefer_lower(&candidate, current) { + *current = candidate; + } + } + None => { + lowers.insert(col, candidate); + } + } +} + +fn prefer_upper(candidate: &UpperBound, current: &UpperBound) -> bool { + // "Smaller" upper bound is tighter. For equal value, exclusive is tighter.
+ match candidate.value.partial_cmp(&current.value) { + Some(std::cmp::Ordering::Less) => true, + Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive, + _ => false, + } +} + +fn prefer_lower(candidate: &LowerBound, current: &LowerBound) -> bool { + // "Larger" lower bound is tighter. For equal value, exclusive is tighter. + match candidate.value.partial_cmp(&current.value) { + Some(std::cmp::Ordering::Greater) => true, + Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive, + _ => false, + } +} + +/// Folds a list of expressions into a left-associated AND tree. +/// Returns `None` if the input list is empty. +fn fold_and_exprs(mut exprs: Vec<PartitionExpr>) -> Option<PartitionExpr> { + exprs.drain(..).reduce(|acc, next| acc.and(next)) +} + +#[cfg(test)] +mod tests { + use datatypes::value::{OrderedFloat, Value}; + use store_api::storage::RegionNumber; + + use super::*; + use crate::checker::PartitionChecker; + use crate::expr::col; + use crate::multi_dim::MultiDimPartitionRule; + + fn validate_cut_result_with_checker( + original_rule_exprs: &[PartitionExpr], + replaced_index: usize, + left: &Option<PartitionExpr>, + right: &Option<PartitionExpr>, + partition_columns: Vec<String>, + regions: Vec<RegionNumber>, + ) -> Result<()> { + ensure!( + replaced_index < original_rule_exprs.len(), + error::UnexpectedSnafu { + err_msg: format!( + "replaced index out of bounds: {replaced_index} >= {}", + original_rule_exprs.len() + ) + } + ); + + let mut exprs = original_rule_exprs.to_vec(); + exprs.remove(replaced_index); + exprs.extend(left.iter().cloned()); + exprs.extend(right.iter().cloned()); + + ensure!( + !exprs.is_empty(), + error::UnexpectedSnafu { + err_msg: "empty rule exprs after split".to_string() + } + ); + + let final_regions = if regions.len() == exprs.len() { + regions + } else { + (0..exprs.len() as RegionNumber).collect() + }; + + let rule = MultiDimPartitionRule::try_new(partition_columns, final_regions, exprs, false)?; + let checker = PartitionChecker::try_new(&rule)?; + checker.check()?; + Ok(()) + }
+ + #[test] + fn test_split_simple_range() { + // R: a < 10 + let base = col("a").lt(Value::Int64(10)); + // S: a < 5 + let split = col("a").lt(Value::Int64(5)); + let (left, right) = split_partition_expr(base, split).unwrap(); + // left = R AND S = a < 5 + assert_eq!(left.to_string(), "a < 5"); + // right = R AND NOT(S) = a >= 5 AND a < 10 + assert_eq!(right.to_string(), "a >= 5 AND a < 10"); + } + + #[test] + fn test_split_string_interval() { + // R: v > 'm' AND v < 'n' + let base = col("v") + .gt(Value::String("m".into())) + .and(col("v").lt(Value::String("n".into()))); + // S: v < 'm~' + let split = col("v").lt(Value::String("m~".into())); + let (left, right) = split_partition_expr(base, split).unwrap(); + // left = (v > m AND v < n) AND (v < m~) -> v > m AND v < m~ + assert_eq!(left.to_string(), "v > m AND v < m~"); + // right = (v > m AND v < n) AND (v >= m~) -> v >= m~ AND v < n + assert_eq!(right.to_string(), "v >= m~ AND v < n"); + } + + #[test] + fn test_split_numeric_interval_mid_split() { + // R: a > 3 AND a < 10 + let base = col("a") + .gt(Value::Int64(3)) + .and(col("a").lt(Value::Int64(10))); + // S: a < 5 + let split = col("a").lt(Value::Int64(5)); + + let (left, right) = split_partition_expr(base, split).unwrap(); + + // left = (a > 3 AND a < 10) AND (a < 5) -> a > 3 AND a < 5 + assert_eq!(left.to_string(), "a > 3 AND a < 5"); + // right = (a > 3 AND a < 10) AND (a >= 5) -> a >= 5 AND a < 10 + assert_eq!(right.to_string(), "a >= 5 AND a < 10"); + } + + #[test] + fn test_split_base_expr_allows_unrelated_range_columns() { + // R: a > 20 AND b < 20 + let base = col("a") + .gt(Value::Int64(20)) + .and(col("b").lt(Value::Int64(20))); + // S: a < 30 + let split = col("a").lt(Value::Int64(30)); + + let (left, right) = split_partition_expr(base, split).unwrap(); + + // left keeps the unrelated `b < 20` bound while splitting column `a`. + assert_eq!(left.to_string(), "a > 20 AND a < 30 AND b < 20"); + // right also preserves the unrelated column bound. 
+ assert_eq!(right.to_string(), "a >= 30 AND b < 20"); + } + + #[test] + fn test_split_degrade_on_unsupported_type() { + // intentionally excludes boolean from split-able value types. + let base = col("a").eq(Value::Boolean(true)); + let split = col("a").eq(Value::Boolean(true)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_validate_cut_result_with_checker() { + // Original partition set: a < 10, a >= 10 + let original = vec![ + col("a").lt(Value::Int64(10)), + col("a").gt_eq(Value::Int64(10)), + ]; + let left = Some(col("a").lt(Value::Int64(5))); + let right = Some( + col("a") + .gt_eq(Value::Int64(5)) + .and(col("a").lt(Value::Int64(10))), + ); + + validate_cut_result_with_checker( + &original, + 0, + &left, + &right, + vec!["a".to_string()], + vec![1, 2, 3], + ) + .unwrap(); + } + + #[test] + fn test_split_degrade_on_empty_branch() { + // R: a < 10 + let base = col("a").lt(Value::Int64(10)); + // S: a < 20 + let split = col("a").lt(Value::Int64(20)); + + // right = (a < 10) AND (a >= 20) is unsatisfiable, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_split_rejects_eq_in_base_expr() { + // R: a = 5 falls outside the range-only base_expr contract. + let base = col("a").eq(Value::Int64(5)); + // S: a < 6 remains a valid range split. + let split = col("a").lt(Value::Int64(6)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_degrade_on_discrete_gap_int() { + // R: a < 5 + let base = col("a").lt(Value::Int64(5)); + // S: a <= 4 + let split = col("a").lt_eq(Value::Int64(4)); + + // right = (a < 5) AND (a > 4) has no integer solution, should degrade. 
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
+    }
+
+    #[test]
+    fn test_split_degrade_on_unsupported_date_type() {
+        // Date is intentionally excluded from split-supported value types.
+        let base = col("d").lt(Value::Date(5.into()));
+        let split = col("d").lt_eq(Value::Date(4.into()));
+
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
+    }
+
+    #[test]
+    fn test_split_degrade_on_unsupported_timestamp_type() {
+        // Timestamp is intentionally excluded from split-supported value types.
+        let base = col("ts").lt(Value::Timestamp(0.into()));
+        let split = col("ts").lt_eq(Value::Timestamp(1.into()));
+
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
+    }
+
+    #[test]
+    fn test_split_rejects_not_eq_in_split_expr() {
+        // R: a >= 5 AND a <= 5
+        let base = col("a")
+            .gt_eq(Value::Int64(5))
+            .and(col("a").lt_eq(Value::Int64(5)));
+        // S: a <> 5 falls outside the range-only split_expr contract.
+        let split = col("a").not_eq(Value::Int64(5));
+
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
+    }
+
+    #[test]
+    fn test_split_rejects_eq_in_split_expr() {
+        // R: a >= 5 AND a <= 5
+        let base = col("a")
+            .gt_eq(Value::Int64(5))
+            .and(col("a").lt_eq(Value::Int64(5)));
+        // S: a = 5 falls outside the range-only split_expr contract.
+        let split = col("a").eq(Value::Int64(5));
+
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
+    }
+
+    #[test]
+    fn test_split_degrade_on_uint_one_sided_impossible_upper_bound() {
+        // NOTE: despite `degrade` in the name, this case expects a successful split.
+        // R: a < 10 (UInt64 domain)
+        let base = col("a").lt(Value::UInt64(10));
+        // S: a < 0 is still satisfiable by NULL under null-first partition semantics.
+        // The split keeps a nullable left branch instead of degrading it as empty.
+        let split = col("a").lt(Value::UInt64(0));
+
+        let (left, right) = split_partition_expr(base, split).unwrap();
+        assert_eq!(left.to_string(), "a < 0");
+        assert_eq!(right.to_string(), "a >= 0 AND a < 10");
+    }
+
+    #[test]
+    fn test_split_degrade_on_uint_one_sided_impossible_lower_bound() {
+        // R: a < 10 (UInt64 domain)
+        let base = col("a").lt(Value::UInt64(10));
+        // S: a > u64::MAX (impossible on UInt64)
+        let split = col("a").gt(Value::UInt64(u64::MAX));
+
+        // left = (a < 10) AND (a > u64::MAX) is unsatisfiable on UInt64, should degrade.
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
+    }
+
+    #[test]
+    fn test_split_degrade_on_int_one_sided_impossible_upper_bound() {
+        // NOTE: despite `degrade` in the name, this case expects a successful split.
+        // R: a < 10 (Int64 domain)
+        let base = col("a").lt(Value::Int64(10));
+        // S: a < i64::MIN is still satisfiable by NULL under null-first partition semantics.
+        // The split keeps a nullable left branch instead of degrading it as empty.
+        let split = col("a").lt(Value::Int64(i64::MIN));
+
+        let (left, right) = split_partition_expr(base, split).unwrap();
+        assert_eq!(left.to_string(), format!("a < {}", i64::MIN));
+        assert_eq!(right.to_string(), format!("a >= {} AND a < 10", i64::MIN));
+    }
+
+    #[test]
+    fn test_split_degrade_on_int_one_sided_impossible_lower_bound() {
+        // R: a < 10 (Int64 domain)
+        let base = col("a").lt(Value::Int64(10));
+        // S: a > i64::MAX (impossible on Int64)
+        let split = col("a").gt(Value::Int64(i64::MAX));
+
+        // left = (a < 10) AND (a > i64::MAX) is unsatisfiable on Int64, should degrade.
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
+    }
+
+    #[test]
+    fn test_split_degrade_on_string_one_sided_impossible_upper_bound() {
+        // NOTE: despite `degrade` in the name, this case expects a successful split.
+        // R: s < "z" (String domain)
+        let base = col("s").lt(Value::String("z".into()));
+        // S: s < "" is still satisfiable by NULL under null-first partition semantics.
+        // The split keeps a nullable left branch instead of degrading it as empty.
+        let split = col("s").lt(Value::String("".into()));
+
+        let (left, right) = split_partition_expr(base, split).unwrap();
+        assert_eq!(left.to_string(), "s < ");
+        // The empty string renders as nothing (see `left` above), so `>=` is followed
+        // by two spaces: the operand separator and the separator before `AND`.
+        assert_eq!(right.to_string(), "s >=  AND s < z");
+    }
+
+    #[test]
+    fn test_split_degrade_on_float64_one_sided_impossible_upper_bound() {
+        // NOTE: despite `degrade` in the name, this case expects a successful split.
+        // R: a < 10.0 (Float64 domain)
+        let base = col("a").lt(Value::Float64(OrderedFloat(10.0)));
+        // S: a < f64::MIN is still satisfiable by NULL under null-first partition semantics.
+        // The split keeps a nullable left branch instead of degrading it as empty.
+        let split = col("a").lt(Value::Float64(OrderedFloat(f64::MIN)));
+
+        let (left, right) = split_partition_expr(base, split).unwrap();
+        assert_eq!(left.to_string(), format!("a < {}", f64::MIN));
+        assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f64::MIN));
+    }
+
+    #[test]
+    fn test_split_degrade_on_float64_one_sided_impossible_lower_bound() {
+        // R: a < 10.0 (Float64 domain)
+        let base = col("a").lt(Value::Float64(OrderedFloat(10.0)));
+        // S: a > f64::MAX (impossible with finite-only float policy)
+        let split = col("a").gt(Value::Float64(OrderedFloat(f64::MAX)));
+
+        // left = (a < 10.0) AND (a > f64::MAX) is unsatisfiable, should degrade.
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
+    }
+
+    #[test]
+    fn test_split_degrade_on_float32_one_sided_impossible_upper_bound() {
+        // NOTE: despite `degrade` in the name, this case expects a successful split.
+        // R: a < 10.0f32 (Float32 domain)
+        let base = col("a").lt(Value::Float32(OrderedFloat(10.0)));
+        // S: a < f32::MIN is still satisfiable by NULL under null-first partition semantics.
+        // The split keeps a nullable left branch instead of degrading it as empty.
+        let split = col("a").lt(Value::Float32(OrderedFloat(f32::MIN)));
+
+        let (left, right) = split_partition_expr(base, split).unwrap();
+        assert_eq!(left.to_string(), format!("a < {}", f32::MIN));
+        assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f32::MIN));
+    }
+
+    #[test]
+    fn test_split_degrade_on_float32_one_sided_impossible_lower_bound() {
+        // R: a < 10.0f32 (Float32 domain)
+        let base = col("a").lt(Value::Float32(OrderedFloat(10.0)));
+        // S: a > f32::MAX (impossible with finite-only float policy)
+        let split = col("a").gt(Value::Float32(OrderedFloat(f32::MAX)));
+
+        // left = (a < 10.0f32) AND (a > f32::MAX) is unsatisfiable, should degrade.
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
+    }
+
+    #[test]
+    fn test_simplify_same_upper_bound_prefers_strict() {
+        // a <= 10 AND a < 10 => a < 10
+        let expr = col("a")
+            .lt_eq(Value::Int64(10))
+            .and(col("a").lt(Value::Int64(10)));
+
+        let simplified = simplify_and_bounds(expr);
+        assert_eq!(simplified.to_string(), "a < 10");
+    }
+
+    #[test]
+    fn test_simplify_same_lower_bound_prefers_strict() {
+        // a >= 10 AND a > 10 => a > 10
+        let expr = col("a")
+            .gt_eq(Value::Int64(10))
+            .and(col("a").gt(Value::Int64(10)));
+
+        let simplified = simplify_and_bounds(expr);
+        assert_eq!(simplified.to_string(), "a > 10");
+    }
+
+    #[test]
+    fn test_negate_split_expr_demorgan_and() {
+        // expr: (a < 10) AND (a >= 3)
+        let expr = col("a")
+            .lt(Value::Int64(10))
+            .and(col("a").gt_eq(Value::Int64(3)));
+        let not_expr = negate_split_expr(&expr).unwrap();
+        // NOT(expr) => (a >= 10) OR (a < 3)
+        assert_eq!(not_expr.to_string(), "a >= 10 OR a < 3");
+    }
+
+    #[test]
+    fn test_negate_split_expr_demorgan_or() {
+        // expr: (a = 1) OR (a <> 2)
+        let expr = PartitionExpr::new(
+            Operand::Expr(col("a").eq(Value::Int64(1))),
+            RestrictedOp::Or,
+            Operand::Expr(col("a").not_eq(Value::Int64(2))),
+        );
+        let not_expr = negate_split_expr(&expr).unwrap();
+        // NOT(expr) => (a <> 1) AND (a = 2)
+        assert_eq!(not_expr.to_string(), "a <> 1 AND a = 2");
+    }
+
+    #[test]
+    fn test_negate_split_expr_invalid_and_operand() {
+        // malformed AND: rhs is a scalar value, not an Expr subtree.
+        let malformed = PartitionExpr {
+            lhs: Box::new(Operand::Expr(col("a").lt(Value::Int64(10)))),
+            op: RestrictedOp::And,
+            rhs: Box::new(Operand::Value(Value::Int64(1))),
+        };
+        assert!(negate_split_expr(&malformed).is_err());
+    }
+
+    #[test]
+    fn test_validate_supported_expr_value_column_allowed() {
+        // Canonicalization can flip to column-value; validator must accept value-column input.
+        let expr = PartitionExpr::new(
+            Operand::Value(Value::Int64(10)),
+            RestrictedOp::Lt,
+            Operand::Column("a".to_string()),
+        );
+        assert!(validate_supported_expr(&expr).is_ok());
+    }
+
+    #[test]
+    fn test_validate_supported_expr_invalid_atomic_shape() {
+        // column-column atomic comparison is out of shape.
+        let expr = PartitionExpr::new(
+            Operand::Column("a".to_string()),
+            RestrictedOp::Eq,
+            Operand::Column("b".to_string()),
+        );
+        assert!(validate_supported_expr(&expr).is_err());
+    }
+
+    #[test]
+    fn test_validate_supported_expr_nan_comparison_rejected() {
+        // NaN cannot be used in any supported comparison predicate.
+        let expr = col("a").lt(Value::Float64(OrderedFloat(f64::NAN)));
+        assert!(validate_supported_expr(&expr).is_err());
+    }
+
+    #[test]
+    fn test_validate_supported_expr_infinite_comparison_rejected() {
+        // Infinity cannot be used in any supported comparison predicate under
+        // finite-only float policy.
+        let pos_inf = col("a").gt(Value::Float64(OrderedFloat(f64::INFINITY)));
+        let neg_inf = col("a").lt(Value::Float32(OrderedFloat(f32::NEG_INFINITY)));
+        assert!(validate_supported_expr(&pos_inf).is_err());
+        assert!(validate_supported_expr(&neg_inf).is_err());
+    }
+
+    #[test]
+    fn test_validate_supported_expr_nan_eq_rejected() {
+        // An equality predicate against NaN must fail validation.
+        let expr = col("a").eq(Value::Float64(OrderedFloat(f64::NAN)));
+        assert!(validate_supported_expr(&expr).is_err());
+    }
+
+    #[test]
+    fn test_validate_supported_expr_infinite_eq_rejected() {
+        // Eq against +infinity and NotEq against -infinity must both fail validation.
+        let pos_inf = col("a").eq(Value::Float64(OrderedFloat(f64::INFINITY)));
+        let neg_inf = col("a").not_eq(Value::Float32(OrderedFloat(f32::NEG_INFINITY)));
+        assert!(validate_supported_expr(&pos_inf).is_err());
+        assert!(validate_supported_expr(&neg_inf).is_err());
+    }
+
+    #[test]
+    fn test_simplify_and_bounds_or_keeps_original() {
+        // OR tree is intentionally not flattened by AND-only simplifier.
+        let expr = PartitionExpr::new(
+            Operand::Expr(col("a").lt(Value::Int64(10))),
+            RestrictedOp::Or,
+            Operand::Expr(col("a").gt_eq(Value::Int64(20))),
+        );
+        let simplified = simplify_and_bounds(expr.clone());
+        assert_eq!(simplified.to_string(), expr.to_string());
+    }
+
+    #[test]
+    fn test_simplify_and_bounds_keep_stronger_when_weaker_seen_later() {
+        // upper: stronger bound first, weaker later -> keep stronger (< 5).
+        let upper = col("a")
+            .lt(Value::Int64(5))
+            .and(col("a").lt(Value::Int64(10)));
+        assert_eq!(simplify_and_bounds(upper).to_string(), "a < 5");
+
+        // lower: stronger bound first, weaker later -> keep stronger (> 10).
+        let lower = col("a")
+            .gt(Value::Int64(10))
+            .and(col("a").gt(Value::Int64(5)));
+        assert_eq!(simplify_and_bounds(lower).to_string(), "a > 10");
+    }
+
+    #[test]
+    fn test_internal_helpers_uncovered_branches() {
+        // Empty AND fold should return None.
+        assert!(fold_and_exprs(vec![]).is_none());
+
+        // Any OR in tree disables AND-bound simplification path.
+        let mut out = Vec::new();
+        let or_expr = PartitionExpr::new(
+            Operand::Expr(col("a").lt(Value::Int64(10))),
+            RestrictedOp::Or,
+            Operand::Expr(col("a").gt_eq(Value::Int64(20))),
+        );
+        assert!(!collect_and_atoms(&or_expr, &mut out));
+
+        // value-value atom has no (column, op, value) projection.
+        let value_value = PartitionExpr::new(
+            Operand::Value(Value::Int64(1)),
+            RestrictedOp::Eq,
+            Operand::Value(Value::Int64(2)),
+        );
+        assert!(atom_col_op_val(&value_value).is_none());
+    }
+
+    #[test]
+    fn test_split_rejects_or_in_base_expr() {
+        // R: (a < 10) OR (a >= 20 AND a < 30) falls outside the AND-only base_expr contract.
+        let base = PartitionExpr::new(
+            Operand::Expr(col("a").lt(Value::Int64(10))),
+            RestrictedOp::Or,
+            Operand::Expr(
+                col("a")
+                    .gt_eq(Value::Int64(20))
+                    .and(col("a").lt(Value::Int64(30))),
+            ),
+        );
+        // S: a < 5
+        let split = col("a").lt(Value::Int64(5));
+
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
+    }
+
+    #[test]
+    fn test_split_rejects_or_in_split_expr() {
+        // R: a < 10
+        let base = col("a").lt(Value::Int64(10));
+        // S: (a < 5) OR (a >= 8 AND a < 9) falls outside the atomic split_expr contract.
+        let split = PartitionExpr::new(
+            Operand::Expr(col("a").lt(Value::Int64(5))),
+            RestrictedOp::Or,
+            Operand::Expr(
+                col("a")
+                    .gt_eq(Value::Int64(8))
+                    .and(col("a").lt(Value::Int64(9))),
+            ),
+        );
+
+        let result = split_partition_expr(base, split);
+        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
+    }
+}