feat: range partition rule (#304)

* feat: range partitioning rule

Co-authored-by: luofucong <luofucong@greptime.com>
This commit is contained in:
LFC
2022-11-01 16:09:23 +08:00
committed by GitHub
parent dacfd12b8f
commit ea2ebc0e87
8 changed files with 252 additions and 22 deletions

2
Cargo.lock generated
View File

@@ -1809,6 +1809,7 @@ dependencies = [
"common-time",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datanode",
"datatypes",
"futures",
@@ -1818,6 +1819,7 @@ dependencies = [
"servers",
"snafu",
"sql",
"store-api",
"table",
"tempdir",
"tokio",

View File

@@ -15,6 +15,7 @@ common-recordbatch = { path = "../common/recordbatch" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
openmetrics-parser = "0.4"
prost = "0.11"
@@ -22,6 +23,7 @@ serde = "1.0"
servers = { path = "../servers" }
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }

View File

@@ -100,6 +100,13 @@ pub enum Error {
reason: String,
backtrace: Backtrace,
},
#[snafu(display("Expect {} region keys, actual {}", expect, actual))]
RegionKeysSize {
expect: usize,
actual: usize,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -112,7 +119,8 @@ impl ErrorExt for Error {
| Error::InvalidSql { .. }
| Error::FindRegion { .. }
| Error::InvalidInsertRequest { .. }
| Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments,
| Error::FindPartitionColumn { .. }
| Error::RegionKeysSize { .. } => StatusCode::InvalidArguments,
Error::RuntimeResource { source, .. } => source.status_code(),
Error::StartServer { source, .. } => source.status_code(),
Error::ParseSql { source } => source.status_code(),

View File

@@ -6,7 +6,7 @@ pub mod influxdb;
pub mod instance;
pub mod mysql;
pub mod opentsdb;
pub mod partition;
pub mod partitioning;
pub mod postgres;
pub mod prometheus;
mod server;

View File

@@ -1,13 +0,0 @@
use std::fmt::Debug;
use crate::spliter::{ColumnName, RegionId, ValueList};
pub trait PartitionRule {
type Error: Debug;
fn partition_columns(&self) -> Vec<ColumnName>;
fn find_region(&self, values: &ValueList) -> std::result::Result<RegionId, Self::Error>;
// TODO(fys): there maybe other method
}

View File

@@ -0,0 +1,34 @@
mod range;
use std::fmt::Debug;
pub use datafusion_expr::Operator;
use datatypes::prelude::Value;
use store_api::storage::RegionId;
pub(crate) type ValueList = Vec<Value>;
pub trait PartitionRule {
type Error: Debug;
fn partition_columns(&self) -> Vec<String>;
// TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop.
// Or find better names since one is mainly for writes and the other is for reads.
fn find_region(&self, values: &ValueList) -> Result<RegionId, Self::Error>;
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error>;
}
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionExpr {
column: String,
op: Operator,
value: Value,
}
impl PartitionExpr {
pub fn value(&self) -> &Value {
&self.value
}
}

View File

@@ -0,0 +1,195 @@
use datatypes::prelude::*;
use snafu::OptionExt;
use crate::error::{self, Error};
use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId, ValueList};
/// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value
/// range. It's generated from create table request, using MySQL's syntax:
///
/// ```SQL
/// CREATE TABLE table_name (
/// columns definition
/// )
/// PARTITION BY RANGE (column_name) (
/// PARTITION partition_name VALUES LESS THAN (value)[,
/// PARTITION partition_name VALUES LESS THAN (value)][,
/// ...]
/// )
/// ```
///
/// Please refer to MySQL's ["RANGE Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-range.html)
/// document for more details.
///
/// Some partition related validations like:
/// - the column used in partitioning must be defined in the create table request
/// - partition name must be unique
/// - range bounds(the "value"s) must be strictly increased
/// - the last partition range must be bounded by "MAXVALUE"
/// are all been done in the create table SQL parsing stage. So we can safely skip some checks on the
/// input arguments.
///
/// # Important Notes on Partition and Region
///
/// Technically, table "partition" is a concept of data sharding logically, i.e., how table's data are
/// distributed in logic. And "region" is of how data been placed physically. They should be used
/// in different ways.
///
/// However, currently we have only one region for each partition. For the sake of simplicity, the
/// terms "partition" and "region" are used interchangeably.
///
// TODO(LFC): Further clarify "partition" and "region".
// Could be creating an extra layer between partition and region.
struct RangePartitionRule {
column_name: String,
// Does not store the last "MAXVALUE" bound; because in this way our binary search in finding
// partitions are easier (besides, it's hard to represent "MAXVALUE" in our `Value`).
// Then the length of `bounds` is one less than `regions`.
bounds: Vec<Value>,
regions: Vec<RegionId>,
}
impl RangePartitionRule {
fn column_name(&self) -> &String {
&self.column_name
}
fn all_regions(&self) -> &Vec<RegionId> {
&self.regions
}
}
impl PartitionRule for RangePartitionRule {
type Error = Error;
fn partition_columns(&self) -> Vec<String> {
vec![self.column_name().to_string()]
}
fn find_region(&self, _values: &ValueList) -> Result<RegionId, Self::Error> {
unimplemented!()
}
fn find_regions(&self, exprs: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
debug_assert_eq!(
exprs.len(),
1,
"RangePartitionRule can only handle one partition expr, actual {}",
exprs.len()
);
let PartitionExpr { column, op, value } =
exprs.first().context(error::FindRegionSnafu {
reason: "no partition expr is provided",
})?;
let regions = if column == self.column_name() {
// an example of bounds and regions:
// SQL:
// PARTITION p1 VALUES LESS THAN (10),
// PARTITION p2 VALUES LESS THAN (20),
// PARTITION p3 VALUES LESS THAN (50),
// PARTITION p4 VALUES LESS THAN (MAXVALUE),
// bounds: [10, 20, 50]
// regions: [1, 2, 3, 4]
match self.bounds.binary_search(value) {
Ok(i) => match op {
Operator::Lt => &self.regions[..=i],
Operator::LtEq => &self.regions[..=(i + 1)],
Operator::Eq => &self.regions[(i + 1)..=(i + 1)],
Operator::Gt | Operator::GtEq => &self.regions[(i + 1)..],
Operator::NotEq => &self.regions[..],
_ => unimplemented!(),
},
Err(i) => match *op {
Operator::Lt | Operator::LtEq => &self.regions[..=i],
Operator::Eq => &self.regions[i..=i],
Operator::Gt | Operator::GtEq => &self.regions[i..],
Operator::NotEq => &self.regions[..],
_ => unimplemented!(),
},
}
.to_vec()
} else {
self.all_regions().clone()
};
Ok(regions)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_find_regions() {
// PARTITION BY RANGE (a) (
// PARTITION p1 VALUES LESS THAN ('hz'),
// PARTITION p2 VALUES LESS THAN ('sh'),
// PARTITION p3 VALUES LESS THAN ('sz'),
// PARTITION p4 VALUES LESS THAN (MAXVALUE),
// )
let rule = RangePartitionRule {
column_name: "a".to_string(),
bounds: vec!["hz".into(), "sh".into(), "sz".into()],
regions: vec![1, 2, 3, 4],
};
let test = |column: &str, op: Operator, value: &str, expected_regions: Vec<u64>| {
let expr = PartitionExpr {
column: column.to_string(),
op,
value: value.into(),
};
let regions = rule.find_regions(&[expr]).unwrap();
assert_eq!(
regions,
expected_regions.into_iter().collect::<Vec<RegionId>>()
);
};
test("a", Operator::NotEq, "hz", vec![1, 2, 3, 4]);
test("a", Operator::NotEq, "what", vec![1, 2, 3, 4]);
test("a", Operator::GtEq, "ab", vec![1, 2, 3, 4]);
test("a", Operator::GtEq, "hz", vec![2, 3, 4]);
test("a", Operator::GtEq, "ijk", vec![2, 3, 4]);
test("a", Operator::GtEq, "sh", vec![3, 4]);
test("a", Operator::GtEq, "ssh", vec![3, 4]);
test("a", Operator::GtEq, "sz", vec![4]);
test("a", Operator::GtEq, "zz", vec![4]);
test("a", Operator::Gt, "ab", vec![1, 2, 3, 4]);
test("a", Operator::Gt, "hz", vec![2, 3, 4]);
test("a", Operator::Gt, "ijk", vec![2, 3, 4]);
test("a", Operator::Gt, "sh", vec![3, 4]);
test("a", Operator::Gt, "ssh", vec![3, 4]);
test("a", Operator::Gt, "sz", vec![4]);
test("a", Operator::Gt, "zz", vec![4]);
test("a", Operator::Eq, "ab", vec![1]);
test("a", Operator::Eq, "hz", vec![2]);
test("a", Operator::Eq, "ijk", vec![2]);
test("a", Operator::Eq, "sh", vec![3]);
test("a", Operator::Eq, "ssh", vec![3]);
test("a", Operator::Eq, "sz", vec![4]);
test("a", Operator::Eq, "zz", vec![4]);
test("a", Operator::Lt, "ab", vec![1]);
test("a", Operator::Lt, "hz", vec![1]);
test("a", Operator::Lt, "ijk", vec![1, 2]);
test("a", Operator::Lt, "sh", vec![1, 2]);
test("a", Operator::Lt, "ssh", vec![1, 2, 3]);
test("a", Operator::Lt, "sz", vec![1, 2, 3]);
test("a", Operator::Lt, "zz", vec![1, 2, 3, 4]);
test("a", Operator::LtEq, "ab", vec![1]);
test("a", Operator::LtEq, "hz", vec![1, 2]);
test("a", Operator::LtEq, "ijk", vec![1, 2]);
test("a", Operator::LtEq, "sh", vec![1, 2, 3]);
test("a", Operator::LtEq, "ssh", vec![1, 2, 3]);
test("a", Operator::LtEq, "sz", vec![1, 2, 3, 4]);
test("a", Operator::LtEq, "zz", vec![1, 2, 3, 4]);
test("b", Operator::Lt, "1", vec![1, 2, 3, 4]);
}
}

View File

@@ -1,21 +1,18 @@
use std::collections::HashMap;
use datatypes::value::Value;
use datatypes::vectors::VectorBuilder;
use datatypes::vectors::VectorRef;
use snafu::ensure;
use snafu::OptionExt;
use store_api::storage::RegionId;
use table::requests::InsertRequest;
use crate::error::FindPartitionColumnSnafu;
use crate::error::FindRegionSnafu;
use crate::error::InvalidInsertRequestSnafu;
use crate::error::Result;
use crate::partition::PartitionRule;
use crate::partitioning::{PartitionRule, ValueList};
pub type RegionId = u64;
pub type ColumnName = String;
pub type ValueList = Vec<Value>;
pub type DistInsertRequest = HashMap<RegionId, InsertRequest>;
pub struct WriteSpliter<'a, P> {
@@ -170,8 +167,9 @@ mod tests {
use super::{
check_req, find_partitioning_values, partition_insert_request, partition_values,
ColumnName, PartitionRule, RegionId, WriteSpliter,
PartitionRule, RegionId, WriteSpliter,
};
use crate::partitioning::PartitionExpr;
#[test]
fn test_insert_req_check() {
@@ -409,7 +407,7 @@ mod tests {
impl PartitionRule for MockPartitionRule {
type Error = String;
fn partition_columns(&self) -> Vec<ColumnName> {
fn partition_columns(&self) -> Vec<String> {
vec!["id".to_string()]
}
@@ -426,5 +424,9 @@ mod tests {
}
unreachable!()
}
fn find_regions(&self, _: &[PartitionExpr]) -> Result<Vec<RegionId>, Self::Error> {
unimplemented!()
}
}
}