feat!: point matrix based partition rule checker (#6431)

* bare implementation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* stateful generator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* error report

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix remap checkpoint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use matrix generator as iterator

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* pre-calculate suffix product

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update existing test cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix ut

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2025-07-03 14:50:02 +08:00
committed by Yingwen
parent 7ab9b335a1
commit d8261dda51
7 changed files with 823 additions and 289 deletions

View File

@@ -0,0 +1,603 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use datatypes::arrow::array::{BooleanArray, Float64Array, Float64Builder, RecordBatch};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use datatypes::value::OrderedF64;
use crate::collider::{Collider, CHECK_STEP, NORMALIZE_STEP};
use crate::error::{
CheckpointNotCoveredSnafu, CheckpointOverlappedSnafu, DuplicateExprSnafu, Result,
};
use crate::expr::{PartitionExpr, RestrictedOp};
use crate::multi_dim::MultiDimPartitionRule;
pub struct PartitionChecker<'a> {
rule: &'a MultiDimPartitionRule,
collider: Collider<'a>,
}
impl<'a> PartitionChecker<'a> {
pub fn try_new(rule: &'a MultiDimPartitionRule) -> Result<Self> {
let collider = Collider::new(rule.exprs())?;
Ok(Self { rule, collider })
}
pub fn check(&self) -> Result<()> {
self.run()?;
Ok(())
}
}
// Logic of checking rules
impl<'a> PartitionChecker<'a> {
fn run(&self) -> Result<()> {
// Sort atomic exprs and check uniqueness
let mut atomic_exprs = BTreeMap::new();
for expr in self.collider.atomic_exprs.iter() {
let key = &expr.nucleons;
atomic_exprs.insert(key, expr);
}
if atomic_exprs.len() != self.collider.atomic_exprs.len() {
// Find the duplication for error message
for expr in self.collider.atomic_exprs.iter() {
if atomic_exprs.get(&expr.nucleons).unwrap().source_expr_index
!= expr.source_expr_index
{
let expr = self.rule.exprs()[expr.source_expr_index].clone();
return DuplicateExprSnafu { expr }.fail();
}
}
// Or return a placeholder. This should never happen.
return DuplicateExprSnafu {
expr: PartitionExpr::new(
crate::expr::Operand::Column("unknown".to_string()),
RestrictedOp::Eq,
crate::expr::Operand::Column("expr".to_string()),
),
}
.fail();
}
// TODO(ruihang): merge atomic exprs to improve checker's performance
// matrix test
let mut matrix_foundation = HashMap::new();
for (col, values) in self.collider.normalized_values.iter() {
if values.is_empty() {
continue;
}
let mut cornerstones = Vec::with_capacity(values.len() * 2 + 1);
cornerstones.push(values[0].1 - CHECK_STEP);
for value in values {
cornerstones.push(value.1);
cornerstones.push(value.1 + CHECK_STEP);
}
matrix_foundation.insert(col.as_str(), cornerstones);
}
// If there are no values, the rule is empty and valid.
if matrix_foundation.is_empty() {
return Ok(());
}
let matrix_generator = MatrixGenerator::new(matrix_foundation);
// Process data in batches using iterator
let mut results = Vec::with_capacity(self.collider.atomic_exprs.len());
let physical_exprs = self
.collider
.atomic_exprs
.iter()
.map(|expr| expr.to_physical_expr(matrix_generator.schema()))
.collect::<Vec<_>>();
for batch in matrix_generator {
results.clear();
for physical_expr in &physical_exprs {
let columnar_result = physical_expr.evaluate(&batch).unwrap();
let array_result = columnar_result.into_array(batch.num_rows()).unwrap();
results.push(array_result);
}
let boolean_results = results
.iter()
.map(|result| result.as_any().downcast_ref::<BooleanArray>().unwrap())
.collect::<Vec<_>>();
// sum and check results for this batch
for i in 0..batch.num_rows() {
let mut true_count = 0;
for result in boolean_results.iter() {
if result.value(i) {
true_count += 1;
}
}
if true_count == 0 {
return CheckpointNotCoveredSnafu {
checkpoint: self.remap_checkpoint(i, &batch),
}
.fail();
} else if true_count > 1 {
return CheckpointOverlappedSnafu {
checkpoint: self.remap_checkpoint(i, &batch),
}
.fail();
}
}
}
Ok(())
}
/// Remap the normalized checkpoint data to the original values.
fn remap_checkpoint(&self, i: usize, batch: &RecordBatch) -> String {
let normalized_row = batch
.columns()
.iter()
.map(|col| {
let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
array.value(i)
})
.collect::<Vec<_>>();
let mut check_point = String::new();
let schema = batch.schema();
for (col_index, normalized_value) in normalized_row.iter().enumerate() {
let col_name = schema.field(col_index).name();
if col_index > 0 {
check_point.push_str(", ");
}
// Check if point is on NORMALIZE_STEP or between steps
if let Some(values) = self.collider.normalized_values.get(col_name) {
let normalize_step = NORMALIZE_STEP.0;
// Check if the normalized value is on a NORMALIZE_STEP boundary
let remainder = normalized_value % normalize_step;
let is_on_step = remainder.abs() < f64::EPSILON
|| (normalize_step - remainder).abs() < f64::EPSILON * 2.0;
if is_on_step {
let index = (normalized_value / normalize_step).round() as usize;
if index < values.len() {
let original_value = &values[index].0;
check_point.push_str(&format!("{}={}", col_name, original_value));
} else {
check_point.push_str(&format!("{}=unknown", col_name));
}
} else {
let lower_index = (normalized_value / normalize_step).floor() as usize;
let upper_index = (normalized_value / normalize_step).ceil() as usize;
// Handle edge cases: value is outside the valid range
if lower_index == upper_index && lower_index == 0 {
// Value is less than the first value
let first_original = &values[0].0;
check_point.push_str(&format!("{}<{}", col_name, first_original));
} else if upper_index == values.len() {
// Value is greater than the last value
let last_original = &values[values.len() - 1].0;
check_point.push_str(&format!("{}>{}", col_name, last_original));
} else {
// Normal case: value is between two valid values
let lower_original = if lower_index < values.len() {
values[lower_index].0.to_string()
} else {
"unknown".to_string()
};
let upper_original = if upper_index < values.len() {
values[upper_index].0.to_string()
} else {
"unknown".to_string()
};
check_point.push_str(&format!(
"{}<{}<{}",
lower_original, col_name, upper_original
));
}
}
} else {
// Fallback if column not found in normalized values
check_point.push_str(&format!("{}:unknown", col_name));
}
}
check_point
}
}
/// Generates a point matrix that contains permutations of `matrix_foundation`'s values
struct MatrixGenerator {
matrix_foundation: HashMap<String, Vec<OrderedF64>>,
// Iterator state
current_index: usize,
schema: Schema,
column_names: Vec<String>,
// Preprocessed attributes
/// Total number of combinations of `matrix_foundation`'s values
total_combinations: usize,
/// Biased suffix product of `matrix_foundation`'s values
///
/// The i-th element is the product of the sizes of all columns after the i-th column.
/// For example, if `matrix_foundation` is `{"a": [1, 2, 3], "b": [4, 5, 6]}`,
/// then `biased_suffix_product` is `[3, 1]`.
biased_suffix_product: Vec<usize>,
}
const MAX_BATCH_SIZE: usize = 8192;
impl MatrixGenerator {
pub fn new(matrix_foundation: HashMap<&str, Vec<OrderedF64>>) -> Self {
// Convert to owned HashMap to avoid lifetime issues
let owned_matrix_foundation: HashMap<String, Vec<OrderedF64>> = matrix_foundation
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect();
let mut fields = owned_matrix_foundation
.keys()
.map(|k| Field::new(k.clone(), DataType::Float64, false))
.collect::<Vec<_>>();
fields.sort_unstable();
let schema = Schema::new(fields.clone());
// Store column names in the same order as fields
let column_names: Vec<String> = fields.iter().map(|field| field.name().clone()).collect();
// Calculate total number of combinations and suffix product
let mut biased_suffix_product = Vec::with_capacity(column_names.len() + 1);
let mut product = 1;
biased_suffix_product.push(product);
for col_name in column_names.iter().rev() {
product *= owned_matrix_foundation[col_name].len();
biased_suffix_product.push(product);
}
biased_suffix_product.pop();
biased_suffix_product.reverse();
Self {
matrix_foundation: owned_matrix_foundation,
current_index: 0,
schema,
column_names,
total_combinations: product,
biased_suffix_product,
}
}
pub fn schema(&self) -> &Schema {
&self.schema
}
fn generate_batch(&self, start_index: usize, batch_size: usize) -> RecordBatch {
let actual_batch_size = batch_size.min(self.total_combinations - start_index);
// Create array builders
let mut array_builders: Vec<Float64Builder> = Vec::with_capacity(self.column_names.len());
for _ in 0..self.column_names.len() {
array_builders.push(Float64Builder::with_capacity(actual_batch_size));
}
// Generate combinations for this batch
for combination_offset in 0..actual_batch_size {
let combination_index = start_index + combination_offset;
// For each column, determine which value to use for this combination
for (col_idx, col_name) in self.column_names.iter().enumerate() {
let values = &self.matrix_foundation[col_name];
let stride = self.biased_suffix_product[col_idx];
let value_index = (combination_index / stride) % values.len();
let value = *values[value_index].as_ref();
array_builders[col_idx].append_value(value);
}
}
// Finish arrays and create RecordBatch
let arrays: Vec<_> = array_builders
.into_iter()
.map(|mut builder| Arc::new(builder.finish()) as _)
.collect();
RecordBatch::try_new(Arc::new(self.schema.clone()), arrays)
.expect("Failed to create RecordBatch from generated arrays")
}
}
impl Iterator for MatrixGenerator {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
if self.current_index >= self.total_combinations {
return None;
}
let remaining = self.total_combinations - self.current_index;
let batch_size = remaining.min(MAX_BATCH_SIZE);
let batch = self.generate_batch(self.current_index, batch_size);
self.current_index += batch_size;
Some(batch)
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use datatypes::value::Value;
use super::*;
use crate::expr::col;
use crate::multi_dim::MultiDimPartitionRule;
#[test]
fn test_matrix_generator_single_column() {
let mut matrix_foundation = HashMap::new();
matrix_foundation.insert(
"col1",
vec![
OrderedF64::from(1.0),
OrderedF64::from(2.0),
OrderedF64::from(3.0),
],
);
let mut generator = MatrixGenerator::new(matrix_foundation);
let batch = generator.next().unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "col1");
let col1_array = batch
.column(0)
.as_any()
.downcast_ref::<datatypes::arrow::array::Float64Array>()
.unwrap();
assert_eq!(col1_array.value(0), 1.0);
assert_eq!(col1_array.value(1), 2.0);
assert_eq!(col1_array.value(2), 3.0);
// Should be no more batches for such a small dataset
assert!(generator.next().is_none());
}
#[test]
fn test_matrix_generator_three_columns_cartesian_product() {
let mut matrix_foundation = HashMap::new();
matrix_foundation.insert("a", vec![OrderedF64::from(1.0), OrderedF64::from(2.0)]);
matrix_foundation.insert("b", vec![OrderedF64::from(10.0), OrderedF64::from(20.0)]);
matrix_foundation.insert(
"c",
vec![
OrderedF64::from(100.0),
OrderedF64::from(200.0),
OrderedF64::from(300.0),
],
);
let mut generator = MatrixGenerator::new(matrix_foundation);
let batch = generator.next().unwrap();
// Should have 2 * 2 * 3 = 12 combinations
assert_eq!(batch.num_rows(), 12);
assert_eq!(batch.num_columns(), 3);
let a_array = batch
.column(0)
.as_any()
.downcast_ref::<datatypes::arrow::array::Float64Array>()
.unwrap();
let b_array = batch
.column(1)
.as_any()
.downcast_ref::<datatypes::arrow::array::Float64Array>()
.unwrap();
let c_array = batch
.column(2)
.as_any()
.downcast_ref::<datatypes::arrow::array::Float64Array>()
.unwrap();
// Verify first few combinations (a changes slowest, c changes fastest)
let expected = vec![
(1.0, 10.0, 100.0),
(1.0, 10.0, 200.0),
(1.0, 10.0, 300.0),
(1.0, 20.0, 100.0),
(1.0, 20.0, 200.0),
(1.0, 20.0, 300.0),
(2.0, 10.0, 100.0),
(2.0, 10.0, 200.0),
(2.0, 10.0, 300.0),
(2.0, 20.0, 100.0),
(2.0, 20.0, 200.0),
(2.0, 20.0, 300.0),
];
#[allow(clippy::needless_range_loop)]
for i in 0..batch.num_rows() {
assert_eq!(
(a_array.value(i), b_array.value(i), c_array.value(i)),
expected[i]
);
}
// Should be no more batches for such a small dataset
assert!(generator.next().is_none());
}
#[test]
fn test_matrix_generator_iterator_small_batches() {
let mut matrix_foundation = HashMap::new();
matrix_foundation.insert("col1", vec![OrderedF64::from(1.0), OrderedF64::from(2.0)]);
matrix_foundation.insert(
"col2",
vec![
OrderedF64::from(10.0),
OrderedF64::from(20.0),
OrderedF64::from(30.0),
],
);
let generator = MatrixGenerator::new(matrix_foundation);
// Total combinations should be 2 * 3 = 6
assert_eq!(generator.total_combinations, 6);
let mut total_rows = 0;
for batch in generator {
total_rows += batch.num_rows();
assert_eq!(batch.num_columns(), 2);
// Verify each batch is valid
assert!(batch.num_rows() > 0);
assert!(batch.num_rows() <= MAX_BATCH_SIZE);
}
assert_eq!(total_rows, 6);
}
#[test]
fn test_matrix_generator_empty_column_values() {
let mut matrix_foundation = HashMap::new();
matrix_foundation.insert("col1", vec![]);
let mut generator = MatrixGenerator::new(matrix_foundation);
// Should have 0 total combinations when any column is empty
assert_eq!(generator.total_combinations, 0);
// Should have no batches when total combinations is 0
assert!(generator.next().is_none());
}
#[test]
fn test_matrix_generator_large_dataset_batching() {
// Create a dataset that will exceed MAX_BATCH_SIZE (8192)
// 20 * 20 * 21 = 8400 > 8192
let mut matrix_foundation = HashMap::new();
let values1: Vec<OrderedF64> = (0..20).map(|i| OrderedF64::from(i as f64)).collect();
let values2: Vec<OrderedF64> = (0..20)
.map(|i| OrderedF64::from(i as f64 + 100.0))
.collect();
let values3: Vec<OrderedF64> = (0..21)
.map(|i| OrderedF64::from(i as f64 + 1000.0))
.collect();
matrix_foundation.insert("col1", values1);
matrix_foundation.insert("col2", values2);
matrix_foundation.insert("col3", values3);
let generator = MatrixGenerator::new(matrix_foundation);
assert_eq!(generator.total_combinations, 8400);
let mut total_rows = 0;
let mut batch_count = 0;
let mut first_batch_size = None;
for batch in generator {
batch_count += 1;
let batch_size = batch.num_rows();
total_rows += batch_size;
if first_batch_size.is_none() {
first_batch_size = Some(batch_size);
}
// Each batch should not exceed MAX_BATCH_SIZE
assert!(batch_size <= MAX_BATCH_SIZE);
assert_eq!(batch.num_columns(), 3);
}
assert_eq!(total_rows, 8400);
assert!(batch_count > 1);
assert_eq!(first_batch_size.unwrap(), MAX_BATCH_SIZE);
}
#[test]
fn test_remap_checkpoint_values() {
// Create rule with single column
let rule = MultiDimPartitionRule::try_new(
vec!["host".to_string(), "value".to_string()],
vec![1, 2, 3],
vec![
col("host")
.lt(Value::Int64(0))
.and(col("value").lt(Value::Int64(0))),
col("host")
.lt(Value::Int64(0))
.and(col("value").gt_eq(Value::Int64(0))),
col("host")
.gt_eq(Value::Int64(0))
.and(col("host").lt(Value::Int64(1)))
.and(col("value").lt(Value::Int64(1))),
col("host")
.gt_eq(Value::Int64(0))
.and(col("host").lt(Value::Int64(1)))
.and(col("value").gt_eq(Value::Int64(1))),
col("host")
.gt_eq(Value::Int64(1))
.and(col("host").lt(Value::Int64(2)))
.and(col("value").lt(Value::Int64(2))),
col("host")
.gt_eq(Value::Int64(1))
.and(col("host").lt(Value::Int64(2)))
.and(col("value").gt_eq(Value::Int64(2))),
col("host")
.gt_eq(Value::Int64(2))
.and(col("host").lt(Value::Int64(3)))
.and(col("value").lt(Value::Int64(3))),
col("host")
.gt_eq(Value::Int64(2))
.and(col("host").lt(Value::Int64(3)))
.and(col("value").gt_eq(Value::Int64(3))),
col("host").gt_eq(Value::Int64(3)),
],
)
.unwrap();
let checker = PartitionChecker::try_new(&rule).unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("host", DataType::Float64, false),
Field::new("value", DataType::Float64, false),
]));
let host_array = Float64Array::from(vec![-0.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]);
let value_array = Float64Array::from(vec![-0.5, 0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]);
let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)])
.unwrap();
let checkpoint = checker.remap_checkpoint(0, &batch);
assert_eq!(checkpoint, "host<0, value<0");
let checkpoint = checker.remap_checkpoint(1, &batch);
assert_eq!(checkpoint, "host=0, value=0");
let checkpoint = checker.remap_checkpoint(6, &batch);
assert_eq!(checkpoint, "2<host<3, 2<value<3");
let checkpoint = checker.remap_checkpoint(7, &batch);
assert_eq!(checkpoint, "host=3, value=3");
let checkpoint = checker.remap_checkpoint(8, &batch);
assert_eq!(checkpoint, "host>3, value>3");
}
}

View File

@@ -27,7 +27,12 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion_physical_expr::PhysicalExpr;
use datatypes::arrow::datatypes::Schema;
use datatypes::value::{OrderedF64, OrderedFloat, Value};
use crate::error;
@@ -35,34 +40,66 @@ use crate::error::Result;
use crate::expr::{Operand, PartitionExpr, RestrictedOp};
const ZERO: OrderedF64 = OrderedFloat(0.0f64);
const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
pub(crate) const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64);
/// Represents an "atomic" Expression, which isn't composed (OR-ed) of other expressions.
#[allow(unused)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct AtomicExpr {
/// A (ordered) list of simplified expressions. They are [`RestrictedOp::And`]'ed together.
nucleons: Vec<NucleonExpr>,
pub(crate) nucleons: Vec<NucleonExpr>,
/// Index to reference the [`PartitionExpr`] that this [`AtomicExpr`] is derived from.
/// This index is used with `exprs` field in [`MultiDimPartitionRule`](crate::multi_dim::MultiDimPartitionRule).
source_expr_index: usize,
pub(crate) source_expr_index: usize,
}
impl AtomicExpr {
pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
let mut exprs = Vec::with_capacity(self.nucleons.len());
for nucleon in &self.nucleons {
exprs.push(nucleon.to_physical_expr(schema));
}
let result: Arc<dyn PhysicalExpr> = exprs
.into_iter()
.reduce(|l, r| Arc::new(BinaryExpr::new(l, Operator::And, r)))
.unwrap();
result
}
}
impl PartialOrd for AtomicExpr {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.nucleons.cmp(&other.nucleons))
}
}
/// A simplified expression representation.
///
/// This struct is used to compose [`AtomicExpr`], hence "nucleon".
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct NucleonExpr {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct NucleonExpr {
column: String,
op: GluonOp,
/// Normalized [`Value`].
value: OrderedF64,
}
impl NucleonExpr {
pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
Arc::new(BinaryExpr::new(
col(&self.column, schema).unwrap(),
self.op.to_operator(),
lit(*self.value.as_ref()),
))
}
}
/// Further restricted operation set.
///
/// Conjunction operations are removed from [`RestrictedOp`].
/// This enumeration is used to bind elements in [`NucleonExpr`], hence "gluon".
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum GluonOp {
Eq,
NotEq,
@@ -72,6 +109,19 @@ enum GluonOp {
GtEq,
}
impl GluonOp {
pub fn to_operator(&self) -> Operator {
match self {
GluonOp::Eq => Operator::Eq,
GluonOp::NotEq => Operator::NotEq,
GluonOp::Lt => Operator::Lt,
GluonOp::LtEq => Operator::LtEq,
GluonOp::Gt => Operator::Gt,
GluonOp::GtEq => Operator::GtEq,
}
}
}
/// Collider is used to collide a list of [`PartitionExpr`] into a list of [`AtomicExpr`]
///
/// It also normalizes the values of the columns in the expressions.
@@ -79,11 +129,11 @@ enum GluonOp {
pub struct Collider<'a> {
source_exprs: &'a [PartitionExpr],
atomic_exprs: Vec<AtomicExpr>,
pub(crate) atomic_exprs: Vec<AtomicExpr>,
/// A map of column name to a list of `(value, normalized value)` pairs.
///
/// The normalized value is used for comparison. The normalization process keeps the order of the values.
normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
pub(crate) normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
}
impl<'a> Collider<'a> {
@@ -114,6 +164,10 @@ impl<'a> Collider<'a> {
for (index, expr) in source_exprs.iter().enumerate() {
Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
}
// sort nucleon exprs
for expr in &mut atomic_exprs {
expr.nucleons.sort_unstable();
}
// convert normalized values to a map
let normalized_values = normalized_values

View File

@@ -140,21 +140,6 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Conjunct expr with non-expr is invalid"))]
ConjunctExprWithNonExpr {
expr: PartitionExpr,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unclosed value {} on column {}", value, column))]
UnclosedValue {
value: String,
column: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid partition expr: {:?}", expr))]
InvalidExpr {
expr: PartitionExpr,
@@ -235,6 +220,27 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Duplicate expr: {:?}", expr))]
DuplicateExpr {
expr: PartitionExpr,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Checkpoint `{}` is not covered", checkpoint))]
CheckpointNotCovered {
checkpoint: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Checkpoint `{}` is overlapped", checkpoint))]
CheckpointOverlapped {
checkpoint: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -243,11 +249,12 @@ impl ErrorExt for Error {
Error::GetCache { .. } => StatusCode::StorageUnavailable,
Error::FindLeader { .. } => StatusCode::TableUnavailable,
Error::ConjunctExprWithNonExpr { .. }
| Error::UnclosedValue { .. }
| Error::InvalidExpr { .. }
Error::InvalidExpr { .. }
| Error::NoExprOperand { .. }
| Error::UndefinedColumn { .. } => StatusCode::InvalidArguments,
| Error::UndefinedColumn { .. }
| Error::DuplicateExpr { .. }
| Error::CheckpointNotCovered { .. }
| Error::CheckpointOverlapped { .. } => StatusCode::InvalidArguments,
Error::RegionKeysSize { .. }
| Error::InvalidInsertRequest { .. }

View File

@@ -16,6 +16,7 @@
#![feature(let_chains)]
//! Structs and traits for partitioning rule.
pub mod checker;
pub mod collider;
pub mod error;
pub mod expr;

View File

@@ -30,10 +30,8 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use crate::error::{
self, ConjunctExprWithNonExprSnafu, InvalidExprSnafu, Result, UnclosedValueSnafu,
UndefinedColumnSnafu,
};
use crate::checker::PartitionChecker;
use crate::error::{self, Result, UndefinedColumnSnafu};
use crate::expr::{Operand, PartitionExpr, RestrictedOp};
use crate::partition::RegionMask;
use crate::PartitionRule;
@@ -84,12 +82,16 @@ impl MultiDimPartitionRule {
physical_expr_cache: RwLock::new(None),
};
let mut checker = RuleChecker::new(&rule);
let checker = PartitionChecker::try_new(&rule)?;
checker.check()?;
Ok(rule)
}
pub fn exprs(&self) -> &[PartitionExpr] {
&self.exprs
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
ensure!(
values.len() == self.partition_columns.len(),
@@ -167,7 +169,7 @@ impl MultiDimPartitionRule {
.map(|col_name| {
record_batch
.column_by_name(col_name)
.context(error::UndefinedColumnSnafu { column: col_name })
.context(UndefinedColumnSnafu { column: col_name })
.and_then(|array| {
Helper::try_into_vector(array).context(error::ConvertToVectorSnafu)
})
@@ -341,142 +343,13 @@ impl PartitionRule for MultiDimPartitionRule {
}
}
/// Helper for [RuleChecker]
type Axis = HashMap<Value, SplitPoint>;
/// Helper for [RuleChecker]
struct SplitPoint {
is_equal: bool,
less_than_counter: isize,
}
/// Check if the rule set covers all the possible values.
///
/// Note this checker have false-negative on duplicated exprs. E.g.:
/// `a != 20`, `a <= 20` and `a > 20`.
///
/// It works on the observation that each projected split point should be included (`is_equal`)
/// and have a balanced `<` and `>` counter.
struct RuleChecker<'a> {
axis: Vec<Axis>,
rule: &'a MultiDimPartitionRule,
}
impl<'a> RuleChecker<'a> {
pub fn new(rule: &'a MultiDimPartitionRule) -> Self {
let mut projections = Vec::with_capacity(rule.partition_columns.len());
projections.resize_with(rule.partition_columns.len(), Default::default);
Self {
axis: projections,
rule,
}
}
pub fn check(&mut self) -> Result<()> {
for expr in &self.rule.exprs {
self.walk_expr(expr)?
}
self.check_axis()
}
#[allow(clippy::mutable_key_type)]
fn walk_expr(&mut self, expr: &PartitionExpr) -> Result<()> {
// recursively check the expr
match expr.op {
RestrictedOp::And | RestrictedOp::Or => {
match (expr.lhs.as_ref(), expr.rhs.as_ref()) {
(Operand::Expr(lhs), Operand::Expr(rhs)) => {
self.walk_expr(lhs)?;
self.walk_expr(rhs)?
}
_ => ConjunctExprWithNonExprSnafu { expr: expr.clone() }.fail()?,
}
return Ok(());
}
// Not conjunction
_ => {}
}
let (col, val) = match (expr.lhs.as_ref(), expr.rhs.as_ref()) {
(Operand::Expr(_), _)
| (_, Operand::Expr(_))
| (Operand::Column(_), Operand::Column(_))
| (Operand::Value(_), Operand::Value(_)) => {
InvalidExprSnafu { expr: expr.clone() }.fail()?
}
(Operand::Column(col), Operand::Value(val))
| (Operand::Value(val), Operand::Column(col)) => (col, val),
};
let col_index =
*self
.rule
.name_to_index
.get(col)
.with_context(|| UndefinedColumnSnafu {
column: col.clone(),
})?;
let axis = &mut self.axis[col_index];
let split_point = axis.entry(val.clone()).or_insert(SplitPoint {
is_equal: false,
less_than_counter: 0,
});
match expr.op {
RestrictedOp::Eq => {
split_point.is_equal = true;
}
RestrictedOp::NotEq => {
// less_than +1 -1
}
RestrictedOp::Lt => {
split_point.less_than_counter += 1;
}
RestrictedOp::LtEq => {
split_point.less_than_counter += 1;
split_point.is_equal = true;
}
RestrictedOp::Gt => {
split_point.less_than_counter -= 1;
}
RestrictedOp::GtEq => {
split_point.less_than_counter -= 1;
split_point.is_equal = true;
}
RestrictedOp::And | RestrictedOp::Or => {
unreachable!("conjunct expr should be handled above")
}
}
Ok(())
}
/// Return if the rule is legal.
fn check_axis(&self) -> Result<()> {
for (col_index, axis) in self.axis.iter().enumerate() {
for (val, split_point) in axis {
if split_point.less_than_counter != 0 || !split_point.is_equal {
UnclosedValueSnafu {
value: format!("{val:?}"),
column: self.rule.partition_columns[col_index].clone(),
}
.fail()?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::error::{self, Error};
use crate::expr::col;
#[test]
fn test_find_region() {
@@ -582,7 +455,7 @@ mod tests {
);
// check rule
assert_matches!(rule.unwrap_err(), Error::ConjunctExprWithNonExpr { .. });
assert_matches!(rule.unwrap_err(), Error::InvalidExpr { .. });
}
/// ```ignore
@@ -617,7 +490,7 @@ mod tests {
);
// check rule
assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. });
assert_matches!(rule.unwrap_err(), Error::CheckpointNotCovered { .. });
}
/// ```
@@ -760,7 +633,7 @@ mod tests {
);
// check rule
assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. });
assert_matches!(rule.unwrap_err(), Error::CheckpointNotCovered { .. });
}
#[test]
@@ -787,11 +660,10 @@ mod tests {
);
// check rule
assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. });
assert_matches!(rule.unwrap_err(), Error::CheckpointOverlapped { .. });
}
#[test]
#[ignore = "checker cannot detect this kind of duplicate for now"]
fn duplicate_expr_case_2() {
// PARTITION ON COLUMNS (a) (
// a != 20,
@@ -821,7 +693,35 @@ mod tests {
);
// check rule
assert!(rule.is_err());
assert_matches!(rule.unwrap_err(), Error::CheckpointOverlapped { .. });
}
/// ```ignore
/// value
/// │
/// │
/// value=10 --------------------│
/// │
/// ────────────────────────────────┼──► host
/// │
/// host=server10
/// ```
#[test]
fn test_partial_divided() {
let _rule = MultiDimPartitionRule::try_new(
vec!["host".to_string(), "value".to_string()],
vec![0, 1, 2, 3],
vec![
col("host")
.lt(Value::String("server10".into()))
.and(col("value").lt(Value::Int64(10))),
col("host")
.lt(Value::String("server10".into()))
.and(col("value").gt_eq(Value::Int64(10))),
col("host").gt_eq(Value::String("server10".into())),
],
)
.unwrap();
}
}
@@ -892,11 +792,10 @@ mod test_split_record_batch {
let rule = MultiDimPartitionRule::try_new(
vec!["host".to_string()],
vec![1],
vec![PartitionExpr::new(
Operand::Column("host".to_string()),
RestrictedOp::Eq,
Operand::Value(Value::String("server1".into())),
)],
vec![
col("host").lt(Value::String("server1".into())),
col("host").gt_eq(Value::String("server1".into())),
],
)
.unwrap();
@@ -941,118 +840,6 @@ mod test_split_record_batch {
}
}
#[test]
fn test_default_region() {
let rule = MultiDimPartitionRule::try_new(
vec!["host".to_string(), "value".to_string()],
vec![0, 1, 2, 3],
vec![
col("host")
.lt(Value::String("server10".into()))
.and(col("value").eq(Value::Int64(10))),
col("host")
.lt(Value::String("server10".into()))
.and(col("value").eq(Value::Int64(20))),
col("host")
.gt_eq(Value::String("server10".into()))
.and(col("value").eq(Value::Int64(10))),
col("host")
.gt_eq(Value::String("server10".into()))
.and(col("value").eq(Value::Int64(20))),
],
)
.unwrap();
let schema = test_schema();
let host_array = StringArray::from(vec!["server1", "server1", "server1", "server100"]);
let value_array = Int64Array::from(vec![10, 20, 30, 10]);
let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)])
.unwrap();
let result = rule.split_record_batch(&batch).unwrap();
let expected = rule.split_record_batch_naive(&batch).unwrap();
for (region, value) in &result {
assert_eq!(value.array(), expected.get(region).unwrap());
}
}
#[test]
fn test_default_region_with_unselected_rows() {
// Create a rule where some rows won't match any partition
let rule = MultiDimPartitionRule::try_new(
vec!["host".to_string(), "value".to_string()],
vec![1, 2, 3],
vec![
col("value").eq(Value::Int64(10)),
col("value").eq(Value::Int64(20)),
col("value").eq(Value::Int64(30)),
],
)
.unwrap();
let schema = test_schema();
let host_array =
StringArray::from(vec!["server1", "server2", "server3", "server4", "server5"]);
let value_array = Int64Array::from(vec![10, 20, 30, 40, 50]);
let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)])
.unwrap();
let result = rule.split_record_batch(&batch).unwrap();
// Check that we have 4 regions (3 defined + default)
assert_eq!(result.len(), 4);
// Check that default region (0) contains the unselected rows
assert!(result.contains_key(&DEFAULT_REGION));
let default_mask = result.get(&DEFAULT_REGION).unwrap();
// The default region should have 2 rows (with values 40 and 50)
assert_eq!(default_mask.selected_rows(), 2);
// Verify each region has the correct number of rows
assert_eq!(result.get(&1).unwrap().selected_rows(), 1); // value = 10
assert_eq!(result.get(&2).unwrap().selected_rows(), 1); // value = 20
assert_eq!(result.get(&3).unwrap().selected_rows(), 1); // value = 30
}
#[test]
fn test_default_region_with_existing_default() {
// Create a rule where some rows are explicitly assigned to default region
// and some rows are implicitly assigned to default region
let rule = MultiDimPartitionRule::try_new(
vec!["host".to_string(), "value".to_string()],
vec![0, 1, 2],
vec![
col("value").eq(Value::Int64(10)), // Explicitly assign value=10 to region 0 (default)
col("value").eq(Value::Int64(20)),
col("value").eq(Value::Int64(30)),
],
)
.unwrap();
let schema = test_schema();
let host_array =
StringArray::from(vec!["server1", "server2", "server3", "server4", "server5"]);
let value_array = Int64Array::from(vec![10, 20, 30, 40, 50]);
let batch = RecordBatch::try_new(schema, vec![Arc::new(host_array), Arc::new(value_array)])
.unwrap();
let result = rule.split_record_batch(&batch).unwrap();
// Check that we have 3 regions
assert_eq!(result.len(), 3);
// Check that default region contains both explicitly assigned and unselected rows
assert!(result.contains_key(&DEFAULT_REGION));
let default_mask = result.get(&DEFAULT_REGION).unwrap();
// The default region should have 3 rows (value=10, 40, 50)
assert_eq!(default_mask.selected_rows(), 3);
// Verify each region has the correct number of rows
assert_eq!(result.get(&1).unwrap().selected_rows(), 1); // value = 20
assert_eq!(result.get(&2).unwrap().selected_rows(), 1); // value = 30
}
#[test]
fn test_all_rows_selected() {
// Test the fast path where all rows are selected by some partition

View File

@@ -171,7 +171,7 @@ DROP TABLE my_table;
Affected Rows: 0
-- incorrect partition rule
-- incorrect partition rules
CREATE TABLE invalid_rule (
a INT PRIMARY KEY,
b STRING,
@@ -183,7 +183,52 @@ PARTITION ON COLUMNS (a) (
a >= 20
);
Error: 1004(InvalidArguments), Unclosed value Int32(10) on column a
Error: 1004(InvalidArguments), Checkpoint `a=10` is not covered
CREATE TABLE invalid_rule2 (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (b) (
b < 'abc',
b >= 'abca' AND b < 'o',
b >= 'o',
);
Error: 1004(InvalidArguments), Checkpoint `b=abc` is not covered
CREATE TABLE invalid_rule3 (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (b) (
b >= 'a',
b <= 'o',
);
Error: 1004(InvalidArguments), Checkpoint `b=a` is overlapped
CREATE TABLE valid_rule (
a INT,
b STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (a, b)
)
PARTITION ON COLUMNS (a, b) (
a < 10,
a = 10 AND b < 'a',
a = 10 AND b >= 'a' AND b < 'o',
a = 10 AND b >= 'o',
a > 10,
);
Affected Rows: 0
DROP TABLE valid_rule;
Affected Rows: 0
-- Issue https://github.com/GreptimeTeam/greptimedb/issues/4247
-- Partition rule with unary operator

View File

@@ -74,7 +74,7 @@ SELECT * FROM my_table;
DROP TABLE my_table;
-- incorrect partition rule
-- incorrect partition rules
CREATE TABLE invalid_rule (
a INT PRIMARY KEY,
b STRING,
@@ -86,6 +86,43 @@ PARTITION ON COLUMNS (a) (
a >= 20
);
CREATE TABLE invalid_rule2 (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (b) (
b < 'abc',
b >= 'abca' AND b < 'o',
b >= 'o',
);
CREATE TABLE invalid_rule3 (
a INT,
b STRING PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (b) (
b >= 'a',
b <= 'o',
);
CREATE TABLE valid_rule (
a INT,
b STRING,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY (a, b)
)
PARTITION ON COLUMNS (a, b) (
a < 10,
a = 10 AND b < 'a',
a = 10 AND b >= 'a' AND b < 'o',
a = 10 AND b >= 'o',
a > 10,
);
DROP TABLE valid_rule;
-- Issue https://github.com/GreptimeTeam/greptimedb/issues/4247
-- Partition rule with unary operator
CREATE TABLE `molestiAe` (