feat: check partition rule (#3711)

* feat: check partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy and fmt

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add more tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* correct test comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-04-16 16:13:49 +08:00
committed by GitHub
parent d2e081c1f9
commit 96a40e0300
4 changed files with 486 additions and 17 deletions

View File

@@ -23,6 +23,8 @@ use snafu::{Location, Snafu};
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::TableId;
use crate::expr::PartitionExpr;
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
@@ -126,14 +128,40 @@ pub enum Error {
err_msg: String,
source: common_meta::error::Error,
},
#[snafu(display("Conjunct expr with non-expr is invalid"))]
ConjunctExprWithNonExpr {
expr: PartitionExpr,
location: Location,
},
#[snafu(display("Unclosed value {} on column {}", value, column))]
UnclosedValue {
value: String,
column: String,
location: Location,
},
#[snafu(display("Invalid partition expr: {:?}", expr))]
InvalidExpr {
expr: PartitionExpr,
location: Location,
},
#[snafu(display("Undefined column: {}", column))]
UndefinedColumn { column: String, location: Location },
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::GetCache { .. } | Error::FindLeader { .. } => StatusCode::StorageUnavailable,
Error::FindRegionRoutes { .. } => StatusCode::InvalidArguments,
Error::FindTableRoutes { .. } => StatusCode::InvalidArguments,
Error::FindRegionRoutes { .. }
| Error::ConjunctExprWithNonExpr { .. }
| Error::UnclosedValue { .. }
| Error::InvalidExpr { .. }
| Error::FindTableRoutes { .. }
| Error::UndefinedColumn { .. } => StatusCode::InvalidArguments,
Error::FindRegion { .. }
| Error::FindRegions { .. }
| Error::RegionKeysSize { .. }

View File

@@ -14,6 +14,8 @@
#![feature(assert_matches)]
//! Structs and traits for partitioning rule.
pub mod columns;
pub mod error;
pub mod expr;

View File

@@ -140,11 +140,9 @@ impl PartitionRuleManager {
_ => None,
})
.collect::<Vec<_>>();
Ok(Arc::new(MultiDimPartitionRule::new(
partition_columns.clone(),
regions,
exprs,
)) as _)
let rule = MultiDimPartitionRule::try_new(partition_columns.clone(), regions, exprs)?;
Ok(Arc::new(rule) as _)
}
/// Get partition rule of given table.

View File

@@ -18,40 +18,57 @@ use std::collections::HashMap;
use datatypes::prelude::Value;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use snafu::{ensure, OptionExt};
use store_api::storage::RegionNumber;
use crate::error::{self, Result};
use crate::error::{
self, ConjunctExprWithNonExprSnafu, InvalidExprSnafu, Result, UnclosedValueSnafu,
UndefinedColumnSnafu,
};
use crate::expr::{Operand, PartitionExpr, RestrictedOp};
use crate::PartitionRule;
/// Multi-Dimiension partition rule. RFC [here](https://github.com/GreptimeTeam/greptimedb/blob/main/docs/rfcs/2024-02-21-multi-dimension-partition-rule/rfc.md)
///
/// This partition rule is defined by a set of simple expressions on the partition
/// key columns. Compare to RANGE partition, which can be considered as
/// single-dimension rule, this will evaluate expression on each column separately.
#[derive(Debug, Serialize, Deserialize)]
pub struct MultiDimPartitionRule {
/// Allow list of which columns can be used for partitioning.
partition_columns: Vec<String>,
// name to index of `partition_columns`
/// Name to index of `partition_columns`. Used for quick lookup.
name_to_index: HashMap<String, usize>,
/// Region number for each partition. This list has the same length as `exprs`
/// (dispiting the default region).
regions: Vec<RegionNumber>,
/// Partition expressions.
exprs: Vec<PartitionExpr>,
}
impl MultiDimPartitionRule {
pub fn new(
pub fn try_new(
partition_columns: Vec<String>,
regions: Vec<RegionNumber>,
exprs: Vec<PartitionExpr>,
) -> Self {
) -> Result<Self> {
let name_to_index = partition_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect::<HashMap<_, _>>();
Self {
let rule = Self {
partition_columns,
name_to_index,
regions,
exprs,
}
};
let mut checker = RuleChecker::new(&rule);
checker.check(&rule)?;
Ok(rule)
}
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
@@ -140,12 +157,142 @@ impl PartitionRule for MultiDimPartitionRule {
}
}
/// Helper for [RuleChecker]
type Axis = HashMap<Value, SplitPoint>;
/// Helper for [RuleChecker]
struct SplitPoint {
is_equal: bool,
less_than_counter: isize,
}
/// Check if the rule set covers all the possible values.
///
/// Note this checker have false-negative on duplicated exprs. E.g.:
/// `a != 20`, `a <= 20` and `a > 20`.
///
/// It works on the observation that each projected split point should be included (`is_equal`)
/// and have a balanced `<` and `>` counter.
struct RuleChecker<'a> {
axis: Vec<Axis>,
rule: &'a MultiDimPartitionRule,
}
impl<'a> RuleChecker<'a> {
pub fn new(rule: &'a MultiDimPartitionRule) -> Self {
let mut projections = Vec::with_capacity(rule.partition_columns.len());
projections.resize_with(rule.partition_columns.len(), Default::default);
Self {
axis: projections,
rule,
}
}
pub fn check(&mut self, rule: &MultiDimPartitionRule) -> Result<()> {
for expr in &rule.exprs {
self.walk_expr(expr)?
}
self.check_axis()
}
#[allow(clippy::mutable_key_type)]
fn walk_expr(&mut self, expr: &PartitionExpr) -> Result<()> {
// recursively check the expr
match expr.op {
RestrictedOp::And | RestrictedOp::Or => {
match (expr.lhs.as_ref(), expr.rhs.as_ref()) {
(Operand::Expr(lhs), Operand::Expr(rhs)) => {
self.walk_expr(lhs)?;
self.walk_expr(rhs)?
}
_ => ConjunctExprWithNonExprSnafu { expr: expr.clone() }.fail()?,
}
return Ok(());
}
// Not conjunction
_ => {}
}
let (col, val) = match (expr.lhs.as_ref(), expr.rhs.as_ref()) {
(Operand::Expr(_), _)
| (_, Operand::Expr(_))
| (Operand::Column(_), Operand::Column(_))
| (Operand::Value(_), Operand::Value(_)) => {
InvalidExprSnafu { expr: expr.clone() }.fail()?
}
(Operand::Column(col), Operand::Value(val))
| (Operand::Value(val), Operand::Column(col)) => (col, val),
};
let col_index =
*self
.rule
.name_to_index
.get(col)
.with_context(|| UndefinedColumnSnafu {
column: col.clone(),
})?;
let axis = &mut self.axis[col_index];
let split_point = axis.entry(val.clone()).or_insert(SplitPoint {
is_equal: false,
less_than_counter: 0,
});
match expr.op {
RestrictedOp::Eq => {
split_point.is_equal = true;
}
RestrictedOp::NotEq => {
// less_than +1 -1
}
RestrictedOp::Lt => {
split_point.less_than_counter += 1;
}
RestrictedOp::LtEq => {
split_point.less_than_counter += 1;
split_point.is_equal = true;
}
RestrictedOp::Gt => {
split_point.less_than_counter -= 1;
}
RestrictedOp::GtEq => {
split_point.less_than_counter -= 1;
split_point.is_equal = true;
}
RestrictedOp::And | RestrictedOp::Or => {
unreachable!("conjunct expr should be handled above")
}
}
Ok(())
}
/// Return if the rule is legal.
fn check_axis(&self) -> Result<()> {
for (col_index, axis) in self.axis.iter().enumerate() {
for (val, split_point) in axis {
if split_point.less_than_counter != 0 || !split_point.is_equal {
UnclosedValueSnafu {
value: format!("{val:?}"),
column: self.rule.partition_columns[col_index].clone(),
}
.fail()?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use super::*;
use crate::error;
use crate::error::{self, Error};
#[test]
fn test_find_region() {
@@ -154,7 +301,7 @@ mod tests {
// b >= 'hz' AND b < 'sh',
// b >= 'sh'
// )
let rule = MultiDimPartitionRule::new(
let rule = MultiDimPartitionRule::try_new(
vec!["b".to_string()],
vec![1, 2, 3],
vec![
@@ -182,7 +329,8 @@ mod tests {
Operand::Value(datatypes::value::Value::String("sh".into())),
),
],
);
)
.unwrap();
assert_matches!(
rule.find_region(&["foo".into(), 1000_i32.into()]),
Err(error::Error::RegionKeysSize {
@@ -198,4 +346,297 @@ mod tests {
assert_matches!(rule.find_region(&["sh".into()]), Ok(3));
assert_matches!(rule.find_region(&["zzzz".into()]), Ok(3));
}
#[test]
fn invalid_expr_case_1() {
// PARTITION ON COLUMNS (b) (
// b <= b >= 'hz' AND b < 'sh',
// )
let rule = MultiDimPartitionRule::try_new(
vec!["a".to_string(), "b".to_string()],
vec![1],
vec![PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::String("hz".into())),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::Lt,
Operand::Value(datatypes::value::Value::String("sh".into())),
)),
)),
)],
);
// check rule
assert_matches!(rule.unwrap_err(), Error::InvalidExpr { .. });
}
#[test]
fn invalid_expr_case_2() {
// PARTITION ON COLUMNS (b) (
// b >= 'hz' AND 'sh',
// )
let rule = MultiDimPartitionRule::try_new(
vec!["a".to_string(), "b".to_string()],
vec![1],
vec![PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::String("hz".into())),
)),
RestrictedOp::And,
Operand::Value(datatypes::value::Value::String("sh".into())),
)],
);
// check rule
assert_matches!(rule.unwrap_err(), Error::ConjunctExprWithNonExpr { .. });
}
/// ```ignore
/// │ │
/// │ │
/// ─────────┼──────────┼────────────► b
/// │ │
/// │ │
/// b <= h b >= s
/// ```
#[test]
fn empty_expr_case_1() {
// PARTITION ON COLUMNS (b) (
// b <= 'h',
// b >= 's'
// )
let rule = MultiDimPartitionRule::try_new(
vec!["a".to_string(), "b".to_string()],
vec![1, 2],
vec![
PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::String("h".into())),
),
PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::String("s".into())),
),
],
);
// check rule
assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. });
}
/// ```
/// a
/// ▲
/// │ ‖
/// │ ‖
/// 200 │ ┌─────────┤
/// │ │ │
/// │ │ │
/// │ │ │
/// 100 │ ======┴─────────┘
/// │
/// └──────────────────────────►b
/// 10 20
/// ```
#[test]
fn empty_expr_case_2() {
// PARTITION ON COLUMNS (b) (
// a >= 100 AND b <= 10 OR a > 100 AND a <= 200 AND b <= 10 OR a >= 200 AND b > 10 AND b <= 20 OR a > 200 AND b <= 20
// a < 100 AND b <= 20 OR a >= 100 AND b > 20
// )
let rule = MultiDimPartitionRule::try_new(
vec!["a".to_string(), "b".to_string()],
vec![1, 2],
vec![
PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
// a >= 100 AND b <= 10
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int64(100)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(10)),
)),
)),
RestrictedOp::Or,
// a > 100 AND a <= 200 AND b <= 10
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Gt,
Operand::Value(datatypes::value::Value::Int64(100)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(200)),
)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(10)),
)),
)),
)),
RestrictedOp::Or,
// a >= 200 AND b > 10 AND b <= 20
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int64(200)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::Gt,
Operand::Value(datatypes::value::Value::Int64(10)),
)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(20)),
)),
)),
)),
RestrictedOp::Or,
// a > 200 AND b <= 20
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Gt,
Operand::Value(datatypes::value::Value::Int64(200)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(20)),
)),
)),
),
PartitionExpr::new(
// a < 100 AND b <= 20
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Lt,
Operand::Value(datatypes::value::Value::Int64(100)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(20)),
)),
)),
RestrictedOp::Or,
// a >= 100 AND b > 20
Operand::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int64(100)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int64(20)),
)),
)),
),
],
);
// check rule
assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. });
}
#[test]
fn duplicate_expr_case_1() {
// PARTITION ON COLUMNS (a) (
// a <= 20,
// a >= 10
// )
let rule = MultiDimPartitionRule::try_new(
vec!["a".to_string(), "b".to_string()],
vec![1, 2],
vec![
PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(20)),
),
PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int64(10)),
),
],
);
// check rule
assert_matches!(rule.unwrap_err(), Error::UnclosedValue { .. });
}
#[test]
#[ignore = "checker cannot detect this kind of duplicate for now"]
fn duplicate_expr_case_2() {
// PARTITION ON COLUMNS (a) (
// a != 20,
// a <= 20,
// a > 20,
// )
let rule = MultiDimPartitionRule::try_new(
vec!["a".to_string(), "b".to_string()],
vec![1, 2],
vec![
PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::NotEq,
Operand::Value(datatypes::value::Value::Int64(20)),
),
PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::LtEq,
Operand::Value(datatypes::value::Value::Int64(20)),
),
PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Gt,
Operand::Value(datatypes::value::Value::Int64(20)),
),
],
);
// check rule
assert!(rule.is_err());
}
}