feat: range columns partitioning rule (#374)

* feat: parse partition syntax in "create table"

* feat: partition rule

* fix: rebase develop

* feat: range partitioning rule

* fix: resolve PR comments

* feat: range columns partitioning rule

* refactor: remove unused codes

* fix: resolve PR comments

* fix: resolve PR comments

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-11-02 22:36:32 +08:00
committed by GitHub
parent 6f1f697bfc
commit 5abff7a536
4 changed files with 415 additions and 9 deletions

View File

@@ -1,3 +1,4 @@
mod columns;
mod range;
use std::fmt::Debug;
@@ -6,8 +7,6 @@ pub use datafusion_expr::Operator;
use datatypes::prelude::Value;
use store_api::storage::RegionId;
pub(crate) type ValueList = Vec<Value>;
pub trait PartitionRule {
type Error: Debug;
@@ -15,11 +14,20 @@ pub trait PartitionRule {
// TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop.
// Or find better names since one is mainly for writes and the other is for reads.
fn find_region(&self, values: &ValueList) -> Result<RegionId, Self::Error>;
fn find_region(&self, values: &[Value]) -> Result<RegionId, Self::Error>;
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error>;
}
/// The right (exclusive) bound of a partition range.
///
/// The ordering is derived, so it follows the variant declaration order: every `Value(_)`
/// bound sorts before `MaxValue`, and two `Value` bounds compare by their inner [Value].
/// Binary searches over lists of bounds rely on this, so do not reorder the variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum PartitionBound {
// A concrete upper bound.
Value(Value),
// The "MAXVALUE" bound, greater than any concrete value.
// FIXME(LFC): no allow, for clippy temporarily
#[allow(dead_code)]
MaxValue,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionExpr {
column: String,
@@ -32,3 +40,17 @@ impl PartitionExpr {
&self.value
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Concrete bounds are ordered by their values, and `MaxValue` is greater than any of them.
    #[test]
    fn test_partition_bound() {
        let small = PartitionBound::Value(1_i32.into());
        let large = PartitionBound::Value(100_i32.into());
        let unbounded = PartitionBound::MaxValue;

        assert!(small < large);
        assert!(large < unbounded);
    }
}

View File

@@ -0,0 +1,382 @@
use datafusion_expr::Operator;
use datatypes::value::Value;
use snafu::ensure;
use crate::error::{self, Error};
use crate::partitioning::{PartitionBound, PartitionExpr, PartitionRule, RegionId};
/// A [RangeColumnsPartitionRule] is very similar to [RangePartitionRule] except that it allows
/// partitioning by multiple columns.
///
/// This rule is generated from a create table request, using MySQL's syntax:
///
/// ```SQL
/// CREATE TABLE table_name (
/// columns definition
/// )
/// PARTITION BY RANGE COLUMNS(column_list) (
/// PARTITION region_name VALUES LESS THAN (value_list)[,
/// PARTITION region_name VALUES LESS THAN (value_list)][,
/// ...]
/// )
///
/// column_list:
/// column_name[, column_name][, ...]
///
/// value_list:
/// value[, value][, ...]
/// ```
///
/// Please refer to MySQL's ["RANGE COLUMNS Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html)
/// document for more details.
struct RangeColumnsPartitionRule {
// Names of the partitioning columns, in the order they appear in `RANGE COLUMNS(...)`.
column_list: Vec<String>,
// One exclusive upper bound tuple per partition ("VALUES LESS THAN (...)"), in ascending order.
value_lists: Vec<Vec<PartitionBound>>,
// Region of each partition; `regions[i]` is bounded by `value_lists[i]`.
regions: Vec<RegionId>,
// TODO(LFC): Implement finding regions by all partitioning columns, not by the first one only.
// The first partitioning column's bounds are singled out for finding regions by range.
//
// Theoretically, finding regions in `value_lists` should use all the partitioning columns' values
// as a whole in the comparison (think of how Rust's vectors are compared to each other). And
// this is how we do it if provided with concrete values (see `find_region` method).
//
// However, when we need to find regions by range, for example, a filter of "x < 100" defined
// in SQL, currently I'm not quite sure how that could be implemented, especially when facing a
// complex filter expression like "a < 1 AND (b > 2 OR c != 3)".
//
// So I decided to use the first partitioning column temporarily in finding regions by range,
// and further investigate how MySQL (and others) implemented this feature in detail.
//
// Finding regions using only the first partitioning column is intended to be safe: it may return
// regions that actually do not contain the range's value (causing unnecessary table scans), but
// it must never omit a region that should have been scanned.
//
// The following two fields act as caches, so we don't need to recalculate them every time.
//
// Distinct first-column bounds, in ascending order.
first_column_bounds: Vec<PartitionBound>,
// `first_column_regions[i]` holds the regions of all partitions whose first-column bound is
// `first_column_bounds[i]`.
first_column_regions: Vec<Vec<RegionId>>,
}
impl RangeColumnsPartitionRule {
    /// Creates a rule from the partitioning columns, the per-partition bound tuples and the
    /// per-partition regions (`regions[i]` is the region bounded by `value_lists[i]`).
    ///
    /// Besides storing the arguments, consecutive partitions sharing the same first-column
    /// bound are grouped together, producing the `first_column_bounds` /
    /// `first_column_regions` caches used when finding regions by range.
    ///
    /// # Panics
    ///
    /// Panics if `value_lists` or any of its bound tuples is empty, or if `regions` is
    /// shorter than `value_lists`.
    // It's assured that input arguments are valid because they are checked in SQL parsing stage.
    // So we can skip validating them.
    // FIXME(LFC): no allow, for clippy temporarily
    #[allow(dead_code)]
    fn new(
        column_list: Vec<String>,
        value_lists: Vec<Vec<PartitionBound>>,
        regions: Vec<RegionId>,
    ) -> Self {
        // Seed both caches with the first partition.
        let mut bounds = vec![value_lists[0][0].clone()];
        let mut grouped = vec![vec![regions[0]]];

        for (i, value_list) in value_lists.iter().enumerate().skip(1) {
            let bound = &value_list[0];
            let region = regions[i];

            if bounds.last() == Some(bound) {
                // Same first-column bound as the previous partition: extend its group.
                let last = grouped.len() - 1;
                grouped[last].push(region);
            } else {
                // A new first-column bound starts a new group.
                bounds.push(bound.clone());
                grouped.push(vec![region]);
            }
        }

        Self {
            column_list,
            value_lists,
            regions,
            first_column_bounds: bounds,
            first_column_regions: grouped,
        }
    }
}
impl PartitionRule for RangeColumnsPartitionRule {
    type Error = Error;

    /// Returns the names of the partitioning columns, in declaration order.
    fn partition_columns(&self) -> Vec<String> {
        self.column_list.clone()
    }

    /// Finds the region a row belongs to, given the row's values of all partitioning columns
    /// (in the same order as the partitioning columns).
    ///
    /// # Errors
    ///
    /// Returns a `RegionKeysSize` error if the number of `values` differs from the number of
    /// partitioning columns.
    ///
    /// # Panics
    ///
    /// Panics if `values` is not less than the last partition's bounds, which cannot happen
    /// when the last partition is bounded by MAXVALUE.
    fn find_region(&self, values: &[Value]) -> Result<RegionId, Self::Error> {
        ensure!(
            values.len() == self.column_list.len(),
            error::RegionKeysSizeSnafu {
                expect: self.column_list.len(),
                actual: values.len(),
            }
        );

        // How tuple is compared:
        //   (a, b) < (x, y) <=> (a < x) || ((a == x) && (b < y))
        // which is exactly how `Vec<PartitionBound>`s are compared to each other.
        let values = values
            .iter()
            .map(|v| PartitionBound::Value(v.clone()))
            .collect::<Vec<PartitionBound>>();

        // Bounds are exclusive, so a row that equals the bounds of partition `i` belongs to
        // partition `i + 1`; otherwise the insertion point is the partition itself.
        let idx = match self.value_lists.binary_search(&values) {
            Ok(i) => i + 1,
            Err(i) => i,
        };
        Ok(self.regions[idx])
    }

    /// Finds the regions that may contain rows matching `exprs`.
    ///
    /// Only the (first) expression on the first partitioning column is used for pruning, so
    /// the result may contain regions that hold no matching row, but never misses a region
    /// that does. All regions are returned if `exprs` references a non-partitioning column,
    /// or contains no expression on the first partitioning column (including being empty).
    ///
    /// # Panics
    ///
    /// Panics if the expression's operator is not one of `<`, `<=`, `=`, `>`, `>=` or `!=`.
    fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
        let only_partition_columns = exprs.iter().all(|x| self.column_list.contains(&x.column));

        // That all columns in "exprs" are partitioning columns does NOT imply that one of them
        // is the first partitioning column, so the lookup below may legitimately find nothing.
        let first_column_expr = match self.column_list.first() {
            Some(first) if only_partition_columns => exprs.iter().find(|x| &x.column == first),
            _ => None,
        };
        let (op, value) = match first_column_expr {
            Some(PartitionExpr { op, value, .. }) => (op, value),
            // Nothing to prune by: every region might contain matching rows.
            None => return Ok(self.regions.clone()),
        };

        // an example of bounds and regions:
        // SQL:
        //   PARTITION p1 VALUES LESS THAN (10, 'c'),
        //   PARTITION p2 VALUES LESS THAN (20, 'h'),
        //   PARTITION p3 VALUES LESS THAN (20, 'm'),
        //   PARTITION p4 VALUES LESS THAN (50, 'p'),
        //   PARTITION p5 VALUES LESS THAN (MAXVALUE, 'x'),
        //   PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE),
        // bounds:  [10, 20, 50, MAXVALUE]
        // regions: [[1], [2, 3], [4], [5, 6]]
        let groups = &self.first_column_regions;
        let total = groups.len();

        // The selected groups are `groups[start..end]`.
        let (start, end) = match self
            .first_column_bounds
            .binary_search(&PartitionBound::Value(value.clone()))
        {
            Ok(i) => {
                // Where rows whose first column EQUALS bound `i` may start to live.
                //
                // With a single partitioning column such rows are never less than bound `i`,
                // so they live in the next group. With multiple columns they can still be
                // less than a bound tuple of group `i`, e.g. (20, 'a') < (20, 'h') goes to
                // p2 in the example above, so group `i` itself must be included.
                let equal_start = if self.column_list.len() == 1 {
                    i + 1
                } else {
                    i
                };
                match op {
                    Operator::Lt => (0, i + 1),
                    Operator::LtEq => (0, i + 2),
                    Operator::Eq => (equal_start, i + 2),
                    Operator::Gt => (i + 1, total),
                    Operator::GtEq => (equal_start, total),
                    Operator::NotEq => (0, total),
                    _ => unimplemented!(),
                }
            }
            Err(i) => match op {
                Operator::Lt | Operator::LtEq => (0, i + 1),
                Operator::Eq => (i, i + 1),
                Operator::Gt | Operator::GtEq => (i, total),
                Operator::NotEq => (0, total),
                _ => unimplemented!(),
            },
        };

        // Clamp the indices so that a rule whose last partition is not bounded by MAXVALUE
        // yields fewer (possibly no) regions instead of panicking on an out-of-range slice.
        let end = end.min(total);
        let start = start.min(end);

        Ok(groups[start..end]
            .iter()
            .flatten()
            .copied()
            .collect::<Vec<RegionId>>())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use super::*;

    #[test]
    fn test_find_regions() {
        // PARTITION BY RANGE COLUMNS(a, b)
        //   PARTITION p1 VALUES LESS THAN ('hz', 10),
        //   PARTITION p2 VALUES LESS THAN ('sh', 20),
        //   PARTITION p3 VALUES LESS THAN ('sh', 50),
        //   PARTITION p4 VALUES LESS THAN ('sz', 100),
        //   PARTITION p5 VALUES LESS THAN (MAXVALUE, 200),
        //   PARTITION p6 VALUES LESS THAN (MAXVALUE, MAXVALUE),
        let rule = RangeColumnsPartitionRule::new(
            vec!["a".to_string(), "b".to_string()],
            vec![
                vec![
                    PartitionBound::Value("hz".into()),
                    PartitionBound::Value(10_i32.into()),
                ],
                vec![
                    PartitionBound::Value("sh".into()),
                    PartitionBound::Value(20_i32.into()),
                ],
                vec![
                    PartitionBound::Value("sh".into()),
                    PartitionBound::Value(50_i32.into()),
                ],
                vec![
                    PartitionBound::Value("sz".into()),
                    PartitionBound::Value(100_i32.into()),
                ],
                vec![
                    PartitionBound::MaxValue,
                    PartitionBound::Value(200_i32.into()),
                ],
                vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
            ],
            vec![1, 2, 3, 4, 5, 6],
        );

        let test = |op: Operator, value: &str, expected_regions: Vec<u64>| {
            let exprs = vec![
                // Intentionally fix column b's partition expr to "b < 1". If we support finding
                // regions by both columns("a" and "b") in the future, some test cases should fail.
                PartitionExpr {
                    column: "b".to_string(),
                    op: Operator::Lt,
                    value: 1_i32.into(),
                },
                PartitionExpr {
                    column: "a".to_string(),
                    op,
                    value: value.into(),
                },
            ];
            let regions = rule.find_regions(&exprs).unwrap();
            assert_eq!(
                regions,
                expected_regions.into_iter().collect::<Vec<RegionId>>()
            );
        };

        test(Operator::NotEq, "hz", vec![1, 2, 3, 4, 5, 6]);
        test(Operator::NotEq, "what", vec![1, 2, 3, 4, 5, 6]);

        // Rows whose "a" equals a bound may still live in that bound's own partitions
        // (e.g. ('hz', 0) < ('hz', 10) lives in p1), so those partitions must be included.
        test(Operator::GtEq, "ab", vec![1, 2, 3, 4, 5, 6]);
        test(Operator::GtEq, "hz", vec![1, 2, 3, 4, 5, 6]);
        test(Operator::GtEq, "ijk", vec![2, 3, 4, 5, 6]);
        test(Operator::GtEq, "sh", vec![2, 3, 4, 5, 6]);
        test(Operator::GtEq, "ssh", vec![4, 5, 6]);
        test(Operator::GtEq, "sz", vec![4, 5, 6]);
        test(Operator::GtEq, "zz", vec![5, 6]);

        test(Operator::Gt, "ab", vec![1, 2, 3, 4, 5, 6]);
        test(Operator::Gt, "hz", vec![2, 3, 4, 5, 6]);
        test(Operator::Gt, "ijk", vec![2, 3, 4, 5, 6]);
        test(Operator::Gt, "sh", vec![4, 5, 6]);
        test(Operator::Gt, "ssh", vec![4, 5, 6]);
        test(Operator::Gt, "sz", vec![5, 6]);
        test(Operator::Gt, "zz", vec![5, 6]);

        test(Operator::Eq, "ab", vec![1]);
        test(Operator::Eq, "hz", vec![1, 2, 3]);
        test(Operator::Eq, "ijk", vec![2, 3]);
        test(Operator::Eq, "sh", vec![2, 3, 4]);
        test(Operator::Eq, "ssh", vec![4]);
        test(Operator::Eq, "sz", vec![4, 5, 6]);
        test(Operator::Eq, "zz", vec![5, 6]);

        test(Operator::Lt, "ab", vec![1]);
        test(Operator::Lt, "hz", vec![1]);
        test(Operator::Lt, "ijk", vec![1, 2, 3]);
        test(Operator::Lt, "sh", vec![1, 2, 3]);
        test(Operator::Lt, "ssh", vec![1, 2, 3, 4]);
        test(Operator::Lt, "sz", vec![1, 2, 3, 4]);
        test(Operator::Lt, "zz", vec![1, 2, 3, 4, 5, 6]);

        test(Operator::LtEq, "ab", vec![1]);
        test(Operator::LtEq, "hz", vec![1, 2, 3]);
        test(Operator::LtEq, "ijk", vec![1, 2, 3]);
        test(Operator::LtEq, "sh", vec![1, 2, 3, 4]);
        test(Operator::LtEq, "ssh", vec![1, 2, 3, 4]);
        test(Operator::LtEq, "sz", vec![1, 2, 3, 4, 5, 6]);
        test(Operator::LtEq, "zz", vec![1, 2, 3, 4, 5, 6]);

        // Reads must cover the region that writes go to: the row ('hz', 0) matches
        // "a = 'hz' AND b < 1", so its region has to be among the found ones.
        let written_region = rule.find_region(&["hz".into(), 0_i32.into()]).unwrap();
        let exprs = vec![
            PartitionExpr {
                column: "b".to_string(),
                op: Operator::Lt,
                value: 1_i32.into(),
            },
            PartitionExpr {
                column: "a".to_string(),
                op: Operator::Eq,
                value: "hz".into(),
            },
        ];
        assert!(rule
            .find_regions(&exprs)
            .unwrap()
            .contains(&written_region));

        // If trying to find regions that is not partitioning column, return all regions.
        let exprs = vec![
            PartitionExpr {
                column: "c".to_string(),
                op: Operator::Lt,
                value: 1_i32.into(),
            },
            PartitionExpr {
                column: "a".to_string(),
                op: Operator::Lt,
                value: "hz".into(),
            },
        ];
        let regions = rule.find_regions(&exprs).unwrap();
        assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]);

        // If there is no expr on the first partitioning column, return all regions
        // (instead of panicking).
        let exprs = vec![PartitionExpr {
            column: "b".to_string(),
            op: Operator::Lt,
            value: 1_i32.into(),
        }];
        let regions = rule.find_regions(&exprs).unwrap();
        assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]);

        // No exprs at all means no pruning.
        let regions = rule.find_regions(&[]).unwrap();
        assert_eq!(regions, vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_find_regions_single_column() {
        // PARTITION BY RANGE COLUMNS(a) (
        //   PARTITION r1 VALUES LESS THAN ('hz'),
        //   PARTITION r2 VALUES LESS THAN ('sh'),
        //   PARTITION r3 VALUES LESS THAN (MAXVALUE),
        // )
        let rule = RangeColumnsPartitionRule::new(
            vec!["a".to_string()],
            vec![
                vec![PartitionBound::Value("hz".into())],
                vec![PartitionBound::Value("sh".into())],
                vec![PartitionBound::MaxValue],
            ],
            vec![1, 2, 3],
        );

        let test = |op: Operator, value: &str, expected_regions: Vec<u64>| {
            let exprs = vec![PartitionExpr {
                column: "a".to_string(),
                op,
                value: value.into(),
            }];
            let regions = rule.find_regions(&exprs).unwrap();
            assert_eq!(
                regions,
                expected_regions.into_iter().collect::<Vec<RegionId>>()
            );
        };

        // With a single partitioning column, a value equal to a bound can only live in the
        // next partition, so the bound's own partition is pruned.
        test(Operator::Eq, "hz", vec![2]);
        test(Operator::GtEq, "hz", vec![2, 3]);
        test(Operator::Gt, "hz", vec![2, 3]);
        test(Operator::LtEq, "hz", vec![1, 2]);
        test(Operator::Lt, "hz", vec![1]);
        test(Operator::Eq, "ab", vec![1]);
        test(Operator::Eq, "zz", vec![3]);
    }

    #[test]
    fn test_find_region() {
        // PARTITION BY RANGE COLUMNS(a) (
        //   PARTITION r1 VALUES LESS THAN ('hz'),
        //   PARTITION r2 VALUES LESS THAN ('sh'),
        //   PARTITION r3 VALUES LESS THAN (MAXVALUE),
        // )
        let rule = RangeColumnsPartitionRule::new(
            vec!["a".to_string()],
            vec![
                vec![PartitionBound::Value("hz".into())],
                vec![PartitionBound::Value("sh".into())],
                vec![PartitionBound::MaxValue],
            ],
            vec![1, 2, 3],
        );
        assert_matches!(
            rule.find_region(&["foo".into(), 1000_i32.into()]),
            Err(error::Error::RegionKeysSize {
                expect: 1,
                actual: 2,
                ..
            })
        );
        assert_matches!(rule.find_region(&["foo".into()]), Ok(1));
        assert_matches!(rule.find_region(&["bar".into()]), Ok(1));
        assert_matches!(rule.find_region(&["hz".into()]), Ok(2));
        assert_matches!(rule.find_region(&["hzz".into()]), Ok(2));
        assert_matches!(rule.find_region(&["sh".into()]), Ok(3));
        assert_matches!(rule.find_region(&["zzzz".into()]), Ok(3));

        // PARTITION BY RANGE COLUMNS(a, b) (
        //   PARTITION r1 VALUES LESS THAN ('hz', 10),
        //   PARTITION r2 VALUES LESS THAN ('hz', 20),
        //   PARTITION r3 VALUES LESS THAN ('sh', 50),
        //   PARTITION r4 VALUES LESS THAN (MAXVALUE, MAXVALUE),
        // )
        let rule = RangeColumnsPartitionRule::new(
            vec!["a".to_string(), "b".to_string()],
            vec![
                vec![
                    PartitionBound::Value("hz".into()),
                    PartitionBound::Value(10_i32.into()),
                ],
                vec![
                    PartitionBound::Value("hz".into()),
                    PartitionBound::Value(20_i32.into()),
                ],
                vec![
                    PartitionBound::Value("sh".into()),
                    PartitionBound::Value(50_i32.into()),
                ],
                vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
            ],
            vec![1, 2, 3, 4],
        );
        assert_matches!(
            rule.find_region(&["foo".into()]),
            Err(error::Error::RegionKeysSize {
                expect: 2,
                actual: 1,
                ..
            })
        );
        assert_matches!(rule.find_region(&["foo".into(), 1_i32.into()]), Ok(1));
        assert_matches!(rule.find_region(&["bar".into(), 11_i32.into()]), Ok(1));
        assert_matches!(rule.find_region(&["hz".into(), 2_i32.into()]), Ok(1));
        assert_matches!(rule.find_region(&["hz".into(), 12_i32.into()]), Ok(2));
        assert_matches!(rule.find_region(&["hz".into(), 22_i32.into()]), Ok(3));
        assert_matches!(rule.find_region(&["hz".into(), 999_i32.into()]), Ok(3));
        assert_matches!(rule.find_region(&["hzz".into(), 1_i32.into()]), Ok(3));
        assert_matches!(rule.find_region(&["hzz".into(), 999_i32.into()]), Ok(3));
        assert_matches!(rule.find_region(&["sh".into(), 49_i32.into()]), Ok(3));
        assert_matches!(rule.find_region(&["sh".into(), 50_i32.into()]), Ok(4));
        assert_matches!(rule.find_region(&["zzz".into(), 1_i32.into()]), Ok(4));
    }
}

View File

@@ -2,7 +2,7 @@ use datatypes::prelude::*;
use snafu::OptionExt;
use crate::error::{self, Error};
use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId, ValueList};
use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId};
/// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value
/// range. It's generated from create table request, using MySQL's syntax:
@@ -26,6 +26,7 @@ use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId, Valu
/// - partition name must be unique
/// - range bounds(the "value"s) must be strictly increased
/// - the last partition range must be bounded by "MAXVALUE"
///
/// have all been done in the create table SQL parsing stage, so we can safely skip some checks on the
/// input arguments.
///
@@ -66,7 +67,7 @@ impl PartitionRule for RangePartitionRule {
vec![self.column_name().to_string()]
}
fn find_region(&self, _values: &ValueList) -> Result<RegionId, Self::Error> {
fn find_region(&self, _values: &[Value]) -> Result<RegionId, Self::Error> {
unimplemented!()
}
@@ -100,7 +101,7 @@ impl PartitionRule for RangePartitionRule {
Operator::NotEq => &self.regions[..],
_ => unimplemented!(),
},
Err(i) => match *op {
Err(i) => match op {
Operator::Lt | Operator::LtEq => &self.regions[..=i],
Operator::Eq => &self.regions[i..=i],
Operator::Gt | Operator::GtEq => &self.regions[i..],

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use datatypes::value::Value;
use datatypes::vectors::VectorBuilder;
use datatypes::vectors::VectorRef;
use snafu::ensure;
@@ -11,7 +12,7 @@ use crate::error::FindPartitionColumnSnafu;
use crate::error::FindRegionSnafu;
use crate::error::InvalidInsertRequestSnafu;
use crate::error::Result;
use crate::partitioning::{PartitionRule, ValueList};
use crate::partitioning::PartitionRule;
pub type DistInsertRequest = HashMap<RegionId, InsertRequest>;
@@ -105,7 +106,7 @@ fn find_partitioning_values(
.collect()
}
fn partition_values(partition_columns: &[VectorRef], idx: usize) -> ValueList {
fn partition_values(partition_columns: &[VectorRef], idx: usize) -> Vec<Value> {
partition_columns
.iter()
.map(|column| column.get(idx))
@@ -411,7 +412,7 @@ mod tests {
vec!["id".to_string()]
}
fn find_region(&self, values: &super::ValueList) -> Result<RegionId, Self::Error> {
fn find_region(&self, values: &[Value]) -> Result<RegionId, Self::Error> {
let val = values.get(0).unwrap().to_owned();
let id_1: Value = 1_i16.into();
let id_2: Value = 2_i16.into();