diff --git a/src/partition/src/checker.rs b/src/partition/src/checker.rs new file mode 100644 index 0000000000..b616489d48 --- /dev/null +++ b/src/partition/src/checker.rs @@ -0,0 +1,603 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; + +use datatypes::arrow::array::{BooleanArray, Float64Array, Float64Builder, RecordBatch}; +use datatypes::arrow::datatypes::{DataType, Field, Schema}; +use datatypes::value::OrderedF64; + +use crate::collider::{Collider, CHECK_STEP, NORMALIZE_STEP}; +use crate::error::{ + CheckpointNotCoveredSnafu, CheckpointOverlappedSnafu, DuplicateExprSnafu, Result, +}; +use crate::expr::{PartitionExpr, RestrictedOp}; +use crate::multi_dim::MultiDimPartitionRule; + +pub struct PartitionChecker<'a> { + rule: &'a MultiDimPartitionRule, + collider: Collider<'a>, +} + +impl<'a> PartitionChecker<'a> { + pub fn try_new(rule: &'a MultiDimPartitionRule) -> Result { + let collider = Collider::new(rule.exprs())?; + Ok(Self { rule, collider }) + } + + pub fn check(&self) -> Result<()> { + self.run()?; + Ok(()) + } +} + +// Logic of checking rules +impl<'a> PartitionChecker<'a> { + fn run(&self) -> Result<()> { + // Sort atomic exprs and check uniqueness + let mut atomic_exprs = BTreeMap::new(); + for expr in self.collider.atomic_exprs.iter() { + let key = &expr.nucleons; + atomic_exprs.insert(key, expr); + } + if atomic_exprs.len() != self.collider.atomic_exprs.len() { + // Find the duplication for error message + for expr in self.collider.atomic_exprs.iter() { + if atomic_exprs.get(&expr.nucleons).unwrap().source_expr_index + != expr.source_expr_index + { + let expr = self.rule.exprs()[expr.source_expr_index].clone(); + return DuplicateExprSnafu { expr }.fail(); + } + } + // Or return a placeholder. This should never happen. + return DuplicateExprSnafu { + expr: PartitionExpr::new( + crate::expr::Operand::Column("unknown".to_string()), + RestrictedOp::Eq, + crate::expr::Operand::Column("expr".to_string()), + ), + } + .fail(); + } + + // TODO(ruihang): merge atomic exprs to improve checker's performance + + // matrix test + let mut matrix_foundation = HashMap::new(); + for (col, values) in self.collider.normalized_values.iter() { + if values.is_empty() { + continue; + } + + let mut cornerstones = Vec::with_capacity(values.len() * 2 + 1); + cornerstones.push(values[0].1 - CHECK_STEP); + for value in values { + cornerstones.push(value.1); + cornerstones.push(value.1 + CHECK_STEP); + } + matrix_foundation.insert(col.as_str(), cornerstones); + } + + // If there are no values, the rule is empty and valid. + if matrix_foundation.is_empty() { + return Ok(()); + } + + let matrix_generator = MatrixGenerator::new(matrix_foundation); + + // Process data in batches using iterator + let mut results = Vec::with_capacity(self.collider.atomic_exprs.len()); + let physical_exprs = self + .collider + .atomic_exprs + .iter() + .map(|expr| expr.to_physical_expr(matrix_generator.schema())) + .collect::>(); + for batch in matrix_generator { + results.clear(); + for physical_expr in &physical_exprs { + let columnar_result = physical_expr.evaluate(&batch).unwrap(); + let array_result = columnar_result.into_array(batch.num_rows()).unwrap(); + results.push(array_result); + } + let boolean_results = results + .iter() + .map(|result| result.as_any().downcast_ref::().unwrap()) + .collect::>(); + + // sum and check results for this batch + for i in 0..batch.num_rows() { + let mut true_count = 0; + for result in boolean_results.iter() { + if result.value(i) { + true_count += 1; + } + } + + if true_count == 0 { + return CheckpointNotCoveredSnafu { + checkpoint: self.remap_checkpoint(i, &batch), + } + .fail(); + } else if true_count > 1 { + return CheckpointOverlappedSnafu { + checkpoint: self.remap_checkpoint(i, &batch), + } + .fail(); + } + } + } + + Ok(()) + } + + /// Remap the normalized checkpoint data to the original values. + fn remap_checkpoint(&self, i: usize, batch: &RecordBatch) -> String { + let normalized_row = batch + .columns() + .iter() + .map(|col| { + let array = col.as_any().downcast_ref::().unwrap(); + array.value(i) + }) + .collect::>(); + + let mut check_point = String::new(); + let schema = batch.schema(); + for (col_index, normalized_value) in normalized_row.iter().enumerate() { + let col_name = schema.field(col_index).name(); + + if col_index > 0 { + check_point.push_str(", "); + } + + // Check if point is on NORMALIZE_STEP or between steps + if let Some(values) = self.collider.normalized_values.get(col_name) { + let normalize_step = NORMALIZE_STEP.0; + + // Check if the normalized value is on a NORMALIZE_STEP boundary + let remainder = normalized_value % normalize_step; + let is_on_step = remainder.abs() < f64::EPSILON + || (normalize_step - remainder).abs() < f64::EPSILON * 2.0; + + if is_on_step { + let index = (normalized_value / normalize_step).round() as usize; + if index < values.len() { + let original_value = &values[index].0; + check_point.push_str(&format!("{}={}", col_name, original_value)); + } else { + check_point.push_str(&format!("{}=unknown", col_name)); + } + } else { + let lower_index = (normalized_value / normalize_step).floor() as usize; + let upper_index = (normalized_value / normalize_step).ceil() as usize; + + // Handle edge cases: value is outside the valid range + if lower_index == upper_index && lower_index == 0 { + // Value is less than the first value + let first_original = &values[0].0; + check_point.push_str(&format!("{}<{}", col_name, first_original)); + } else if upper_index == values.len() { + // Value is greater than the last value + let last_original = &values[values.len() - 1].0; + check_point.push_str(&format!("{}>{}", col_name, last_original)); + } else { + // Normal case: value is between two valid values + let lower_original = if lower_index < values.len() { + values[lower_index].0.to_string() + } else { + "unknown".to_string() + }; + + let upper_original = if upper_index < values.len() { + values[upper_index].0.to_string() + } else { + "unknown".to_string() + }; + + check_point.push_str(&format!( + "{}<{}<{}", + lower_original, col_name, upper_original + )); + } + } + } else { + // Fallback if column not found in normalized values + check_point.push_str(&format!("{}:unknown", col_name)); + } + } + + check_point + } +} + +/// Generates a point matrix that contains permutations of `matrix_foundation`'s values +struct MatrixGenerator { + matrix_foundation: HashMap>, + // Iterator state + current_index: usize, + schema: Schema, + column_names: Vec, + // Preprocessed attributes + /// Total number of combinations of `matrix_foundation`'s values + total_combinations: usize, + /// Biased suffix product of `matrix_foundation`'s values + /// + /// The i-th element is the product of the sizes of all columns after the i-th column. + /// For example, if `matrix_foundation` is `{"a": [1, 2, 3], "b": [4, 5, 6]}`, + /// then `biased_suffix_product` is `[3, 1]`. + biased_suffix_product: Vec, +} + +const MAX_BATCH_SIZE: usize = 8192; + +impl MatrixGenerator { + pub fn new(matrix_foundation: HashMap<&str, Vec>) -> Self { + // Convert to owned HashMap to avoid lifetime issues + let owned_matrix_foundation: HashMap> = matrix_foundation + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .collect(); + + let mut fields = owned_matrix_foundation + .keys() + .map(|k| Field::new(k.clone(), DataType::Float64, false)) + .collect::>(); + fields.sort_unstable(); + let schema = Schema::new(fields.clone()); + + // Store column names in the same order as fields + let column_names: Vec = fields.iter().map(|field| field.name().clone()).collect(); + + // Calculate total number of combinations and suffix product + let mut biased_suffix_product = Vec::with_capacity(column_names.len() + 1); + let mut product = 1; + biased_suffix_product.push(product); + for col_name in column_names.iter().rev() { + product *= owned_matrix_foundation[col_name].len(); + biased_suffix_product.push(product); + } + biased_suffix_product.pop(); + biased_suffix_product.reverse(); + + Self { + matrix_foundation: owned_matrix_foundation, + current_index: 0, + schema, + column_names, + total_combinations: product, + biased_suffix_product, + } + } + + pub fn schema(&self) -> &Schema { + &self.schema + } + + fn generate_batch(&self, start_index: usize, batch_size: usize) -> RecordBatch { + let actual_batch_size = batch_size.min(self.total_combinations - start_index); + + // Create array builders + let mut array_builders: Vec = Vec::with_capacity(self.column_names.len()); + for _ in 0..self.column_names.len() { + array_builders.push(Float64Builder::with_capacity(actual_batch_size)); + } + + // Generate combinations for this batch + for combination_offset in 0..actual_batch_size { + let combination_index = start_index + combination_offset; + + // For each column, determine which value to use for this combination + for (col_idx, col_name) in self.column_names.iter().enumerate() { + let values = &self.matrix_foundation[col_name]; + let stride = self.biased_suffix_product[col_idx]; + let value_index = (combination_index / stride) % values.len(); + let value = *values[value_index].as_ref(); + + array_builders[col_idx].append_value(value); + } + } + + // Finish arrays and create RecordBatch + let arrays: Vec<_> = array_builders + .into_iter() + .map(|mut builder| Arc::new(builder.finish()) as _) + .collect(); + + RecordBatch::try_new(Arc::new(self.schema.clone()), arrays) + .expect("Failed to create RecordBatch from generated arrays") + } +} + +impl Iterator for MatrixGenerator { + type Item = RecordBatch; + + fn next(&mut self) -> Option { + if self.current_index >= self.total_combinations { + return None; + } + + let remaining = self.total_combinations - self.current_index; + let batch_size = remaining.min(MAX_BATCH_SIZE); + + let batch = self.generate_batch(self.current_index, batch_size); + self.current_index += batch_size; + + Some(batch) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use datatypes::value::Value; + + use super::*; + use crate::expr::col; + use crate::multi_dim::MultiDimPartitionRule; + + #[test] + fn test_matrix_generator_single_column() { + let mut matrix_foundation = HashMap::new(); + matrix_foundation.insert( + "col1", + vec![ + OrderedF64::from(1.0), + OrderedF64::from(2.0), + OrderedF64::from(3.0), + ], + ); + + let mut generator = MatrixGenerator::new(matrix_foundation); + let batch = generator.next().unwrap(); + + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "col1"); + + let col1_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col1_array.value(0), 1.0); + assert_eq!(col1_array.value(1), 2.0); + assert_eq!(col1_array.value(2), 3.0); + + // Should be no more batches for such a small dataset + assert!(generator.next().is_none()); + } + + #[test] + fn test_matrix_generator_three_columns_cartesian_product() { + let mut matrix_foundation = HashMap::new(); + matrix_foundation.insert("a", vec![OrderedF64::from(1.0), OrderedF64::from(2.0)]); + matrix_foundation.insert("b", vec![OrderedF64::from(10.0), OrderedF64::from(20.0)]); + matrix_foundation.insert( + "c", + vec![ + OrderedF64::from(100.0), + OrderedF64::from(200.0), + OrderedF64::from(300.0), + ], + ); + + let mut generator = MatrixGenerator::new(matrix_foundation); + let batch = generator.next().unwrap(); + + // Should have 2 * 2 * 3 = 12 combinations + assert_eq!(batch.num_rows(), 12); + assert_eq!(batch.num_columns(), 3); + + let a_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let b_array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let c_array = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + + // Verify first few combinations (a changes slowest, c changes fastest) + let expected = vec![ + (1.0, 10.0, 100.0), + (1.0, 10.0, 200.0), + (1.0, 10.0, 300.0), + (1.0, 20.0, 100.0), + (1.0, 20.0, 200.0), + (1.0, 20.0, 300.0), + (2.0, 10.0, 100.0), + (2.0, 10.0, 200.0), + (2.0, 10.0, 300.0), + (2.0, 20.0, 100.0), + (2.0, 20.0, 200.0), + (2.0, 20.0, 300.0), + ]; + #[allow(clippy::needless_range_loop)] + for i in 0..batch.num_rows() { + assert_eq!( + (a_array.value(i), b_array.value(i), c_array.value(i)), + expected[i] + ); + } + + // Should be no more batches for such a small dataset + assert!(generator.next().is_none()); + } + + #[test] + fn test_matrix_generator_iterator_small_batches() { + let mut matrix_foundation = HashMap::new(); + matrix_foundation.insert("col1", vec![OrderedF64::from(1.0), OrderedF64::from(2.0)]); + matrix_foundation.insert( + "col2", + vec![ + OrderedF64::from(10.0), + OrderedF64::from(20.0), + OrderedF64::from(30.0), + ], + ); + + let generator = MatrixGenerator::new(matrix_foundation); + + // Total combinations should be 2 * 3 = 6 + assert_eq!(generator.total_combinations, 6); + + let mut total_rows = 0; + + for batch in generator { + total_rows += batch.num_rows(); + assert_eq!(batch.num_columns(), 2); + + // Verify each batch is valid + assert!(batch.num_rows() > 0); + assert!(batch.num_rows() <= MAX_BATCH_SIZE); + } + + assert_eq!(total_rows, 6); + } + + #[test] + fn test_matrix_generator_empty_column_values() { + let mut matrix_foundation = HashMap::new(); + matrix_foundation.insert("col1", vec![]); + + let mut generator = MatrixGenerator::new(matrix_foundation); + + // Should have 0 total combinations when any column is empty + assert_eq!(generator.total_combinations, 0); + + // Should have no batches when total combinations is 0 + assert!(generator.next().is_none()); + } + + #[test] + fn test_matrix_generator_large_dataset_batching() { + // Create a dataset that will exceed MAX_BATCH_SIZE (8192) + // 20 * 20 * 21 = 8400 > 8192 + let mut matrix_foundation = HashMap::new(); + + let values1: Vec = (0..20).map(|i| OrderedF64::from(i as f64)).collect(); + let values2: Vec = (0..20) + .map(|i| OrderedF64::from(i as f64 + 100.0)) + .collect(); + let values3: Vec = (0..21) + .map(|i| OrderedF64::from(i as f64 + 1000.0)) + .collect(); + + matrix_foundation.insert("col1", values1); + matrix_foundation.insert("col2", values2); + matrix_foundation.insert("col3", values3); + + let generator = MatrixGenerator::new(matrix_foundation); + + assert_eq!(generator.total_combinations, 8400); + + let mut total_rows = 0; + let mut batch_count = 0; + let mut first_batch_size = None; + + for batch in generator { + batch_count += 1; + let batch_size = batch.num_rows(); + total_rows += batch_size; + + if first_batch_size.is_none() { + first_batch_size = Some(batch_size); + } + + // Each batch should not exceed MAX_BATCH_SIZE + assert!(batch_size <= MAX_BATCH_SIZE); + assert_eq!(batch.num_columns(), 3); + } + + assert_eq!(total_rows, 8400); + assert!(batch_count > 1); + assert_eq!(first_batch_size.unwrap(), MAX_BATCH_SIZE); + } + + #[test] + fn test_remap_checkpoint_values() { + // Create rule with single column + let rule = MultiDimPartitionRule::try_new( + vec!["host".to_string(), "value".to_string()], + vec![1, 2, 3], + vec![ + col("host") + .lt(Value::Int64(0)) + .and(col("value").lt(Value::Int64(0))), + col("host") + .lt(Value::Int64(0)) + .and(col("value").gt_eq(Value::Int64(0))), + col("host") + .gt_eq(Value::Int64(0)) + .and(col("host").lt(Value::Int64(1))) + .and(col("value").lt(Value::Int64(1))), + col("host") + .gt_eq(Value::Int64(0)) + .and(col("host").lt(Value::Int64(1))) + .and(col("value").gt_eq(Value::Int64(1))), + col("host") + .gt_eq(Value::Int64(1)) + .and(col("host").lt(Value::Int64(2))) + .and(col("value").lt(Value::Int64(2))), + col("host") + .gt_eq(Value::Int64(1)) + .and(col("host").lt(Value::Int64(2))) + .and(col("value").gt_eq(Value::Int64(2))), + col("host") + .gt_eq(Value::Int64(2)) + .and(col("host").lt(Value::Int64(3))) + .and(col("value").lt(Value::Int64(3))), + col("host") + .gt_eq(Value::Int64(2)) + .and(col("host").lt(Value::Int64(3))) + .and(col("value").gt_eq(Value::Int64(3))), + col("host").gt_eq(Value::Int64(3)), + ], + ) + .unwrap(); + let checker = PartitionChecker::try_new(&rule).unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("host", DataType::Float64, false), + Field::new("value", DataType::Float64, false), + ])); + let host_array = Float64Array::from(vec![-0.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]); + let value_array = Float64Array::from(vec![-0.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]); + let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)]) + .unwrap(); + + let checkpoint = checker.remap_checkpoint(0, &batch); + assert_eq!(checkpoint, "host<0, value<0"); + let checkpoint = checker.remap_checkpoint(1, &batch); + assert_eq!(checkpoint, "host=0, value=0"); + let checkpoint = checker.remap_checkpoint(6, &batch); + assert_eq!(checkpoint, "23, value>3"); + } +} diff --git a/src/partition/src/collider.rs b/src/partition/src/collider.rs index 1004b6dd05..e77afc345d 100644 --- a/src/partition/src/collider.rs +++ b/src/partition/src/collider.rs @@ -27,7 +27,12 @@ use std::collections::HashMap; use std::fmt::Debug; +use std::sync::Arc; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{col, lit, BinaryExpr}; +use datafusion_physical_expr::PhysicalExpr; +use datatypes::arrow::datatypes::Schema; use datatypes::value::{OrderedF64, OrderedFloat, Value}; use crate::error; @@ -35,34 +40,66 @@ use crate::error::Result; use crate::expr::{Operand, PartitionExpr, RestrictedOp}; const ZERO: OrderedF64 = OrderedFloat(0.0f64); -const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64); +pub(crate) const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64); +pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64); /// Represents an "atomic" Expression, which isn't composed (OR-ed) of other expressions. #[allow(unused)] +#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct AtomicExpr { /// A (ordered) list of simplified expressions. They are [`RestrictedOp::And`]'ed together. - nucleons: Vec, + pub(crate) nucleons: Vec, /// Index to reference the [`PartitionExpr`] that this [`AtomicExpr`] is derived from. /// This index is used with `exprs` field in [`MultiDimPartitionRule`](crate::multi_dim::MultiDimPartitionRule). - source_expr_index: usize, + pub(crate) source_expr_index: usize, +} + +impl AtomicExpr { + pub fn to_physical_expr(&self, schema: &Schema) -> Arc { + let mut exprs = Vec::with_capacity(self.nucleons.len()); + for nucleon in &self.nucleons { + exprs.push(nucleon.to_physical_expr(schema)); + } + let result: Arc = exprs + .into_iter() + .reduce(|l, r| Arc::new(BinaryExpr::new(l, Operator::And, r))) + .unwrap(); + result + } +} + +impl PartialOrd for AtomicExpr { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.nucleons.cmp(&other.nucleons)) + } } /// A simplified expression representation. /// /// This struct is used to compose [`AtomicExpr`], hence "nucleon". -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] -struct NucleonExpr { +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(crate) struct NucleonExpr { column: String, op: GluonOp, /// Normalized [`Value`]. value: OrderedF64, } +impl NucleonExpr { + pub fn to_physical_expr(&self, schema: &Schema) -> Arc { + Arc::new(BinaryExpr::new( + col(&self.column, schema).unwrap(), + self.op.to_operator(), + lit(*self.value.as_ref()), + )) + } +} + /// Further restricted operation set. /// /// Conjunction operations are removed from [`RestrictedOp`]. /// This enumeration is used to bind elements in [`NucleonExpr`], hence "gluon". -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] enum GluonOp { Eq, NotEq, @@ -72,6 +109,19 @@ enum GluonOp { GtEq, } +impl GluonOp { + pub fn to_operator(&self) -> Operator { + match self { + GluonOp::Eq => Operator::Eq, + GluonOp::NotEq => Operator::NotEq, + GluonOp::Lt => Operator::Lt, + GluonOp::LtEq => Operator::LtEq, + GluonOp::Gt => Operator::Gt, + GluonOp::GtEq => Operator::GtEq, + } + } +} + /// Collider is used to collide a list of [`PartitionExpr`] into a list of [`AtomicExpr`] /// /// It also normalizes the values of the columns in the expressions. @@ -79,11 +129,11 @@ enum GluonOp { pub struct Collider<'a> { source_exprs: &'a [PartitionExpr], - atomic_exprs: Vec, + pub(crate) atomic_exprs: Vec, /// A map of column name to a list of `(value, normalized value)` pairs. /// /// The normalized value is used for comparison. The normalization process keeps the order of the values. - normalized_values: HashMap>, + pub(crate) normalized_values: HashMap>, } impl<'a> Collider<'a> { @@ -114,6 +164,10 @@ impl<'a> Collider<'a> { for (index, expr) in source_exprs.iter().enumerate() { Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?; } + // sort nucleon exprs + for expr in &mut atomic_exprs { + expr.nucleons.sort_unstable(); + } // convert normalized values to a map let normalized_values = normalized_values diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 30ae94531d..3e423ed9f0 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -140,21 +140,6 @@ pub enum Error { source: common_meta::error::Error, }, - #[snafu(display("Conjunct expr with non-expr is invalid"))] - ConjunctExprWithNonExpr { - expr: PartitionExpr, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Unclosed value {} on column {}", value, column))] - UnclosedValue { - value: String, - column: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid partition expr: {:?}", expr))] InvalidExpr { expr: PartitionExpr, @@ -235,6 +220,27 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Duplicate expr: {:?}", expr))] + DuplicateExpr { + expr: PartitionExpr, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Checkpoint `{}` is not covered", checkpoint))] + CheckpointNotCovered { + checkpoint: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Checkpoint `{}` is overlapped", checkpoint))] + CheckpointOverlapped { + checkpoint: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -243,11 +249,12 @@ impl ErrorExt for Error { Error::GetCache { .. } => StatusCode::StorageUnavailable, Error::FindLeader { .. } => StatusCode::TableUnavailable, - Error::ConjunctExprWithNonExpr { .. } - | Error::UnclosedValue { .. } - | Error::InvalidExpr { .. } + Error::InvalidExpr { .. } | Error::NoExprOperand { .. } - | Error::UndefinedColumn { .. } => StatusCode::InvalidArguments, + | Error::UndefinedColumn { .. } + | Error::DuplicateExpr { .. } + | Error::CheckpointNotCovered { .. } + | Error::CheckpointOverlapped { .. } => StatusCode::InvalidArguments, Error::RegionKeysSize { .. } | Error::InvalidInsertRequest { .. } diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index 307c26af12..41aa55a785 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -16,6 +16,7 @@ #![feature(let_chains)] //! Structs and traits for partitioning rule. +pub mod checker; pub mod collider; pub mod error; pub mod expr; diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs index 83c1deab61..45a64df6f4 100644 --- a/src/partition/src/multi_dim.rs +++ b/src/partition/src/multi_dim.rs @@ -30,10 +30,8 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionNumber; -use crate::error::{ - self, ConjunctExprWithNonExprSnafu, InvalidExprSnafu, Result, UnclosedValueSnafu, - UndefinedColumnSnafu, -}; +use crate::checker::PartitionChecker; +use crate::error::{self, Result, UndefinedColumnSnafu}; use crate::expr::{Operand, PartitionExpr, RestrictedOp}; use crate::partition::RegionMask; use crate::PartitionRule; @@ -84,12 +82,16 @@ impl MultiDimPartitionRule { physical_expr_cache: RwLock::new(None), }; - let mut checker = RuleChecker::new(&rule); + let checker = PartitionChecker::try_new(&rule)?; checker.check()?; Ok(rule) } + pub fn exprs(&self) -> &[PartitionExpr] { + &self.exprs + } + fn find_region(&self, values: &[Value]) -> Result { ensure!( values.len() == self.partition_columns.len(), @@ -167,7 +169,7 @@ impl MultiDimPartitionRule { .map(|col_name| { record_batch .column_by_name(col_name) - .context(error::UndefinedColumnSnafu { column: col_name }) + .context(UndefinedColumnSnafu { column: col_name }) .and_then(|array| { Helper::try_into_vector(array).context(error::ConvertToVectorSnafu) }) @@ -341,142 +343,13 @@ impl PartitionRule for MultiDimPartitionRule { } } -/// Helper for [RuleChecker] -type Axis = HashMap; - -/// Helper for [RuleChecker] -struct SplitPoint { - is_equal: bool, - less_than_counter: isize, -} - -/// Check if the rule set covers all the possible values. -/// -/// Note this checker have false-negative on duplicated exprs. E.g.: -/// `a != 20`, `a <= 20` and `a > 20`. -/// -/// It works on the observation that each projected split point should be included (`is_equal`) -/// and have a balanced `<` and `>` counter. -struct RuleChecker<'a> { - axis: Vec, - rule: &'a MultiDimPartitionRule, -} - -impl<'a> RuleChecker<'a> { - pub fn new(rule: &'a MultiDimPartitionRule) -> Self { - let mut projections = Vec::with_capacity(rule.partition_columns.len()); - projections.resize_with(rule.partition_columns.len(), Default::default); - - Self { - axis: projections, - rule, - } - } - - pub fn check(&mut self) -> Result<()> { - for expr in &self.rule.exprs { - self.walk_expr(expr)? - } - - self.check_axis() - } - - #[allow(clippy::mutable_key_type)] - fn walk_expr(&mut self, expr: &PartitionExpr) -> Result<()> { - // recursively check the expr - match expr.op { - RestrictedOp::And | RestrictedOp::Or => { - match (expr.lhs.as_ref(), expr.rhs.as_ref()) { - (Operand::Expr(lhs), Operand::Expr(rhs)) => { - self.walk_expr(lhs)?; - self.walk_expr(rhs)? - } - _ => ConjunctExprWithNonExprSnafu { expr: expr.clone() }.fail()?, - } - - return Ok(()); - } - // Not conjunction - _ => {} - } - - let (col, val) = match (expr.lhs.as_ref(), expr.rhs.as_ref()) { - (Operand::Expr(_), _) - | (_, Operand::Expr(_)) - | (Operand::Column(_), Operand::Column(_)) - | (Operand::Value(_), Operand::Value(_)) => { - InvalidExprSnafu { expr: expr.clone() }.fail()? - } - - (Operand::Column(col), Operand::Value(val)) - | (Operand::Value(val), Operand::Column(col)) => (col, val), - }; - - let col_index = - *self - .rule - .name_to_index - .get(col) - .with_context(|| UndefinedColumnSnafu { - column: col.clone(), - })?; - let axis = &mut self.axis[col_index]; - let split_point = axis.entry(val.clone()).or_insert(SplitPoint { - is_equal: false, - less_than_counter: 0, - }); - match expr.op { - RestrictedOp::Eq => { - split_point.is_equal = true; - } - RestrictedOp::NotEq => { - // less_than +1 -1 - } - RestrictedOp::Lt => { - split_point.less_than_counter += 1; - } - RestrictedOp::LtEq => { - split_point.less_than_counter += 1; - split_point.is_equal = true; - } - RestrictedOp::Gt => { - split_point.less_than_counter -= 1; - } - RestrictedOp::GtEq => { - split_point.less_than_counter -= 1; - split_point.is_equal = true; - } - RestrictedOp::And | RestrictedOp::Or => { - unreachable!("conjunct expr should be handled above") - } - } - - Ok(()) - } - - /// Return if the rule is legal. - fn check_axis(&self) -> Result<()> { - for (col_index, axis) in self.axis.iter().enumerate() { - for (val, split_point) in axis { - if split_point.less_than_counter != 0 || !split_point.is_equal { - UnclosedValueSnafu { - value: format!("{val:?}"), - column: self.rule.partition_columns[col_index].clone(), - } - .fail()?; - } - } - } - Ok(()) - } -} - #[cfg(test)] mod tests { use std::assert_matches::assert_matches; use super::*; use crate::error::{self, Error}; + use crate::expr::col; #[test] fn test_find_region() { @@ -582,7 +455,7 @@ mod tests { ); // check rule - assert_matches!(rule.unwrap_err(), Error::ConjunctExprWithNonExpr { .. }); + assert_matches!(rule.unwrap_err(), Error::InvalidExpr { .. }); } /// ```ignore @@ -617,7 +490,7 @@ mod tests { ); // check rule - assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. }); + assert_matches!(rule.unwrap_err(), Error::CheckpointNotCovered { .. }); } /// ``` @@ -760,7 +633,7 @@ mod tests { ); // check rule - assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. }); + assert_matches!(rule.unwrap_err(), Error::CheckpointNotCovered { .. }); } #[test] @@ -787,11 +660,10 @@ mod tests { ); // check rule - assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. }); + assert_matches!(rule.unwrap_err(), Error::CheckpointOverlapped { .. }); } #[test] - #[ignore = "checker cannot detect this kind of duplicate for now"] fn duplicate_expr_case_2() { // PARTITION ON COLUMNS (a) ( // a != 20, @@ -821,7 +693,35 @@ mod tests { ); // check rule - assert!(rule.is_err()); + assert_matches!(rule.unwrap_err(), Error::CheckpointOverlapped { .. }); + } + + /// ```ignore + /// value + /// │ + /// │ + /// value=10 --------------------│ + /// │ + /// ────────────────────────────────┼──► host + /// │ + /// host=server10 + /// ``` + #[test] + fn test_partial_divided() { + let _rule = MultiDimPartitionRule::try_new( + vec!["host".to_string(), "value".to_string()], + vec![0, 1, 2, 3], + vec![ + col("host") + .lt(Value::String("server10".into())) + .and(col("value").lt(Value::Int64(10))), + col("host") + .lt(Value::String("server10".into())) + .and(col("value").gt_eq(Value::Int64(10))), + col("host").gt_eq(Value::String("server10".into())), + ], + ) + .unwrap(); } } @@ -892,11 +792,10 @@ mod test_split_record_batch { let rule = MultiDimPartitionRule::try_new( vec!["host".to_string()], vec![1], - vec![PartitionExpr::new( - Operand::Column("host".to_string()), - RestrictedOp::Eq, - Operand::Value(Value::String("server1".into())), - )], + vec![ + col("host").lt(Value::String("server1".into())), + col("host").gt_eq(Value::String("server1".into())), + ], ) .unwrap(); @@ -941,118 +840,6 @@ mod test_split_record_batch { } } - #[test] - fn test_default_region() { - let rule = MultiDimPartitionRule::try_new( - vec!["host".to_string(), "value".to_string()], - vec![0, 1, 2, 3], - vec![ - col("host") - .lt(Value::String("server10".into())) - .and(col("value").eq(Value::Int64(10))), - col("host") - .lt(Value::String("server10".into())) - .and(col("value").eq(Value::Int64(20))), - col("host") - .gt_eq(Value::String("server10".into())) - .and(col("value").eq(Value::Int64(10))), - col("host") - .gt_eq(Value::String("server10".into())) - .and(col("value").eq(Value::Int64(20))), - ], - ) - .unwrap(); - - let schema = test_schema(); - let host_array = StringArray::from(vec!["server1", "server1", "server1", "server100"]); - let value_array = Int64Array::from(vec![10, 20, 30, 10]); - let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)]) - .unwrap(); - let result = rule.split_record_batch(&batch).unwrap(); - let expected = rule.split_record_batch_naive(&batch).unwrap(); - for (region, value) in &result { - assert_eq!(value.array(), expected.get(region).unwrap()); - } - } - - #[test] - fn test_default_region_with_unselected_rows() { - // Create a rule where some rows won't match any partition - let rule = MultiDimPartitionRule::try_new( - vec!["host".to_string(), "value".to_string()], - vec![1, 2, 3], - vec![ - col("value").eq(Value::Int64(10)), - col("value").eq(Value::Int64(20)), - col("value").eq(Value::Int64(30)), - ], - ) - .unwrap(); - - let schema = test_schema(); - let host_array = - StringArray::from(vec!["server1", "server2", "server3", "server4", "server5"]); - let value_array = Int64Array::from(vec![10, 20, 30, 40, 50]); - let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)]) - .unwrap(); - - let result = rule.split_record_batch(&batch).unwrap(); - - // Check that we have 4 regions (3 defined + default) - assert_eq!(result.len(), 4); - - // Check that default region (0) contains the unselected rows - assert!(result.contains_key(&DEFAULT_REGION)); - let default_mask = result.get(&DEFAULT_REGION).unwrap(); - - // The default region should have 2 rows (with values 40 and 50) - assert_eq!(default_mask.selected_rows(), 2); - - // Verify each region has the correct number of rows - assert_eq!(result.get(&1).unwrap().selected_rows(), 1); // value = 10 - assert_eq!(result.get(&2).unwrap().selected_rows(), 1); // value = 20 - assert_eq!(result.get(&3).unwrap().selected_rows(), 1); // value = 30 - } - - #[test] - fn test_default_region_with_existing_default() { - // Create a rule where some rows are explicitly assigned to default region - // and some rows are implicitly assigned to default region - let rule = MultiDimPartitionRule::try_new( - vec!["host".to_string(), "value".to_string()], - vec![0, 1, 2], - vec![ - col("value").eq(Value::Int64(10)), // Explicitly assign value=10 to region 0 (default) - col("value").eq(Value::Int64(20)), - col("value").eq(Value::Int64(30)), - ], - ) - .unwrap(); - - let schema = test_schema(); - let host_array = - StringArray::from(vec!["server1", "server2", "server3", "server4", "server5"]); - let value_array = Int64Array::from(vec![10, 20, 30, 40, 50]); - let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)]) - .unwrap(); - - let result = rule.split_record_batch(&batch).unwrap(); - - // Check that we have 3 regions - assert_eq!(result.len(), 3); - - // Check that default region contains both explicitly assigned and unselected rows - assert!(result.contains_key(&DEFAULT_REGION)); - let default_mask = result.get(&DEFAULT_REGION).unwrap(); - - // The default region should have 3 rows (value=10, 40, 50) - assert_eq!(default_mask.selected_rows(), 3); - - // Verify each region has the correct number of rows - assert_eq!(result.get(&1).unwrap().selected_rows(), 1); // value = 20 - assert_eq!(result.get(&2).unwrap().selected_rows(), 1); // value = 30 - } - #[test] fn test_all_rows_selected() { // Test the fast path where all rows are selected by some partition diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result index cdf0f51be5..c5be5aaec5 100644 --- a/tests/cases/standalone/common/partition.result +++ b/tests/cases/standalone/common/partition.result @@ -171,7 +171,7 @@ DROP TABLE my_table; Affected Rows: 0 --- incorrect partition rule +-- incorrect partition rules CREATE TABLE invalid_rule ( a INT PRIMARY KEY, b STRING, @@ -183,7 +183,52 @@ PARTITION ON COLUMNS (a) ( a >= 20 ); -Error: 1004(InvalidArguments), Unclosed value Int32(10) on column a +Error: 1004(InvalidArguments), Checkpoint `a=10` is not covered + +CREATE TABLE invalid_rule2 ( + a INT, + b STRING PRIMARY KEY, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (b) ( + b < 'abc', + b >= 'abca' AND b < 'o', + b >= 'o', +); + +Error: 1004(InvalidArguments), Checkpoint `b=abc` is not covered + +CREATE TABLE invalid_rule3 ( + a INT, + b STRING PRIMARY KEY, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (b) ( + b >= 'a', + b <= 'o', +); + +Error: 1004(InvalidArguments), Checkpoint `b=a` is overlapped + +CREATE TABLE valid_rule ( + a INT, + b STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (a, b) +) +PARTITION ON COLUMNS (a, b) ( + a < 10, + a = 10 AND b < 'a', + a = 10 AND b >= 'a' AND b < 'o', + a = 10 AND b >= 'o', + a > 10, +); + +Affected Rows: 0 + +DROP TABLE valid_rule; + +Affected Rows: 0 -- Issue https://github.com/GreptimeTeam/greptimedb/issues/4247 -- Partition rule with unary operator diff --git a/tests/cases/standalone/common/partition.sql b/tests/cases/standalone/common/partition.sql index c65f2c9e97..dc305e99dd 100644 --- a/tests/cases/standalone/common/partition.sql +++ b/tests/cases/standalone/common/partition.sql @@ -74,7 +74,7 @@ SELECT * FROM my_table; DROP TABLE my_table; --- incorrect partition rule +-- incorrect partition rules CREATE TABLE invalid_rule ( a INT PRIMARY KEY, b STRING, @@ -86,6 +86,43 @@ PARTITION ON COLUMNS (a) ( a >= 20 ); +CREATE TABLE invalid_rule2 ( + a INT, + b STRING PRIMARY KEY, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (b) ( + b < 'abc', + b >= 'abca' AND b < 'o', + b >= 'o', +); + +CREATE TABLE invalid_rule3 ( + a INT, + b STRING PRIMARY KEY, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (b) ( + b >= 'a', + b <= 'o', +); + +CREATE TABLE valid_rule ( + a INT, + b STRING, + ts TIMESTAMP TIME INDEX, + PRIMARY KEY (a, b) +) +PARTITION ON COLUMNS (a, b) ( + a < 10, + a = 10 AND b < 'a', + a = 10 AND b >= 'a' AND b < 'o', + a = 10 AND b >= 'o', + a > 10, +); + +DROP TABLE valid_rule; + -- Issue https://github.com/GreptimeTeam/greptimedb/issues/4247 -- Partition rule with unary operator CREATE TABLE `molestiAe` (