diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index c9257e8ee5..647210d1d5 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -27,5 +27,6 @@ pub mod partition; pub mod simplify; pub mod splitter; pub mod subtask; +pub mod utils; pub use crate::partition::{PartitionRule, PartitionRuleRef}; diff --git a/src/partition/src/utils.rs b/src/partition/src/utils.rs new file mode 100644 index 0000000000..de212b5589 --- /dev/null +++ b/src/partition/src/utils.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod split; diff --git a/src/partition/src/utils/split.rs b/src/partition/src/utils/split.rs new file mode 100644 index 0000000000..4b1980e34e --- /dev/null +++ b/src/partition/src/utils/split.rs @@ -0,0 +1,1263 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Expression split utilities for partition rules. +//! +//! This module provides a conservative way to split one partition expression `R` +//! by a split expression `S` into: +//! - `left = R AND S` +//! - `right = R AND NOT(S)` +//! +//! The implementation intentionally reuses existing partition components +//! (`Collider`, `simplify`, `PartitionChecker`) and degrades to no-split when an +//! unsupported shape/type is encountered. + +use std::collections::{BTreeMap, HashSet}; + +use datatypes::value::Value; +use snafu::ensure; + +use crate::collider::Collider; +use crate::error::{self, Result}; +use crate::expr::{Operand, PartitionExpr, RestrictedOp}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExprSplitDegradeReason { + UnsupportedType, + UnsupportedNotExpansion, + ColliderRejected, + EmptyBranch, +} + +/// Splits one partition expression with a split predicate. +/// +/// Returns `(left, right)` on success, where: +/// - `left = R AND S` +/// - `right = R AND NOT(S)` +/// +/// Supported shape: +/// - `split_expr` must be a single atomic range predicate (`<`, `<=`, `>`, `>=`). +/// - `base_expr` must be a pure `AND` tree of atomic range predicates, possibly +/// across unrelated columns. +/// +/// Returns [`ExprSplitDegradeReason`] when this cannot safely process the shape/type. +pub fn split_partition_expr( + base_expr: PartitionExpr, + split_expr: PartitionExpr, +) -> std::result::Result<(PartitionExpr, PartitionExpr), ExprSplitDegradeReason> { + let base = base_expr.canonicalize(); + let split = split_expr.canonicalize(); + + if validate_supported_expr(&base).is_err() || validate_supported_expr(&split).is_err() { + return Err(ExprSplitDegradeReason::UnsupportedType); + } + + if !validate_base_expr_shape(&base) || !validate_split_expr_shape(&split) { + return Err(ExprSplitDegradeReason::UnsupportedType); + } + + let not_split = match negate_split_expr(&split) { + Ok(expr) => expr, + Err(_) => { + return Err(ExprSplitDegradeReason::UnsupportedNotExpansion); + } + }; + + let left_raw = base.clone().and(split); + let right_raw = base.clone().and(not_split); + + if Collider::new(std::slice::from_ref(&left_raw)).is_err() + || Collider::new(std::slice::from_ref(&right_raw)).is_err() + { + return Err(ExprSplitDegradeReason::ColliderRejected); + } + + let left_expr = simplify_and_bounds(left_raw); + let right_expr = simplify_and_bounds(right_raw); + + if is_empty_and_conjunction(&left_expr) || is_empty_and_conjunction(&right_expr) { + return Err(ExprSplitDegradeReason::EmptyBranch); + } + + Ok((left_expr, right_expr)) +} + +/// Detects whether a pure conjunction expression is definitely unsatisfiable. +/// +/// Scope and intent: +/// - This checker is intentionally conservative. +/// - It only analyzes expressions that can be flattened into: +/// `atom1 AND atom2 AND ...` +/// - If any `OR` is present, it returns `false` (unknown / not handled here). +/// +/// Strategy: +/// - For each column, keep only the tightest lower bound (`>` / `>=`) and +/// tightest upper bound (`<` / `<=`). +/// - `=` is treated as both lower and upper bound at the same value. +/// - `!=` is tracked per column to catch direct conflicts with `=`. +/// - After bounds are collected, the conjunction is empty iff for any column: +/// - lower value is greater than upper value, or +/// - lower value equals upper value but at least one bound is exclusive. +/// - For discrete domains (`Int*`, `UInt*`), adjacent open bounds with no +/// representable value in between are also treated as empty. +/// +/// Notes: +/// - This is still a conservative fast path focused on conjunction emptiness +/// detection for split degradation. +/// - `split_partition_expr` currently restricts its main path to range-only +/// conjunctions, but this helper remains slightly more general so shared +/// bound collection and direct conflict checks stay reusable. +fn is_empty_and_conjunction(expr: &PartitionExpr) -> bool { + let Some(collected) = collect_conjunction_bounds(expr) else { + return false; + }; + + if collected.has_conflict { + return true; + } + + let CollectedConjunction { + lowers, + uppers, + not_equals, + passthrough: _, + has_conflict: _, + } = collected; + + if lowers + .iter() + .any(|(col, lower)| !uppers.contains_key(col) && is_strictly_greater_than_domain_max(lower)) + { + return true; + } + + // Check for contradiction between collected lower/upper bounds per column. + lowers.into_iter().any(|(col, lower)| { + let Some(upper) = uppers.get(&col) else { + return false; + }; + + match lower.value.partial_cmp(&upper.value) { + Some(std::cmp::Ordering::Greater) => true, + Some(std::cmp::Ordering::Equal) => { + if !lower.inclusive || !upper.inclusive { + true + } else { + not_equals + .get(&col) + .is_some_and(|excluded| excluded.contains(&lower.value)) + } + } + Some(std::cmp::Ordering::Less) => { + match ( + discrete_value_index(&lower.value), + discrete_value_index(&upper.value), + ) { + (Some(lower_idx), Some(upper_idx)) => { + let min_candidate = if lower.inclusive { + Some(lower_idx) + } else { + lower_idx.checked_add(1) + }; + let max_candidate = if upper.inclusive { + Some(upper_idx) + } else { + upper_idx.checked_sub(1) + }; + match (min_candidate, max_candidate) { + (Some(min_val), Some(max_val)) => min_val > max_val, + _ => true, + } + } + _ => false, + } + } + _ => false, + } + }) +} + +fn discrete_value_index(v: &Value) -> Option { + match v { + Value::Int8(x) => Some(*x as i128), + Value::Int16(x) => Some(*x as i128), + Value::Int32(x) => Some(*x as i128), + Value::Int64(x) => Some(*x as i128), + Value::UInt8(x) => Some(*x as i128), + Value::UInt16(x) => Some(*x as i128), + Value::UInt32(x) => Some(*x as i128), + Value::UInt64(x) => Some(*x as i128), + _ => None, + } +} + +fn is_strictly_greater_than_domain_max(bound: &LowerBound) -> bool { + if bound.inclusive { + return false; + } + + is_domain_max_value(&bound.value) +} + +fn is_domain_max_value(v: &Value) -> bool { + match v { + Value::Float32(v) => v.0 == f32::MAX, + Value::Float64(v) => v.0 == f64::MAX, + Value::UInt8(v) => *v == u8::MAX, + Value::UInt16(v) => *v == u16::MAX, + Value::UInt32(v) => *v == u32::MAX, + Value::UInt64(v) => *v == u64::MAX, + Value::Int8(v) => *v == i8::MAX, + Value::Int16(v) => *v == i16::MAX, + Value::Int32(v) => *v == i32::MAX, + Value::Int64(v) => *v == i64::MAX, + _ => false, + } +} + +/// Rewrites `NOT(expr)` into an equivalent `PartitionExpr` without introducing a unary NOT node. +/// +/// Why this function exists: +/// - `PartitionExpr` only models binary operators. +/// - Cut logic needs `R AND NOT(S)`. +/// - We therefore rewrite `NOT(S)` into an equivalent binary-expression tree. +/// +/// Rewrite rules: +/// - Atomic comparisons: +/// - `=` <-> `!=` +/// - `<` <-> `>=` +/// - `<=` <-> `>` +/// - `>` <-> `<=` +/// - `>=` <-> `<` +/// - Boolean composition: +/// - `NOT(A AND B)` => `NOT(A) OR NOT(B)` +/// - `NOT(A OR B)` => `NOT(A) AND NOT(B)` +/// +/// Failure behavior: +/// - For `AND/OR`, both sides must be `Operand::Expr`; otherwise returns `NoExprOperand`. +/// - Any unsupported shape bubbles up as an error and the caller degrades to no-split. +pub fn negate_split_expr(expr: &PartitionExpr) -> Result { + match expr.op() { + RestrictedOp::Eq + | RestrictedOp::NotEq + | RestrictedOp::Lt + | RestrictedOp::LtEq + | RestrictedOp::Gt + | RestrictedOp::GtEq => { + // Atomic negate by operator inversion. + let op = match expr.op() { + RestrictedOp::Eq => RestrictedOp::NotEq, + RestrictedOp::NotEq => RestrictedOp::Eq, + RestrictedOp::Lt => RestrictedOp::GtEq, + RestrictedOp::LtEq => RestrictedOp::Gt, + RestrictedOp::Gt => RestrictedOp::LtEq, + RestrictedOp::GtEq => RestrictedOp::Lt, + RestrictedOp::And | RestrictedOp::Or => unreachable!(), + }; + Ok(PartitionExpr::new( + expr.lhs().clone(), + op, + expr.rhs().clone(), + )) + } + RestrictedOp::And | RestrictedOp::Or => { + // De Morgan transform on recursive sub-expressions. + let lhs = match expr.lhs() { + Operand::Expr(lhs) => lhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + let rhs = match expr.rhs() { + Operand::Expr(rhs) => rhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + let not_lhs = negate_split_expr(lhs)?; + let not_rhs = negate_split_expr(rhs)?; + let op = match expr.op() { + // NOT(A AND B) => NOT(A) OR NOT(B) + RestrictedOp::And => RestrictedOp::Or, + // NOT(A OR B) => NOT(A) AND NOT(B) + RestrictedOp::Or => RestrictedOp::And, + _ => unreachable!(), + }; + Ok(PartitionExpr::new( + Operand::Expr(not_lhs), + op, + Operand::Expr(not_rhs), + )) + } + } +} + +pub fn validate_supported_expr(expr: &PartitionExpr) -> Result<()> { + match expr.op() { + RestrictedOp::And | RestrictedOp::Or => { + let lhs = match expr.lhs() { + Operand::Expr(lhs) => lhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + let rhs = match expr.rhs() { + Operand::Expr(rhs) => rhs, + other => { + return error::NoExprOperandSnafu { + operand: other.clone(), + } + .fail(); + } + }; + validate_supported_expr(lhs)?; + validate_supported_expr(rhs)?; + Ok(()) + } + _ => validate_atomic(expr), + } +} + +fn validate_atomic(expr: &PartitionExpr) -> Result<()> { + let (lhs, rhs) = (expr.lhs(), expr.rhs()); + match (lhs, rhs) { + (Operand::Column(_), Operand::Value(v)) | (Operand::Value(v), Operand::Column(_)) => { + ensure!( + is_supported_value(v), + error::InvalidExprSnafu { expr: expr.clone() } + ); + if is_nan_value(v) || is_infinite_value(v) { + return error::InvalidExprSnafu { expr: expr.clone() }.fail(); + } + Ok(()) + } + _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(), + } +} + +/// Validates that `base_expr` stays within the range-only split contract. +/// +/// Scope and intent: +/// - The split utility only handles interval-style partition predicates. +/// - `base_expr` may mention multiple columns, but it must remain a pure `AND` +/// tree of atomic range predicates. +fn validate_base_expr_shape(expr: &PartitionExpr) -> bool { + let mut atoms = Vec::new(); + if !collect_and_atoms(expr, &mut atoms) { + return false; + } + + atoms + .into_iter() + .all(|atom| is_atomic_range_expr(&atom.canonicalize())) +} + +/// Validates that `split_expr` is a single atomic range predicate. +/// +/// This restriction keeps `NOT(split_expr)` in the same range-only subset so the +/// resulting left/right branches stay within the supported contract. +fn validate_split_expr_shape(expr: &PartitionExpr) -> bool { + is_atomic_range_expr(expr) +} + +/// Returns whether `expr` is an atomic `column op value` range predicate. +/// +/// Supported operators are limited to `<`, `<=`, `>`, and `>=`. +fn is_atomic_range_expr(expr: &PartitionExpr) -> bool { + atom_col_op_val(expr).is_some_and(|(_, op, _)| { + matches!( + op, + RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq + ) + }) +} + +fn is_supported_value(v: &Value) -> bool { + matches!( + v, + Value::Int8(_) + | Value::Int16(_) + | Value::Int32(_) + | Value::Int64(_) + | Value::UInt8(_) + | Value::UInt16(_) + | Value::UInt32(_) + | Value::UInt64(_) + | Value::Float32(_) + | Value::Float64(_) + | Value::String(_) + ) +} + +fn is_nan_value(v: &Value) -> bool { + match v { + Value::Float32(x) => x.0.is_nan(), + Value::Float64(x) => x.0.is_nan(), + _ => false, + } +} + +fn is_infinite_value(v: &Value) -> bool { + match v { + Value::Float32(x) => x.0.is_infinite(), + Value::Float64(x) => x.0.is_infinite(), + _ => false, + } +} + +#[derive(Debug, Clone)] +struct LowerBound { + value: Value, + inclusive: bool, +} + +#[derive(Debug, Clone)] +struct UpperBound { + value: Value, + inclusive: bool, +} + +struct CollectedConjunction { + lowers: BTreeMap, + uppers: BTreeMap, + not_equals: BTreeMap>, + passthrough: Vec, + has_conflict: bool, +} + +/// Simplifies conjunction-only range predicates by keeping the tightest bounds per column. +/// +/// This pass is intentionally conservative and only runs when the whole expression +/// can be flattened into `atom1 AND atom2 AND ...` without any `OR` node. +/// +/// Behavior: +/// - For each column, collect all lower-bound predicates (`>` / `>=`) and keep the +/// tightest one. +/// - For each column, collect all upper-bound predicates (`<` / `<=`) and keep the +/// tightest one. +/// - Non-range predicates (for example `=` / `!=`) are preserved as-is. +/// - If the expression contains `OR`, this function returns the original expression. +/// +/// Tightness rules: +/// - Upper bound: smaller value is tighter; if equal value, exclusive (`<`) is tighter. +/// - Lower bound: larger value is tighter; if equal value, exclusive (`>`) is tighter. +/// +/// Examples: +/// - `a <= 10 AND a < 10` => `a < 10` +/// - `a >= 10 AND a > 10` => `a > 10` +/// - `a < 10 AND a < 5` => `a < 5` +fn simplify_and_bounds(expr: PartitionExpr) -> PartitionExpr { + let Some(collected) = collect_conjunction_bounds(&expr) else { + return expr; + }; + + let CollectedConjunction { + lowers, + uppers, + not_equals: _, + passthrough, + has_conflict: _, + } = collected; + + let mut out = passthrough; + out.extend(lowers.into_iter().map(|(col, lower)| { + PartitionExpr::new( + Operand::Column(col), + if lower.inclusive { + RestrictedOp::GtEq + } else { + RestrictedOp::Gt + }, + Operand::Value(lower.value), + ) + })); + out.extend(uppers.into_iter().map(|(col, upper)| { + PartitionExpr::new( + Operand::Column(col), + if upper.inclusive { + RestrictedOp::LtEq + } else { + RestrictedOp::Lt + }, + Operand::Value(upper.value), + ) + })); + + fold_and_exprs(out).unwrap_or(expr) +} + +/// Flattens an expression into atomic terms when it is a pure conjunction tree. +/// +/// Returns `false` if any `OR` is encountered, signaling caller to skip this +/// simplification path. +fn collect_and_atoms(expr: &PartitionExpr, out: &mut Vec) -> bool { + match expr.op() { + RestrictedOp::And => { + let lhs = match expr.lhs() { + Operand::Expr(lhs) => lhs, + _ => return false, + }; + let rhs = match expr.rhs() { + Operand::Expr(rhs) => rhs, + _ => return false, + }; + collect_and_atoms(lhs, out) && collect_and_atoms(rhs, out) + } + RestrictedOp::Or => false, + _ => { + out.push(expr.clone()); + true + } + } +} + +/// Extracts `(column, op, value)` from a canonicalized atomic expression. +fn atom_col_op_val(expr: &PartitionExpr) -> Option<(String, RestrictedOp, Value)> { + let lhs = expr.lhs(); + let rhs = expr.rhs(); + match (lhs, rhs) { + (Operand::Column(col), Operand::Value(v)) => { + Some((col.clone(), expr.op().clone(), v.clone())) + } + _ => None, + } +} + +/// Collects per-column bounds and passthrough atoms from a pure `AND` tree. +/// +/// Scope and intent: +/// - This helper is shared by [`is_empty_and_conjunction`] and +/// [`simplify_and_bounds`] so both paths interpret conjunction atoms the same +/// way. +/// - It only handles conjunction-only expressions. If any `OR` is present, it +/// returns `None` and lets callers keep their conservative fallback behavior. +/// +/// Behavior: +/// - Tightest lower/upper bounds are recorded per column. +/// - `=` contributes both a lower and an upper bound at the same value. +/// - `!=` and non-range atoms are preserved in `passthrough` for callers that +/// need to rebuild the conjunction. +/// - `has_conflict` is set when atomic constraints already contradict each +/// other (for example `a = 1 AND a <> 1`). +/// +/// Notes: +/// - This helper is intentionally a bit more general than the current +/// `split_partition_expr` contract, which now only feeds range-only +/// conjunctions into the main split path. +fn collect_conjunction_bounds(expr: &PartitionExpr) -> Option { + let mut atoms = Vec::new(); + if !collect_and_atoms(expr, &mut atoms) { + return None; + } + + let mut lowers = BTreeMap::new(); + let mut uppers = BTreeMap::new(); + let mut equals = BTreeMap::new(); + let mut not_equals: BTreeMap> = BTreeMap::new(); + let mut passthrough = Vec::new(); + let mut seen = HashSet::new(); + let mut has_conflict = false; + + for atom in atoms { + let atom = atom.canonicalize(); + let Some((col, op, val)) = atom_col_op_val(&atom) else { + push_unique_expr(&mut passthrough, &mut seen, atom); + continue; + }; + + match op { + RestrictedOp::Lt | RestrictedOp::LtEq => update_upper_bound( + &mut uppers, + col, + UpperBound { + value: val, + inclusive: matches!(op, RestrictedOp::LtEq), + }, + ), + RestrictedOp::Gt | RestrictedOp::GtEq => update_lower_bound( + &mut lowers, + col, + LowerBound { + value: val, + inclusive: matches!(op, RestrictedOp::GtEq), + }, + ), + RestrictedOp::Eq => { + if let Some(existing) = equals.get(&col) + && existing != &val + { + has_conflict = true; + } + if not_equals + .get(&col) + .is_some_and(|excluded| excluded.contains(&val)) + { + has_conflict = true; + } + equals.insert(col.clone(), val.clone()); + update_lower_bound( + &mut lowers, + col.clone(), + LowerBound { + value: val.clone(), + inclusive: true, + }, + ); + update_upper_bound( + &mut uppers, + col, + UpperBound { + value: val, + inclusive: true, + }, + ); + push_unique_expr(&mut passthrough, &mut seen, atom); + } + RestrictedOp::NotEq => { + if equals.get(&col).is_some_and(|eq| eq == &val) { + has_conflict = true; + } + not_equals.entry(col).or_default().insert(val); + push_unique_expr(&mut passthrough, &mut seen, atom); + } + RestrictedOp::And | RestrictedOp::Or => { + push_unique_expr(&mut passthrough, &mut seen, atom); + } + } + } + + Some(CollectedConjunction { + lowers, + uppers, + not_equals, + passthrough, + has_conflict, + }) +} + +fn push_unique_expr(out: &mut Vec, seen: &mut HashSet, expr: PartitionExpr) { + let key = expr.to_string(); + if seen.insert(key) { + out.push(expr); + } +} + +fn update_upper_bound( + uppers: &mut BTreeMap, + col: String, + candidate: UpperBound, +) { + match uppers.get_mut(&col) { + Some(current) => { + if prefer_upper(&candidate, current) { + *current = candidate; + } + } + None => { + uppers.insert(col, candidate); + } + } +} + +fn update_lower_bound( + lowers: &mut BTreeMap, + col: String, + candidate: LowerBound, +) { + match lowers.get_mut(&col) { + Some(current) => { + if prefer_lower(&candidate, current) { + *current = candidate; + } + } + None => { + lowers.insert(col, candidate); + } + } +} + +fn prefer_upper(candidate: &UpperBound, current: &UpperBound) -> bool { + // "Smaller" upper bound is tighter. For equal value, exclusive is tighter. + match candidate.value.partial_cmp(¤t.value) { + Some(std::cmp::Ordering::Less) => true, + Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive, + _ => false, + } +} + +fn prefer_lower(candidate: &LowerBound, current: &LowerBound) -> bool { + // "Larger" lower bound is tighter. For equal value, exclusive is tighter. + match candidate.value.partial_cmp(¤t.value) { + Some(std::cmp::Ordering::Greater) => true, + Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive, + _ => false, + } +} + +/// Folds a list of expressions into a left-associated AND tree. +/// Returns `None` if the input list is empty. +fn fold_and_exprs(mut exprs: Vec) -> Option { + exprs.drain(..).reduce(|acc, next| acc.and(next)) +} + +#[cfg(test)] +mod tests { + use datatypes::value::{OrderedFloat, Value}; + use store_api::storage::RegionNumber; + + use super::*; + use crate::checker::PartitionChecker; + use crate::expr::col; + use crate::multi_dim::MultiDimPartitionRule; + + fn validate_cut_result_with_checker( + original_rule_exprs: &[PartitionExpr], + replaced_index: usize, + left: &Option, + right: &Option, + partition_columns: Vec, + regions: Vec, + ) -> Result<()> { + ensure!( + replaced_index < original_rule_exprs.len(), + error::UnexpectedSnafu { + err_msg: format!( + "replaced index out of bounds: {replaced_index} >= {}", + original_rule_exprs.len() + ) + } + ); + + let mut exprs = original_rule_exprs.to_vec(); + exprs.remove(replaced_index); + exprs.extend(left.iter().cloned()); + exprs.extend(right.iter().cloned()); + + ensure!( + !exprs.is_empty(), + error::UnexpectedSnafu { + err_msg: "empty rule exprs after split".to_string() + } + ); + + let final_regions = if regions.len() == exprs.len() { + regions + } else { + (0..exprs.len() as RegionNumber).collect() + }; + + let rule = MultiDimPartitionRule::try_new(partition_columns, final_regions, exprs, false)?; + let checker = PartitionChecker::try_new(&rule)?; + checker.check()?; + Ok(()) + } + + #[test] + fn test_split_simple_range() { + // R: a < 10 + let base = col("a").lt(Value::Int64(10)); + // S: a < 5 + let split = col("a").lt(Value::Int64(5)); + let (left, right) = split_partition_expr(base, split).unwrap(); + // left = R AND S = a < 5 + assert_eq!(left.to_string(), "a < 5"); + // right = R AND NOT(S) = a >= 5 AND a < 10 + assert_eq!(right.to_string(), "a >= 5 AND a < 10"); + } + + #[test] + fn test_split_string_interval() { + // R: v > 'm' AND v < 'n' + let base = col("v") + .gt(Value::String("m".into())) + .and(col("v").lt(Value::String("n".into()))); + // S: v < 'm~' + let split = col("v").lt(Value::String("m~".into())); + let (left, right) = split_partition_expr(base, split).unwrap(); + // left = (v > m AND v < n) AND (v < m~) -> v > m AND v < m~ + assert_eq!(left.to_string(), "v > m AND v < m~"); + // right = (v > m AND v < n) AND (v >= m~) -> v >= m~ AND v < n + assert_eq!(right.to_string(), "v >= m~ AND v < n"); + } + + #[test] + fn test_split_numeric_interval_mid_split() { + // R: a > 3 AND a < 10 + let base = col("a") + .gt(Value::Int64(3)) + .and(col("a").lt(Value::Int64(10))); + // S: a < 5 + let split = col("a").lt(Value::Int64(5)); + + let (left, right) = split_partition_expr(base, split).unwrap(); + + // left = (a > 3 AND a < 10) AND (a < 5) -> a > 3 AND a < 5 + assert_eq!(left.to_string(), "a > 3 AND a < 5"); + // right = (a > 3 AND a < 10) AND (a >= 5) -> a >= 5 AND a < 10 + assert_eq!(right.to_string(), "a >= 5 AND a < 10"); + } + + #[test] + fn test_split_base_expr_allows_unrelated_range_columns() { + // R: a > 20 AND b < 20 + let base = col("a") + .gt(Value::Int64(20)) + .and(col("b").lt(Value::Int64(20))); + // S: a < 30 + let split = col("a").lt(Value::Int64(30)); + + let (left, right) = split_partition_expr(base, split).unwrap(); + + // left keeps the unrelated `b < 20` bound while splitting column `a`. + assert_eq!(left.to_string(), "a > 20 AND a < 30 AND b < 20"); + // right also preserves the unrelated column bound. + assert_eq!(right.to_string(), "a >= 30 AND b < 20"); + } + + #[test] + fn test_split_degrade_on_unsupported_type() { + // intentionally excludes boolean from split-able value types. + let base = col("a").eq(Value::Boolean(true)); + let split = col("a").eq(Value::Boolean(true)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_validate_cut_result_with_checker() { + // Original partition set: a < 10, a >= 10 + let original = vec![ + col("a").lt(Value::Int64(10)), + col("a").gt_eq(Value::Int64(10)), + ]; + let left = Some(col("a").lt(Value::Int64(5))); + let right = Some( + col("a") + .gt_eq(Value::Int64(5)) + .and(col("a").lt(Value::Int64(10))), + ); + + validate_cut_result_with_checker( + &original, + 0, + &left, + &right, + vec!["a".to_string()], + vec![1, 2, 3], + ) + .unwrap(); + } + + #[test] + fn test_split_degrade_on_empty_branch() { + // R: a < 10 + let base = col("a").lt(Value::Int64(10)); + // S: a < 20 + let split = col("a").lt(Value::Int64(20)); + + // right = (a < 10) AND (a >= 20) is unsatisfiable, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_split_rejects_eq_in_base_expr() { + // R: a = 5 falls outside the range-only base_expr contract. + let base = col("a").eq(Value::Int64(5)); + // S: a < 6 remains a valid range split. + let split = col("a").lt(Value::Int64(6)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_degrade_on_discrete_gap_int() { + // R: a < 5 + let base = col("a").lt(Value::Int64(5)); + // S: a <= 4 + let split = col("a").lt_eq(Value::Int64(4)); + + // right = (a < 5) AND (a > 4) has no integer solution, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_split_degrade_on_unsupported_date_type() { + // Date is intentionally excluded from split-supported value types. + let base = col("d").lt(Value::Date(5.into())); + let split = col("d").lt_eq(Value::Date(4.into())); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_degrade_on_unsupported_timestamp_type() { + // Timestamp is intentionally excluded from split-supported value types. + let base = col("ts").lt(Value::Timestamp(0.into())); + let split = col("ts").lt_eq(Value::Timestamp(1.into())); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_rejects_not_eq_in_split_expr() { + // R: a >= 5 AND a <= 5 + let base = col("a") + .gt_eq(Value::Int64(5)) + .and(col("a").lt_eq(Value::Int64(5))); + // S: a <> 5 falls outside the range-only split_expr contract. + let split = col("a").not_eq(Value::Int64(5)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_rejects_eq_in_split_expr() { + // R: a >= 5 AND a <= 5 + let base = col("a") + .gt_eq(Value::Int64(5)) + .and(col("a").lt_eq(Value::Int64(5))); + // S: a = 5 falls outside the range-only split_expr contract. + let split = col("a").eq(Value::Int64(5)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_degrade_on_uint_one_sided_impossible_upper_bound() { + // R: a < 10 (UInt64 domain) + let base = col("a").lt(Value::UInt64(10)); + // S: a < 0 is still satisfiable by NULL under null-first partition semantics. + // The split keeps a nullable left branch instead of degrading it as empty. + let split = col("a").lt(Value::UInt64(0)); + + let (left, right) = split_partition_expr(base, split).unwrap(); + assert_eq!(left.to_string(), "a < 0"); + assert_eq!(right.to_string(), "a >= 0 AND a < 10"); + } + + #[test] + fn test_split_degrade_on_uint_one_sided_impossible_lower_bound() { + // R: a < 10 (UInt64 domain) + let base = col("a").lt(Value::UInt64(10)); + // S: a > u64::MAX (impossible on UInt64) + let split = col("a").gt(Value::UInt64(u64::MAX)); + + // left = (a < 10) AND (a > u64::MAX) is unsatisfiable on UInt64, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_split_degrade_on_int_one_sided_impossible_upper_bound() { + // R: a < 10 (Int64 domain) + let base = col("a").lt(Value::Int64(10)); + // S: a < i64::MIN is still satisfiable by NULL under null-first partition semantics. + // The split keeps a nullable left branch instead of degrading it as empty. + let split = col("a").lt(Value::Int64(i64::MIN)); + + let (left, right) = split_partition_expr(base, split).unwrap(); + assert_eq!(left.to_string(), format!("a < {}", i64::MIN)); + assert_eq!(right.to_string(), format!("a >= {} AND a < 10", i64::MIN)); + } + + #[test] + fn test_split_degrade_on_int_one_sided_impossible_lower_bound() { + // R: a < 10 (Int64 domain) + let base = col("a").lt(Value::Int64(10)); + // S: a > i64::MAX (impossible on Int64) + let split = col("a").gt(Value::Int64(i64::MAX)); + + // left = (a < 10) AND (a > i64::MAX) is unsatisfiable on Int64, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_split_degrade_on_string_one_sided_impossible_upper_bound() { + // R: s < "z" (String domain) + let base = col("s").lt(Value::String("z".into())); + // S: s < "" is still satisfiable by NULL under null-first partition semantics. + // The split keeps a nullable left branch instead of degrading it as empty. + let split = col("s").lt(Value::String("".into())); + + let (left, right) = split_partition_expr(base, split).unwrap(); + assert_eq!(left.to_string(), "s < "); + assert_eq!(right.to_string(), "s >= AND s < z"); + } + + #[test] + fn test_split_degrade_on_float64_one_sided_impossible_upper_bound() { + // R: a < 10.0 (Float64 domain) + let base = col("a").lt(Value::Float64(OrderedFloat(10.0))); + // S: a < f64::MIN is still satisfiable by NULL under null-first partition semantics. + // The split keeps a nullable left branch instead of degrading it as empty. + let split = col("a").lt(Value::Float64(OrderedFloat(f64::MIN))); + + let (left, right) = split_partition_expr(base, split).unwrap(); + assert_eq!(left.to_string(), format!("a < {}", f64::MIN)); + assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f64::MIN)); + } + + #[test] + fn test_split_degrade_on_float64_one_sided_impossible_lower_bound() { + // R: a < 10.0 (Float64 domain) + let base = col("a").lt(Value::Float64(OrderedFloat(10.0))); + // S: a > f64::MAX (impossible with finite-only float policy) + let split = col("a").gt(Value::Float64(OrderedFloat(f64::MAX))); + + // left = (a < 10.0) AND (a > f64::MAX) is unsatisfiable, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_split_degrade_on_float32_one_sided_impossible_upper_bound() { + // R: a < 10.0f32 (Float32 domain) + let base = col("a").lt(Value::Float32(OrderedFloat(10.0))); + // S: a < f32::MIN is still satisfiable by NULL under null-first partition semantics. + // The split keeps a nullable left branch instead of degrading it as empty. + let split = col("a").lt(Value::Float32(OrderedFloat(f32::MIN))); + + let (left, right) = split_partition_expr(base, split).unwrap(); + assert_eq!(left.to_string(), format!("a < {}", f32::MIN)); + assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f32::MIN)); + } + + #[test] + fn test_split_degrade_on_float32_one_sided_impossible_lower_bound() { + // R: a < 10.0f32 (Float32 domain) + let base = col("a").lt(Value::Float32(OrderedFloat(10.0))); + // S: a > f32::MAX (impossible with finite-only float policy) + let split = col("a").gt(Value::Float32(OrderedFloat(f32::MAX))); + + // left = (a < 10.0f32) AND (a > f32::MAX) is unsatisfiable, should degrade. + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch); + } + + #[test] + fn test_simplify_same_upper_bound_prefers_strict() { + // a <= 10 AND a < 10 => a < 10 + let expr = col("a") + .lt_eq(Value::Int64(10)) + .and(col("a").lt(Value::Int64(10))); + + let simplified = simplify_and_bounds(expr); + assert_eq!(simplified.to_string(), "a < 10"); + } + + #[test] + fn test_simplify_same_lower_bound_prefers_strict() { + // a >= 10 AND a > 10 => a > 10 + let expr = col("a") + .gt_eq(Value::Int64(10)) + .and(col("a").gt(Value::Int64(10))); + + let simplified = simplify_and_bounds(expr); + assert_eq!(simplified.to_string(), "a > 10"); + } + + #[test] + fn test_negate_split_expr_demorgan_and() { + // expr: (a < 10) AND (a >= 3) + let expr = col("a") + .lt(Value::Int64(10)) + .and(col("a").gt_eq(Value::Int64(3))); + let not_expr = negate_split_expr(&expr).unwrap(); + // NOT(expr) => (a >= 10) OR (a < 3) + assert_eq!(not_expr.to_string(), "a >= 10 OR a < 3"); + } + + #[test] + fn test_negate_split_expr_demorgan_or() { + // expr: (a = 1) OR (a <> 2) + let expr = PartitionExpr::new( + Operand::Expr(col("a").eq(Value::Int64(1))), + RestrictedOp::Or, + Operand::Expr(col("a").not_eq(Value::Int64(2))), + ); + let not_expr = negate_split_expr(&expr).unwrap(); + // NOT(expr) => (a <> 1) AND (a = 2) + assert_eq!(not_expr.to_string(), "a <> 1 AND a = 2"); + } + + #[test] + fn test_negate_split_expr_invalid_and_operand() { + // malformed AND: rhs is a scalar value, not an Expr subtree. + let malformed = PartitionExpr { + lhs: Box::new(Operand::Expr(col("a").lt(Value::Int64(10)))), + op: RestrictedOp::And, + rhs: Box::new(Operand::Value(Value::Int64(1))), + }; + assert!(negate_split_expr(&malformed).is_err()); + } + + #[test] + fn test_validate_supported_expr_value_column_allowed() { + // Canonicalization can flip to column-value; validator must accept value-column input. + let expr = PartitionExpr::new( + Operand::Value(Value::Int64(10)), + RestrictedOp::Lt, + Operand::Column("a".to_string()), + ); + assert!(validate_supported_expr(&expr).is_ok()); + } + + #[test] + fn test_validate_supported_expr_invalid_atomic_shape() { + // column-column atomic comparison is out of shape. + let expr = PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Eq, + Operand::Column("b".to_string()), + ); + assert!(validate_supported_expr(&expr).is_err()); + } + + #[test] + fn test_validate_supported_expr_nan_comparison_rejected() { + // NaN cannot be used in any supported comparison predicate. + let expr = col("a").lt(Value::Float64(OrderedFloat(f64::NAN))); + assert!(validate_supported_expr(&expr).is_err()); + } + + #[test] + fn test_validate_supported_expr_infinite_comparison_rejected() { + // Infinity cannot be used in any supported comparison predicate under + // finite-only float policy. + let pos_inf = col("a").gt(Value::Float64(OrderedFloat(f64::INFINITY))); + let neg_inf = col("a").lt(Value::Float32(OrderedFloat(f32::NEG_INFINITY))); + assert!(validate_supported_expr(&pos_inf).is_err()); + assert!(validate_supported_expr(&neg_inf).is_err()); + } + + #[test] + fn test_validate_supported_expr_nan_eq_rejected() { + let expr = col("a").eq(Value::Float64(OrderedFloat(f64::NAN))); + assert!(validate_supported_expr(&expr).is_err()); + } + + #[test] + fn test_validate_supported_expr_infinite_eq_rejected() { + let pos_inf = col("a").eq(Value::Float64(OrderedFloat(f64::INFINITY))); + let neg_inf = col("a").not_eq(Value::Float32(OrderedFloat(f32::NEG_INFINITY))); + assert!(validate_supported_expr(&pos_inf).is_err()); + assert!(validate_supported_expr(&neg_inf).is_err()); + } + + #[test] + fn test_simplify_and_bounds_or_keeps_original() { + // OR tree is intentionally not flattened by AND-only simplifier. + let expr = PartitionExpr::new( + Operand::Expr(col("a").lt(Value::Int64(10))), + RestrictedOp::Or, + Operand::Expr(col("a").gt_eq(Value::Int64(20))), + ); + let simplified = simplify_and_bounds(expr.clone()); + assert_eq!(simplified.to_string(), expr.to_string()); + } + + #[test] + fn test_simplify_and_bounds_keep_stronger_when_weaker_seen_later() { + // upper: stronger bound first, weaker later -> keep stronger (< 5). + let upper = col("a") + .lt(Value::Int64(5)) + .and(col("a").lt(Value::Int64(10))); + assert_eq!(simplify_and_bounds(upper).to_string(), "a < 5"); + + // lower: stronger bound first, weaker later -> keep stronger (> 10). + let lower = col("a") + .gt(Value::Int64(10)) + .and(col("a").gt(Value::Int64(5))); + assert_eq!(simplify_and_bounds(lower).to_string(), "a > 10"); + } + + #[test] + fn test_internal_helpers_uncovered_branches() { + // Empty AND fold should return None. + assert!(fold_and_exprs(vec![]).is_none()); + + // Any OR in tree disables AND-bound simplification path. + let mut out = Vec::new(); + let or_expr = PartitionExpr::new( + Operand::Expr(col("a").lt(Value::Int64(10))), + RestrictedOp::Or, + Operand::Expr(col("a").gt_eq(Value::Int64(20))), + ); + assert!(!collect_and_atoms(&or_expr, &mut out)); + + // value-value atom has no (column, op, value) projection. + let value_value = PartitionExpr::new( + Operand::Value(Value::Int64(1)), + RestrictedOp::Eq, + Operand::Value(Value::Int64(2)), + ); + assert!(atom_col_op_val(&value_value).is_none()); + } + + #[test] + fn test_split_rejects_or_in_base_expr() { + // R: (a < 10) OR (a >= 20 AND a < 30) falls outside the AND-only base_expr contract. + let base = PartitionExpr::new( + Operand::Expr(col("a").lt(Value::Int64(10))), + RestrictedOp::Or, + Operand::Expr( + col("a") + .gt_eq(Value::Int64(20)) + .and(col("a").lt(Value::Int64(30))), + ), + ); + // S: a < 5 + let split = col("a").lt(Value::Int64(5)); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } + + #[test] + fn test_split_rejects_or_in_split_expr() { + // R: a < 10 + let base = col("a").lt(Value::Int64(10)); + // S: (a < 5) OR (a >= 8 AND a < 9) falls outside the atomic split_expr contract. + let split = PartitionExpr::new( + Operand::Expr(col("a").lt(Value::Int64(5))), + RestrictedOp::Or, + Operand::Expr( + col("a") + .gt_eq(Value::Int64(8)) + .and(col("a").lt(Value::Int64(9))), + ), + ); + + let result = split_partition_expr(base, split); + assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType); + } +}