From ea2ebc0e8784ec1fec6b152eeff7e0d6a3982c99 Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 1 Nov 2022 16:09:23 +0800 Subject: [PATCH] feat: range partition rule (#304) * feat: range partitioning rule Co-authored-by: luofucong --- Cargo.lock | 2 + src/frontend/Cargo.toml | 2 + src/frontend/src/error.rs | 10 +- src/frontend/src/lib.rs | 2 +- src/frontend/src/partition.rs | 13 -- src/frontend/src/partitioning.rs | 34 +++++ src/frontend/src/partitioning/range.rs | 195 +++++++++++++++++++++++++ src/frontend/src/spliter.rs | 16 +- 8 files changed, 252 insertions(+), 22 deletions(-) delete mode 100644 src/frontend/src/partition.rs create mode 100644 src/frontend/src/partitioning.rs create mode 100644 src/frontend/src/partitioning/range.rs diff --git a/Cargo.lock b/Cargo.lock index 575c657d5d..4cfbd40054 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1809,6 +1809,7 @@ dependencies = [ "common-time", "datafusion", "datafusion-common", + "datafusion-expr", "datanode", "datatypes", "futures", @@ -1818,6 +1819,7 @@ dependencies = [ "servers", "snafu", "sql", + "store-api", "table", "tempdir", "tokio", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 293533f14e..af83b960c6 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -15,6 +15,7 @@ common-recordbatch = { path = "../common/recordbatch" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } openmetrics-parser = "0.4" prost = "0.11" @@ -22,6 +23,7 @@ serde = "1.0" servers = { path = "../servers" } snafu = { version = "0.7", features = ["backtraces"] } sql = { path = "../sql" } +store-api = { path = "../store-api" } table = { path = "../table" } tokio = { version = "1.18", features = ["full"] } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 22dbdc7981..34c4a3a7f2 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -100,6 +100,13 @@ pub enum Error { reason: String, backtrace: Backtrace, }, + + #[snafu(display("Expect {} region keys, actual {}", expect, actual))] + RegionKeysSize { + expect: usize, + actual: usize, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -112,7 +119,8 @@ impl ErrorExt for Error { | Error::InvalidSql { .. } | Error::FindRegion { .. } | Error::InvalidInsertRequest { .. } - | Error::FindPartitionColumn { .. } => StatusCode::InvalidArguments, + | Error::FindPartitionColumn { .. } + | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments, Error::RuntimeResource { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), Error::ParseSql { source } => source.status_code(), diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 5dd6d1047e..85521c4475 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -6,7 +6,7 @@ pub mod influxdb; pub mod instance; pub mod mysql; pub mod opentsdb; -pub mod partition; +pub mod partitioning; pub mod postgres; pub mod prometheus; mod server; diff --git a/src/frontend/src/partition.rs b/src/frontend/src/partition.rs deleted file mode 100644 index 68da14080d..0000000000 --- a/src/frontend/src/partition.rs +++ /dev/null @@ -1,13 +0,0 @@ -use std::fmt::Debug; - -use crate::spliter::{ColumnName, RegionId, ValueList}; - -pub trait PartitionRule { - type Error: Debug; - - fn partition_columns(&self) -> Vec; - - fn find_region(&self, values: &ValueList) -> std::result::Result; - - // TODO(fys): there maybe other method -} diff --git a/src/frontend/src/partitioning.rs b/src/frontend/src/partitioning.rs new file mode 100644 index 0000000000..7a0b308146 --- /dev/null +++ b/src/frontend/src/partitioning.rs @@ -0,0 +1,34 @@ +mod range; + +use std::fmt::Debug; + +pub use datafusion_expr::Operator; +use datatypes::prelude::Value; +use store_api::storage::RegionId; + +pub(crate) type ValueList = Vec; + +pub trait PartitionRule { + type Error: Debug; + + fn partition_columns(&self) -> Vec; + + // TODO(LFC): Unify `find_region` and `find_regions` methods when distributed read and write features are both merged into develop. + // Or find better names since one is mainly for writes and the other is for reads. + fn find_region(&self, values: &ValueList) -> Result; + + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error>; +} + +#[derive(Debug, PartialEq, Eq)] +pub struct PartitionExpr { + column: String, + op: Operator, + value: Value, +} + +impl PartitionExpr { + pub fn value(&self) -> &Value { + &self.value + } +} diff --git a/src/frontend/src/partitioning/range.rs b/src/frontend/src/partitioning/range.rs new file mode 100644 index 0000000000..10fc4fa6fc --- /dev/null +++ b/src/frontend/src/partitioning/range.rs @@ -0,0 +1,195 @@ +use datatypes::prelude::*; +use snafu::OptionExt; + +use crate::error::{self, Error}; +use crate::partitioning::{Operator, PartitionExpr, PartitionRule, RegionId, ValueList}; + +/// [RangePartitionRule] manages the distribution of partitions partitioning by some column's value +/// range. It's generated from create table request, using MySQL's syntax: +/// +/// ```SQL +/// CREATE TABLE table_name ( +/// columns definition +/// ) +/// PARTITION BY RANGE (column_name) ( +/// PARTITION partition_name VALUES LESS THAN (value)[, +/// PARTITION partition_name VALUES LESS THAN (value)][, +/// ...] +/// ) +/// ``` +/// +/// Please refer to MySQL's ["RANGE Partitioning"](https://dev.mysql.com/doc/refman/8.0/en/partitioning-range.html) +/// document for more details. +/// +/// Some partition related validations like: +/// - the column used in partitioning must be defined in the create table request +/// - partition name must be unique +/// - range bounds(the "value"s) must be strictly increased +/// - the last partition range must be bounded by "MAXVALUE" +/// are all been done in the create table SQL parsing stage. So we can safely skip some checks on the +/// input arguments. +/// +/// # Important Notes on Partition and Region +/// +/// Technically, table "partition" is a concept of data sharding logically, i.e., how table's data are +/// distributed in logic. And "region" is of how data been placed physically. They should be used +/// in different ways. +/// +/// However, currently we have only one region for each partition. For the sake of simplicity, the +/// terms "partition" and "region" are used interchangeably. +/// +// TODO(LFC): Further clarify "partition" and "region". +// Could be creating an extra layer between partition and region. +struct RangePartitionRule { + column_name: String, + // Does not store the last "MAXVALUE" bound; because in this way our binary search in finding + // partitions are easier (besides, it's hard to represent "MAXVALUE" in our `Value`). + // Then the length of `bounds` is one less than `regions`. + bounds: Vec, + regions: Vec, +} + +impl RangePartitionRule { + fn column_name(&self) -> &String { + &self.column_name + } + + fn all_regions(&self) -> &Vec { + &self.regions + } +} + +impl PartitionRule for RangePartitionRule { + type Error = Error; + + fn partition_columns(&self) -> Vec { + vec![self.column_name().to_string()] + } + + fn find_region(&self, _values: &ValueList) -> Result { + unimplemented!() + } + + fn find_regions(&self, exprs: &[PartitionExpr]) -> Result, Self::Error> { + debug_assert_eq!( + exprs.len(), + 1, + "RangePartitionRule can only handle one partition expr, actual {}", + exprs.len() + ); + + let PartitionExpr { column, op, value } = + exprs.first().context(error::FindRegionSnafu { + reason: "no partition expr is provided", + })?; + let regions = if column == self.column_name() { + // an example of bounds and regions: + // SQL: + // PARTITION p1 VALUES LESS THAN (10), + // PARTITION p2 VALUES LESS THAN (20), + // PARTITION p3 VALUES LESS THAN (50), + // PARTITION p4 VALUES LESS THAN (MAXVALUE), + // bounds: [10, 20, 50] + // regions: [1, 2, 3, 4] + match self.bounds.binary_search(value) { + Ok(i) => match op { + Operator::Lt => &self.regions[..=i], + Operator::LtEq => &self.regions[..=(i + 1)], + Operator::Eq => &self.regions[(i + 1)..=(i + 1)], + Operator::Gt | Operator::GtEq => &self.regions[(i + 1)..], + Operator::NotEq => &self.regions[..], + _ => unimplemented!(), + }, + Err(i) => match *op { + Operator::Lt | Operator::LtEq => &self.regions[..=i], + Operator::Eq => &self.regions[i..=i], + Operator::Gt | Operator::GtEq => &self.regions[i..], + Operator::NotEq => &self.regions[..], + _ => unimplemented!(), + }, + } + .to_vec() + } else { + self.all_regions().clone() + }; + Ok(regions) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_find_regions() { + // PARTITION BY RANGE (a) ( + // PARTITION p1 VALUES LESS THAN ('hz'), + // PARTITION p2 VALUES LESS THAN ('sh'), + // PARTITION p3 VALUES LESS THAN ('sz'), + // PARTITION p4 VALUES LESS THAN (MAXVALUE), + // ) + let rule = RangePartitionRule { + column_name: "a".to_string(), + bounds: vec!["hz".into(), "sh".into(), "sz".into()], + regions: vec![1, 2, 3, 4], + }; + + let test = |column: &str, op: Operator, value: &str, expected_regions: Vec| { + let expr = PartitionExpr { + column: column.to_string(), + op, + value: value.into(), + }; + let regions = rule.find_regions(&[expr]).unwrap(); + assert_eq!( + regions, + expected_regions.into_iter().collect::>() + ); + }; + + test("a", Operator::NotEq, "hz", vec![1, 2, 3, 4]); + test("a", Operator::NotEq, "what", vec![1, 2, 3, 4]); + + test("a", Operator::GtEq, "ab", vec![1, 2, 3, 4]); + test("a", Operator::GtEq, "hz", vec![2, 3, 4]); + test("a", Operator::GtEq, "ijk", vec![2, 3, 4]); + test("a", Operator::GtEq, "sh", vec![3, 4]); + test("a", Operator::GtEq, "ssh", vec![3, 4]); + test("a", Operator::GtEq, "sz", vec![4]); + test("a", Operator::GtEq, "zz", vec![4]); + + test("a", Operator::Gt, "ab", vec![1, 2, 3, 4]); + test("a", Operator::Gt, "hz", vec![2, 3, 4]); + test("a", Operator::Gt, "ijk", vec![2, 3, 4]); + test("a", Operator::Gt, "sh", vec![3, 4]); + test("a", Operator::Gt, "ssh", vec![3, 4]); + test("a", Operator::Gt, "sz", vec![4]); + test("a", Operator::Gt, "zz", vec![4]); + + test("a", Operator::Eq, "ab", vec![1]); + test("a", Operator::Eq, "hz", vec![2]); + test("a", Operator::Eq, "ijk", vec![2]); + test("a", Operator::Eq, "sh", vec![3]); + test("a", Operator::Eq, "ssh", vec![3]); + test("a", Operator::Eq, "sz", vec![4]); + test("a", Operator::Eq, "zz", vec![4]); + + test("a", Operator::Lt, "ab", vec![1]); + test("a", Operator::Lt, "hz", vec![1]); + test("a", Operator::Lt, "ijk", vec![1, 2]); + test("a", Operator::Lt, "sh", vec![1, 2]); + test("a", Operator::Lt, "ssh", vec![1, 2, 3]); + test("a", Operator::Lt, "sz", vec![1, 2, 3]); + test("a", Operator::Lt, "zz", vec![1, 2, 3, 4]); + + test("a", Operator::LtEq, "ab", vec![1]); + test("a", Operator::LtEq, "hz", vec![1, 2]); + test("a", Operator::LtEq, "ijk", vec![1, 2]); + test("a", Operator::LtEq, "sh", vec![1, 2, 3]); + test("a", Operator::LtEq, "ssh", vec![1, 2, 3]); + test("a", Operator::LtEq, "sz", vec![1, 2, 3, 4]); + test("a", Operator::LtEq, "zz", vec![1, 2, 3, 4]); + + test("b", Operator::Lt, "1", vec![1, 2, 3, 4]); + } +} diff --git a/src/frontend/src/spliter.rs b/src/frontend/src/spliter.rs index 3ae15785b6..5740c2a9eb 100644 --- a/src/frontend/src/spliter.rs +++ b/src/frontend/src/spliter.rs @@ -1,21 +1,18 @@ use std::collections::HashMap; -use datatypes::value::Value; use datatypes::vectors::VectorBuilder; use datatypes::vectors::VectorRef; use snafu::ensure; use snafu::OptionExt; +use store_api::storage::RegionId; use table::requests::InsertRequest; use crate::error::FindPartitionColumnSnafu; use crate::error::FindRegionSnafu; use crate::error::InvalidInsertRequestSnafu; use crate::error::Result; -use crate::partition::PartitionRule; +use crate::partitioning::{PartitionRule, ValueList}; -pub type RegionId = u64; -pub type ColumnName = String; -pub type ValueList = Vec; pub type DistInsertRequest = HashMap; pub struct WriteSpliter<'a, P> { @@ -170,8 +167,9 @@ mod tests { use super::{ check_req, find_partitioning_values, partition_insert_request, partition_values, - ColumnName, PartitionRule, RegionId, WriteSpliter, + PartitionRule, RegionId, WriteSpliter, }; + use crate::partitioning::PartitionExpr; #[test] fn test_insert_req_check() { @@ -409,7 +407,7 @@ mod tests { impl PartitionRule for MockPartitionRule { type Error = String; - fn partition_columns(&self) -> Vec { + fn partition_columns(&self) -> Vec { vec!["id".to_string()] } @@ -426,5 +424,9 @@ mod tests { } unreachable!() } + + fn find_regions(&self, _: &[PartitionExpr]) -> Result, Self::Error> { + unimplemented!() + } } }