feat: implement multi-dim partition rule (#3409)

* generate expr rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement show create for new partition rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement row splitter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: fix failed tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fix lint issues

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: ignore tests for deprecated partition rule

* chore: remove unused partition rule tests setup

* test(sqlness): add basic partition tests

* test(multi_dim): add basic find region test

* address CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Signed-off-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-03-05 19:39:15 +08:00
committed by GitHub
parent 97cbfcfe23
commit 020635063c
21 changed files with 806 additions and 221 deletions

2
Cargo.lock generated
View File

@@ -6476,6 +6476,8 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"sql",
"sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)",
"store-api",
"table",
]

View File

@@ -454,6 +454,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse sql value"))]
ParseSqlValue {
source: sql::error::Error,
location: Location,
},
#[snafu(display("Failed to build default value, column: {}", column))]
ColumnDefaultValue {
column: String,
@@ -522,6 +528,9 @@ pub enum Error {
#[snafu(display("Failed to create logical tables: {}", reason))]
CreateLogicalTables { reason: String, location: Location },
#[snafu(display("Invalid partition rule: {}", reason))]
InvalidPartitionRule { reason: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -643,7 +652,9 @@ impl ErrorExt for Error {
Error::CreateTableWithMultiCatalogs { .. }
| Error::CreateTableWithMultiSchemas { .. }
| Error::EmptyCreateTableExpr { .. } => StatusCode::InvalidArguments,
| Error::EmptyCreateTableExpr { .. }
| Error::InvalidPartitionRule { .. }
| Error::ParseSqlValue { .. } => StatusCode::InvalidArguments,
Error::CreateLogicalTables { .. } => StatusCode::Unexpected,
}

View File

@@ -31,9 +31,12 @@ use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::{info, tracing};
use common_time::Timezone;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef};
use query::sql::create_table_stmt;
use regex::Regex;
@@ -42,6 +45,8 @@ use session::table_name::table_idents_to_full_name;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
use sql::statements::sql_value_to_value;
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -52,9 +57,9 @@ use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu,
CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu,
InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, Result, SchemaNotFoundSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu,
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::statement::show::create_partitions_stmt;
@@ -730,13 +735,17 @@ fn find_partition_columns(partitions: &Option<Partitions>) -> Result<Vec<String>
Ok(columns)
}
/// Parse [Partitions] into a group of partition entries.
///
/// Returns a list of [PartitionBound], each of which defines a partition.
fn find_partition_entries(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
_query_ctx: &QueryContextRef,
query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(_partitions) = partitions {
let entries = if let Some(partitions) = partitions {
// extract concrete data type of partition columns
let column_defs = partition_columns
.iter()
.map(|pc| {
@@ -748,24 +757,103 @@ fn find_partition_entries(
.unwrap()
})
.collect::<Vec<_>>();
let mut column_name_and_type = Vec::with_capacity(column_defs.len());
let mut column_name_and_type = HashMap::with_capacity(column_defs.len());
for column in column_defs {
let column_name = &column.name;
let data_type = ConcreteDataType::from(
ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone())
.context(ColumnDataTypeSnafu)?,
);
column_name_and_type.push((column_name, data_type));
column_name_and_type.insert(column_name, data_type);
}
// TODO(ruihang): implement the partition value parser.
vec![vec![PartitionBound::MaxValue]]
// Transform parser expr to partition expr
let mut partition_exprs = Vec::with_capacity(partitions.exprs.len());
for partition in &partitions.exprs {
let partition_expr =
convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?;
partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]);
}
// fallback for no expr
if partition_exprs.is_empty() {
partition_exprs.push(vec![PartitionBound::MaxValue]);
}
partition_exprs
} else {
vec![vec![PartitionBound::MaxValue]]
};
Ok(entries)
}
/// Converts one sqlparser [Expr] from a `PARTITION ON COLUMNS` clause into a [PartitionExpr].
///
/// Only two shapes are accepted:
/// - a leaf comparison, `column <cmp> literal` or `literal <cmp> column`, where `<cmp>` is one
///   of `=`, `<>`, `<`, `<=`, `>`, `>=`;
/// - a logical combination, `<binary expr> AND <binary expr>` or `<binary expr> OR <binary expr>`,
///   whose two sides are converted recursively.
///
/// # Errors
///
/// Returns `InvalidPartitionRule` when `expr` is not a binary expression, uses an operator
/// outside of [RestrictedOp], or pairs an operator with operands it cannot be evaluated on
/// (e.g. `a AND 10`, or a comparison between two sub-expressions). Errors from resolving the
/// column (`ColumnNotFound`) or parsing the literal (`ParseSqlValue`) are propagated.
fn convert_one_expr(
    expr: &Expr,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
    timezone: &Timezone,
) -> Result<PartitionExpr> {
    let Expr::BinaryOp { left, op, right } = expr else {
        return InvalidPartitionRuleSnafu {
            reason: "partition rule must be a binary expression",
        }
        .fail();
    };

    // `op` inside the closure still refers to the parser operator: the shadowing binding
    // only comes into scope after this statement.
    let op = RestrictedOp::try_from_parser(op).with_context(|| InvalidPartitionRuleSnafu {
        reason: format!("unsupported operator in partition expr {op}"),
    })?;

    // The evaluator of the multi-dim partition rule only knows how to apply comparison
    // operators to a (column, value) pair and AND/OR to two sub-expressions; everything else
    // is `unreachable!()` there. Reject such combinations here so that a malformed rule fails
    // at table creation instead of panicking when rows are routed.
    let is_logical_op = matches!(op, RestrictedOp::And | RestrictedOp::Or);

    let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) {
        // leaf node: `column <cmp> literal`
        (Expr::Identifier(ident), Expr::Value(value)) if !is_logical_op => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone)?;
            (Operand::Column(column_name), op, Operand::Value(value))
        }
        // leaf node: `literal <cmp> column`
        (Expr::Value(value), Expr::Identifier(ident)) if !is_logical_op => {
            let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?;
            let value = convert_value(value, data_type, timezone)?;
            (Operand::Value(value), op, Operand::Column(column_name))
        }
        // A sub-expression can only be combined with another sub-expression, and only
        // through AND/OR.
        (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) if is_logical_op => {
            let lhs = convert_one_expr(left, column_name_and_type, timezone)?;
            let rhs = convert_one_expr(right, column_name_and_type, timezone)?;
            (Operand::Expr(lhs), op, Operand::Expr(rhs))
        }
        _ => {
            return InvalidPartitionRuleSnafu {
                reason: format!("invalid partition expr {expr}"),
            }
            .fail();
        }
    };

    Ok(PartitionExpr::new(lhs, op, rhs))
}
/// Resolves a partition column identifier to its name and concrete data type.
///
/// The lookup uses the identifier's raw `value` as the key into `column_name_and_type`.
///
/// # Errors
///
/// Returns `ColumnNotFound` when the identifier is not one of the known partition columns.
fn convert_identifier(
    ident: &Ident,
    column_name_and_type: &HashMap<&String, ConcreteDataType>,
) -> Result<(String, ConcreteDataType)> {
    let column_name = ident.value.clone();
    let Some(data_type) = column_name_and_type.get(&column_name).cloned() else {
        return ColumnNotFoundSnafu { msg: &column_name }.fail();
    };
    Ok((column_name, data_type))
}
/// Parses a SQL literal into a [Value] of the given `data_type`.
///
/// The literal is not bound to a named column here, so the placeholder name `<NONAME>` is
/// handed to `sql_value_to_value`. Conversion failures are wrapped into `ParseSqlValue`.
fn convert_value(
    value: &ParserValue,
    data_type: ConcreteDataType,
    timezone: &Timezone,
) -> Result<Value> {
    let converted = sql_value_to_value("<NONAME>", &data_type, value, Some(timezone));
    converted.context(ParseSqlValueSnafu)
}
/// Merge table level table options with schema level table options.
fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.ttl);
table_opts
@@ -821,10 +909,10 @@ mod test {
(
r"
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b) (
PARTITION r0 VALUES LESS THAN ('hz'),
PARTITION r1 VALUES LESS THAN ('sh'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (b) (
b < 'hz',
b >= 'hz' AND b < 'sh',
b >= 'sh'
)
ENGINE=mito",
r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#,
@@ -834,8 +922,9 @@ ENGINE=mito",
CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) )
PARTITION BY RANGE COLUMNS (b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 10),
PARTITION r1 VALUES LESS THAN ('sh', 20),
PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE),
b < 'hz' AND a < 10,
b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20,
b >= 'sh' AND a >= 20
)
ENGINE=mito",
r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#,

View File

@@ -16,6 +16,7 @@ use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
use partition::partition::PartitionBound;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::Ident;
@@ -88,10 +89,22 @@ pub(crate) fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<O
.map(|name| name[..].into())
.collect();
// TODO(ruihang): convert partition info back to partition expr
let exprs = partitions
.iter()
.filter_map(|partition| {
partition
.partition
.partition_bounds()
.first()
.and_then(|bound| {
if let PartitionBound::Expr(expr) = bound {
Some(expr.to_parser_expr())
} else {
None
}
})
})
.collect();
Ok(Some(Partitions {
column_list,
exprs: vec![],
}))
Ok(Some(Partitions { column_list, exprs }))
}

View File

@@ -29,6 +29,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use meta_client::client::MetaClient;
use partition::columns::RangeColumnsPartitionRule;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use partition::partition::{PartitionBound, PartitionDef};
use partition::range::RangePartitionRule;
@@ -116,7 +117,11 @@ pub(crate) async fn create_partition_rule_manager(
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(10_i32.into())],
vec![PartitionBound::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Lt,
Operand::Value(datatypes::value::Value::Int32(10)),
))],
)
.try_into()
.unwrap(),
@@ -135,7 +140,19 @@ pub(crate) async fn create_partition_rule_manager(
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::Value(50_i32.into())],
vec![PartitionBound::Expr(PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int32(10)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Lt,
Operand::Value(datatypes::value::Value::Int32(50)),
)),
))],
)
.try_into()
.unwrap(),
@@ -154,7 +171,11 @@ pub(crate) async fn create_partition_rule_manager(
partition: Some(
PartitionDef::new(
vec!["a".to_string()],
vec![PartitionBound::MaxValue],
vec![PartitionBound::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(datatypes::value::Value::Int32(50)),
))],
)
.try_into()
.unwrap(),
@@ -172,83 +193,11 @@ pub(crate) async fn create_partition_rule_manager(
.await
.unwrap();
table_metadata_manager
.create_table_metadata(
new_test_table_info(2, "table_2", regions.clone().into_iter()).into(),
TableRouteValue::physical(vec![
RegionRoute {
region: Region {
id: 1.into(),
name: "r1".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(10_i32.into()),
PartitionBound::Value("hz".into()),
],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region {
id: 2.into(),
name: "r2".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![
PartitionBound::Value(50_i32.into()),
PartitionBound::Value("sh".into()),
],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
RegionRoute {
region: Region {
id: 3.into(),
name: "r3".to_string(),
partition: Some(
PartitionDef::new(
vec!["a".to_string(), "b".to_string()],
vec![PartitionBound::MaxValue, PartitionBound::MaxValue],
)
.try_into()
.unwrap(),
),
attrs: BTreeMap::new(),
},
leader_peer: None,
follower_peers: vec![],
leader_status: None,
leader_down_since: None,
},
]),
region_wal_options,
)
.await
.unwrap();
partition_manager
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "TODO(ruihang, weny): WIP new partition rule"]
async fn test_find_partition_rule() {
let partition_manager =
create_partition_rule_manager(Arc::new(MemoryKvBackend::default())).await;

View File

@@ -28,5 +28,7 @@ prometheus.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
sql.workspace = true
sqlparser.workspace = true
store-api.workspace = true
table.workspace = true

117
src/partition/src/expr.rs Normal file
View File

@@ -0,0 +1,117 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::value::Value;
use serde::{Deserialize, Serialize};
use sql::statements::value_to_sql_value;
use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
/// A binary partition expression of the form `lhs op rhs`.
///
/// It can be converted back to sqlparser's [Expr] with [`Self::to_parser_expr`].
///
/// [Expr]: sqlparser::ast::Expr
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
/// Left-hand operand; boxed because an [Operand] may itself contain a [PartitionExpr].
pub(crate) lhs: Box<Operand>,
/// Operator applied to the two operands.
pub(crate) op: RestrictedOp,
/// Right-hand operand; boxed for the same reason as `lhs`.
pub(crate) rhs: Box<Operand>,
}
/// One side of a [PartitionExpr].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
/// A column, referenced by name.
Column(String),
/// A literal value.
Value(Value),
/// A nested partition expression.
Expr(PartitionExpr),
}
/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
/// partition expressions.
///
/// Every variant maps one-to-one to a sqlparser binary operator, see
/// [`RestrictedOp::try_from_parser`] and [`RestrictedOp::to_parser_op`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
// Comparison operators
Eq,
NotEq,
Lt,
LtEq,
Gt,
GtEq,
// Logical connectives
And,
Or,
}
impl RestrictedOp {
    /// Maps a sqlparser binary operator onto the restricted set.
    ///
    /// Returns `None` for every operator that is not allowed in a partition expression.
    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
        let restricted = match op {
            ParserBinaryOperator::Eq => Self::Eq,
            ParserBinaryOperator::NotEq => Self::NotEq,
            ParserBinaryOperator::Lt => Self::Lt,
            ParserBinaryOperator::LtEq => Self::LtEq,
            ParserBinaryOperator::Gt => Self::Gt,
            ParserBinaryOperator::GtEq => Self::GtEq,
            ParserBinaryOperator::And => Self::And,
            ParserBinaryOperator::Or => Self::Or,
            _ => return None,
        };
        Some(restricted)
    }

    /// Maps this operator back to the equivalent sqlparser binary operator.
    ///
    /// This is the inverse of [`Self::try_from_parser`] for all supported operators.
    pub fn to_parser_op(&self) -> ParserBinaryOperator {
        match *self {
            Self::Eq => ParserBinaryOperator::Eq,
            Self::NotEq => ParserBinaryOperator::NotEq,
            Self::Lt => ParserBinaryOperator::Lt,
            Self::LtEq => ParserBinaryOperator::LtEq,
            Self::Gt => ParserBinaryOperator::Gt,
            Self::GtEq => ParserBinaryOperator::GtEq,
            Self::And => ParserBinaryOperator::And,
            Self::Or => ParserBinaryOperator::Or,
        }
    }
}
impl PartitionExpr {
    /// Creates the expression `lhs op rhs`.
    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
        Self {
            lhs: Box::new(lhs),
            op,
            rhs: Box::new(rhs),
        }
    }

    /// Convert [Self] back to sqlparser's [Expr]
    ///
    /// Nested expressions are converted recursively. No parentheses are inserted around
    /// nested expressions.
    ///
    /// # Panics
    ///
    /// Panics if a literal operand cannot be converted to a SQL value. This is treated as an
    /// invariant violation: partition rules are expected to only hold value types that were
    /// accepted when the rule was parsed.
    ///
    /// [Expr]: ParserExpr
    pub fn to_parser_expr(&self) -> ParserExpr {
        ParserExpr::BinaryOp {
            left: Box::new(Self::operand_to_parser_expr(&self.lhs)),
            op: self.op.to_parser_op(),
            right: Box::new(Self::operand_to_parser_expr(&self.rhs)),
        }
    }

    /// Converts a single operand to the corresponding sqlparser expression.
    fn operand_to_parser_expr(operand: &Operand) -> ParserExpr {
        match operand {
            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
            // Safety: Partition rule won't contain unsupported value types.
            // Otherwise it would have been rejected by the parser.
            Operand::Value(v) => ParserExpr::Value(
                value_to_sql_value(v)
                    .expect("partition expr only contains values convertible to sql values"),
            ),
            Operand::Expr(e) => e.to_parser_expr(),
        }
    }
}

View File

@@ -16,8 +16,10 @@
pub mod columns;
pub mod error;
pub mod expr;
pub mod manager;
pub mod metrics;
pub mod multi_dim;
pub mod partition;
pub mod range;
pub mod splitter;

View File

@@ -30,6 +30,7 @@ use table::metadata::TableId;
use crate::columns::RangeColumnsPartitionRule;
use crate::error::{FindLeaderSnafu, Result};
use crate::multi_dim::MultiDimPartitionRule;
use crate::partition::{PartitionBound, PartitionDef, PartitionExpr};
use crate::range::RangePartitionRule;
use crate::splitter::RowSplitter;
@@ -122,12 +123,41 @@ impl PartitionRuleManager {
Ok(results)
}
/// Get partition rule of given table.
///
/// Builds a [MultiDimPartitionRule] from the first partition bound of every partition of
/// the table. The rule pairs the i-th expression with the i-th region, so regions whose
/// first bound is a [PartitionBound::Expr] are listed first, in partition order, followed
/// by the regions without an expression (e.g. `MaxValue`). All regions stay part of the
/// rule; only expression-backed ones can be selected by an expression.
///
/// NOTE(review): an empty partition list is only guarded by a `debug_assert!`, same as
/// `find_table_partition_rule_deprecated`; in release builds it panics on the index below.
pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result<PartitionRuleRef> {
    let partitions = self.find_table_partitions(table_id).await?;

    debug_assert!(!partitions.is_empty());

    let partition_columns = partitions[0].partition.partition_columns();

    let mut regions = Vec::with_capacity(partitions.len());
    let mut exprs = Vec::with_capacity(partitions.len());
    let mut regions_without_expr = Vec::new();
    for info in partitions.iter() {
        let region = info.id.region_number();
        // `first()` instead of `[0]`: a partition without any bound is treated like one
        // without an expression rather than causing a panic.
        match info.partition.partition_bounds().first() {
            Some(PartitionBound::Expr(expr)) => {
                regions.push(region);
                exprs.push(expr.clone());
            }
            _ => regions_without_expr.push(region),
        }
    }
    // Keep `regions[i]` aligned with `exprs[i]`; the remaining regions go to the tail.
    regions.extend(regions_without_expr);

    Ok(Arc::new(MultiDimPartitionRule::new(
        partition_columns.clone(),
        regions,
        exprs,
    )) as _)
}
/// Get partition rule of given table.
pub async fn find_table_partition_rule_deprecated(
&self,
table_id: TableId,
) -> Result<PartitionRuleRef> {
let partitions = self.find_table_partitions(table_id).await?;
debug_assert!(!partitions.is_empty());
let partition_columns = partitions[0].partition.partition_columns();
let regions = partitions
.iter()
.map(|x| x.id.region_number())
@@ -142,6 +172,7 @@ impl PartitionRuleManager {
.filter_map(|info| match &info.partition.partition_bounds()[0] {
PartitionBound::Value(v) => Some(v.clone()),
PartitionBound::MaxValue => None,
PartitionBound::Expr(_) => None,
})
.collect::<Vec<Value>>();
Arc::new(RangePartitionRule::new(
@@ -266,10 +297,15 @@ fn create_partitions_from_region_routes(
fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<HashSet<RegionNumber>> {
let expr = filter.df_expr();
match expr {
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => {
DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if op.is_comparison_operator() => {
let column_op_value = match (left.as_ref(), right.as_ref()) {
(DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)),
(DfExpr::Literal(v), DfExpr::Column(c)) => Some((&c.name, reverse_operator(op), v)),
(DfExpr::Literal(v), DfExpr::Column(c)) => Some((
&c.name,
// Safety: previous branch ensures this is a comparison operator
op.swap().unwrap(),
v,
)),
_ => None,
};
if let Some((column, op, scalar)) = column_op_value {
@@ -311,27 +347,3 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<Hash
.into_iter()
.collect::<HashSet<RegionNumber>>())
}
/// Returns `true` when `op` is one of the six comparison operators
/// (`=`, `!=`, `<`, `<=`, `>`, `>=`).
#[inline]
fn is_compare_op(op: &Operator) -> bool {
    matches!(
        op,
        Operator::Lt
            | Operator::LtEq
            | Operator::Gt
            | Operator::GtEq
            | Operator::Eq
            | Operator::NotEq
    )
}
/// Returns the operator to use after swapping the two operands of `op`.
///
/// The ordering operators are mirrored (`<` becomes `>`, `<=` becomes `>=`, and vice versa);
/// every other operator is returned unchanged.
#[inline]
fn reverse_operator(op: &Operator) -> Operator {
    let original = *op;
    match original {
        Operator::Lt => Operator::Gt,
        Operator::Gt => Operator::Lt,
        Operator::LtEq => Operator::GtEq,
        Operator::GtEq => Operator::LtEq,
        unchanged => unchanged,
    }
}

View File

@@ -0,0 +1,201 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
use datatypes::prelude::Value;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use store_api::storage::RegionNumber;
use crate::error::{self, Result};
use crate::expr::{Operand, PartitionExpr, RestrictedOp};
use crate::PartitionRule;
/// A partition rule defined by one [PartitionExpr] per region.
///
/// A row is routed to the region whose expression is the first one to evaluate to true.
#[derive(Debug, Serialize, Deserialize)]
pub struct MultiDimPartitionRule {
// names of the partition columns; input values are given in this order
partition_columns: Vec<String>,
// name to index of `partition_columns`
name_to_index: HashMap<String, usize>,
// region numbers; `regions[i]` is the target region of `exprs[i]`
regions: Vec<RegionNumber>,
// partition expressions, evaluated in order
exprs: Vec<PartitionExpr>,
}
impl MultiDimPartitionRule {
    /// Creates a rule over `partition_columns` in which `exprs[i]` selects `regions[i]`.
    ///
    /// A lookup table from column name to its position in `partition_columns` is built up
    /// front; if a name occurs more than once, the last position wins.
    pub fn new(
        partition_columns: Vec<String>,
        regions: Vec<RegionNumber>,
        exprs: Vec<PartitionExpr>,
    ) -> Self {
        let mut name_to_index = HashMap::with_capacity(partition_columns.len());
        for (position, column) in partition_columns.iter().enumerate() {
            name_to_index.insert(column.clone(), position);
        }

        Self {
            partition_columns,
            name_to_index,
            regions,
            exprs,
        }
    }

    /// Finds the region for one row, given its partition column values in the order of
    /// `partition_columns`.
    ///
    /// Expressions are tried in order and the region of the first matching one is returned.
    /// When no expression matches, region `0` is returned as the default.
    ///
    /// # Errors
    ///
    /// Returns `RegionKeysSize` when the number of values differs from the number of
    /// partition columns.
    ///
    /// # Panics
    ///
    /// Panics if the matching expression has no region at the same index, or if an
    /// expression is malformed (see [`Self::evaluate_expr`]).
    fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
        let expected = self.partition_columns.len();
        ensure!(
            values.len() == expected,
            error::RegionKeysSizeSnafu {
                expect: expected,
                actual: values.len(),
            }
        );

        let mut matched = None;
        for (position, expr) in self.exprs.iter().enumerate() {
            if self.evaluate_expr(expr, values)? {
                matched = Some(position);
                break;
            }
        }

        Ok(match matched {
            Some(position) => self.regions[position],
            // return the default region number
            None => 0,
        })
    }

    /// Looks up the value of the partition column `name` in `values`.
    ///
    /// Panics if `name` is not a partition column of this rule.
    fn column_value<'a>(&self, name: &str, values: &'a [Value]) -> &'a Value {
        let position = *self.name_to_index.get(name).unwrap();
        &values[position]
    }

    /// Evaluates `expr` against the partition column values of one row.
    ///
    /// # Panics
    ///
    /// Panics on expression shapes that are not supported: anything other than
    /// column-vs-value, value-vs-column, or two sub-expressions joined by `And`/`Or`; and on
    /// column names that are not partition columns.
    fn evaluate_expr(&self, expr: &PartitionExpr, values: &[Value]) -> Result<bool> {
        match (&*expr.lhs, &*expr.rhs) {
            (Operand::Column(name), Operand::Value(literal)) => {
                Self::perform_op(self.column_value(name, values), &expr.op, literal)
            }
            (Operand::Value(literal), Operand::Column(name)) => {
                Self::perform_op(literal, &expr.op, self.column_value(name, values))
            }
            (Operand::Expr(left), Operand::Expr(right)) => {
                // Both sides are always evaluated, there is no short-circuiting.
                let left = self.evaluate_expr(left, values)?;
                let right = self.evaluate_expr(right, values)?;
                match expr.op {
                    RestrictedOp::And => Ok(left && right),
                    RestrictedOp::Or => Ok(left || right),
                    _ => unreachable!(),
                }
            }
            _ => unreachable!(),
        }
    }

    /// Applies the comparison `lhs op rhs`.
    ///
    /// Ordering comparisons are false when the two values are not comparable
    /// (`partial_cmp` returns `None`).
    ///
    /// # Panics
    ///
    /// Panics when `op` is `And` or `Or`; those only apply to sub-expressions.
    fn perform_op(lhs: &Value, op: &RestrictedOp, rhs: &Value) -> Result<bool> {
        let outcome = match op {
            RestrictedOp::Eq => lhs == rhs,
            RestrictedOp::NotEq => lhs != rhs,
            RestrictedOp::Lt => matches!(lhs.partial_cmp(rhs), Some(Ordering::Less)),
            RestrictedOp::LtEq => matches!(
                lhs.partial_cmp(rhs),
                Some(Ordering::Less | Ordering::Equal)
            ),
            RestrictedOp::Gt => matches!(lhs.partial_cmp(rhs), Some(Ordering::Greater)),
            RestrictedOp::GtEq => matches!(
                lhs.partial_cmp(rhs),
                Some(Ordering::Greater | Ordering::Equal)
            ),
            RestrictedOp::And | RestrictedOp::Or => unreachable!(),
        };

        Ok(outcome)
    }
}
// Exposes [MultiDimPartitionRule] through the crate's [PartitionRule] trait.
impl PartitionRule for MultiDimPartitionRule {
fn as_any(&self) -> &dyn Any {
self
}
/// Returns a copy of the partition column names.
fn partition_columns(&self) -> Vec<String> {
self.partition_columns.clone()
}
/// Finds the region for one row by delegating to the inherent `find_region`.
fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
// NOTE(review): this relies on inherent methods taking precedence over trait methods
// in method resolution; if the inherent `find_region` is renamed or removed, this call
// would resolve to the trait method itself and recurse forever.
self.find_region(values)
}
/// Returns every region of this rule.
///
/// The given expressions are ignored, so no regions are pruned.
fn find_regions_by_exprs(
&self,
_exprs: &[crate::partition::PartitionExpr],
) -> Result<Vec<RegionNumber>> {
Ok(self.regions.clone())
}
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use super::*;
    use crate::error;

    /// Builds the leaf expression `b <op> '<bound>'`.
    fn b_cmp(op: RestrictedOp, bound: &str) -> PartitionExpr {
        PartitionExpr::new(
            Operand::Column("b".to_string()),
            op,
            Operand::Value(datatypes::value::Value::String(bound.into())),
        )
    }

    #[test]
    fn test_find_region() {
        // PARTITION ON COLUMNS (b) (
        //     b < 'hz',
        //     b >= 'hz' AND b < 'sh',
        //     b >= 'sh'
        // )
        let below_hz = b_cmp(RestrictedOp::Lt, "hz");
        let hz_to_sh = PartitionExpr::new(
            Operand::Expr(b_cmp(RestrictedOp::GtEq, "hz")),
            RestrictedOp::And,
            Operand::Expr(b_cmp(RestrictedOp::Lt, "sh")),
        );
        let sh_and_above = b_cmp(RestrictedOp::GtEq, "sh");
        let rule = MultiDimPartitionRule::new(
            vec!["b".to_string()],
            vec![1, 2, 3],
            vec![below_hz, hz_to_sh, sh_and_above],
        );

        // The number of values must match the number of partition columns.
        assert_matches!(
            rule.find_region(&["foo".into(), 1000_i32.into()]),
            Err(error::Error::RegionKeysSize {
                expect: 1,
                actual: 2,
                ..
            })
        );

        // (input value of `b`, expected region)
        let cases = [
            ("foo", 1),
            ("bar", 1),
            ("hz", 2),
            ("hzz", 2),
            ("sh", 3),
            ("zzzz", 3),
        ];
        for (input, expected) in cases {
            assert_matches!(
                rule.find_region(&[input.into()]),
                Ok(region) if region == expected,
                "input: {input}"
            );
        }
    }
}

View File

@@ -49,6 +49,7 @@ pub trait PartitionRule: Sync + Send {
pub enum PartitionBound {
Value(Value),
MaxValue,
Expr(crate::expr::PartitionExpr),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -62,6 +63,7 @@ impl Display for PartitionBound {
match self {
Self::Value(v) => write!(f, "{}", v),
Self::MaxValue => write!(f, "MAXVALUE"),
Self::Expr(e) => write!(f, "{:?}", e),
}
}
}

View File

@@ -98,6 +98,7 @@ impl CreateTableExprTranslator {
_ => format!("{v}"),
},
PartitionBound::MaxValue => "MAXVALUE".to_string(),
PartitionBound::Expr(expr) => expr.to_parser_expr().to_string(),
}
}

View File

@@ -186,7 +186,6 @@ mod test {
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_distributed_insert_delete_and_query() {
common_telemetry::init_default_ut_logging();
@@ -204,11 +203,11 @@ CREATE TABLE {table_name} (
ts TIMESTAMP,
TIME INDEX (ts),
PRIMARY KEY (a, b)
) PARTITION BY RANGE COLUMNS(a) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (20),
PARTITION r2 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
) PARTITION ON COLUMNS(a) (
a < 10,
a >= 10 AND a < 20,
a >= 20 AND a < 50,
a >= 50
)"
);
create_table(frontend, sql).await;

View File

@@ -67,7 +67,6 @@ mod tests {
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_distributed_exec_sql() {
common_telemetry::init_default_ut_logging();
@@ -85,11 +84,11 @@ mod tests {
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN ('MOSS'),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A' AND host < '550-W',
host >= '550-W' AND host < 'MOSS',
host >= 'MOSS'
)
engine=mito"#;
create_table(instance, sql).await;

View File

@@ -83,7 +83,6 @@ async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>)
}
#[apply(both_instances_cases)]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_show_create_table(instance: Arc<dyn MockInstance>) {
let frontend = instance.frontend();
let sql = if instance.is_distributed_mode() {
@@ -92,11 +91,11 @@ async fn test_show_create_table(instance: Arc<dyn MockInstance>) {
ts timestamp,
TIME INDEX(ts)
)
PARTITION BY RANGE COLUMNS (n) (
PARTITION r0 VALUES LESS THAN (1),
PARTITION r1 VALUES LESS THAN (10),
PARTITION r2 VALUES LESS THAN (100),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (n) (
n < 1,
n >= 1 AND n < 10,
n >= 10 AND n < 100,
n >= 100
)"#
} else {
r#"create table demo(
@@ -113,26 +112,26 @@ PARTITION BY RANGE COLUMNS (n) (
let output = execute_sql(&frontend, "show create table demo").await;
let expected = if instance.is_distributed_mode() {
r#"+-------+--------------------------------------------+
| Table | Create Table |
+-------+--------------------------------------------+
| demo | CREATE TABLE IF NOT EXISTS "demo" ( |
| | "n" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("n") |
| | ) |
| | PARTITION BY RANGE COLUMNS ("n") ( |
| | PARTITION r0 VALUES LESS THAN (1), |
| | PARTITION r1 VALUES LESS THAN (10), |
| | PARTITION r2 VALUES LESS THAN (100), |
| | PARTITION r3 VALUES LESS THAN (MAXVALUE) |
| | ) |
| | ENGINE=mito |
| | WITH( |
| | regions = 4 |
| | ) |
+-------+--------------------------------------------+"#
r#"+-------+-------------------------------------+
| Table | Create Table |
+-------+-------------------------------------+
| demo | CREATE TABLE IF NOT EXISTS "demo" ( |
| | "n" INT NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("n") |
| | ) |
| | PARTITION ON COLUMNS ("n") ( |
| | n < 1, |
| | n >= 100, |
| | n >= 1 AND n < 10, |
| | n >= 10 AND n < 100 |
| | ) |
| | ENGINE=mito |
| | WITH( |
| | regions = 4 |
| | ) |
+-------+-------------------------------------+"#
} else {
r#"+-------+-------------------------------------+
| Table | Create Table |

View File

@@ -29,7 +29,6 @@ http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
// region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
sql_tests!(File);
// TODO(ruihang): re-enable this when the new partition rule is ready
// region_migration_tests!(File);
region_migration_tests!(File);
// TODO(niebayes): add integration tests for remote wal.

View File

@@ -1,36 +0,0 @@
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
Affected Rows: 0
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| greptime | public | my_table | p0 | (a) VALUES LESS THAN (MAXVALUE) | ID |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
+---------------+---------+-----------+--------+
| region_id | peer_id | is_leader | status |
+---------------+---------+-----------+--------+
| REGION_ID | PEER_ID | Yes | ALIVE |
+---------------+---------+-----------+--------+
DROP TABLE my_table;
Affected Rows: 0

View File

@@ -1,19 +0,0 @@
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
DROP TABLE my_table;

View File

@@ -0,0 +1,170 @@
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
Affected Rows: 0
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
+---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
| greptime | public | my_table | p0 | (a) VALUES LESS THAN (PartitionExpr { lhs: Column("a"), op: Lt, rhs: Value(Int32(1000)) }) | ID |
| greptime | public | my_table | p1 | (a) VALUES LESS THAN (PartitionExpr { lhs: Column("a"), op: GtEq, rhs: Value(Int32(2000)) }) | ID |
| greptime | public | my_table | p2 | (a) VALUES LESS THAN (PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("a"), op: GtEq, rhs: Value(Int32(1000)) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("a"), op: Lt, rhs: Value(Int32(2000)) }) }) | ID |
+---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
+---------------+---------+-----------+--------+
| region_id | peer_id | is_leader | status |
+---------------+---------+-----------+--------+
| REGION_ID | PEER_ID | Yes | ALIVE |
| REGION_ID | PEER_ID | Yes | ALIVE |
| REGION_ID | PEER_ID | Yes | ALIVE |
+---------------+---------+-----------+--------+
INSERT INTO my_table VALUES
(100, 'a', 1),
(200, 'b', 2),
(1100, 'c', 3),
(1200, 'd', 4),
(2000, 'e', 5),
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);
Affected Rows: 8
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 100 | a | 1970-01-01T00:00:00.001 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
DELETE FROM my_table WHERE a < 150;
Affected Rows: 1
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
DELETE FROM my_table WHERE a < 2200 AND a > 1500;
Affected Rows: 2
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
DELETE FROM my_table WHERE a < 2500;
Affected Rows: 5
SELECT * FROM my_table;
++
++
DROP TABLE my_table;
Affected Rows: 0
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) ();
Affected Rows: 0
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| greptime | public | my_table | p0 | (a) VALUES LESS THAN (MAXVALUE) | ID |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
+---------------+---------+-----------+--------+
| region_id | peer_id | is_leader | status |
+---------------+---------+-----------+--------+
| REGION_ID | PEER_ID | Yes | ALIVE |
+---------------+---------+-----------+--------+
INSERT INTO my_table VALUES
(100, 'a', 1),
(200, 'b', 2),
(1100, 'c', 3),
(1200, 'd', 4),
(2000, 'e', 5),
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);
Affected Rows: 8
SELECT * FROM my_table;
+------+---+-------------------------+
| a | b | ts |
+------+---+-------------------------+
| 100 | a | 1970-01-01T00:00:00.001 |
| 200 | b | 1970-01-01T00:00:00.002 |
| 1100 | c | 1970-01-01T00:00:00.003 |
| 1200 | d | 1970-01-01T00:00:00.004 |
| 2000 | e | 1970-01-01T00:00:00.005 |
| 2100 | f | 1970-01-01T00:00:00.006 |
| 2200 | g | 1970-01-01T00:00:00.007 |
| 2400 | h | 1970-01-01T00:00:00.008 |
+------+---+-------------------------+
DROP TABLE my_table;
Affected Rows: 0

View File

@@ -0,0 +1,71 @@
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
INSERT INTO my_table VALUES
(100, 'a', 1),
(200, 'b', 2),
(1100, 'c', 3),
(1200, 'd', 4),
(2000, 'e', 5),
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);
SELECT * FROM my_table;
DELETE FROM my_table WHERE a < 150;
SELECT * FROM my_table;
DELETE FROM my_table WHERE a < 2200 AND a > 1500;
SELECT * FROM my_table;
DELETE FROM my_table WHERE a < 2500;
SELECT * FROM my_table;
DROP TABLE my_table;
CREATE TABLE my_table (
a INT PRIMARY KEY,
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION ON COLUMNS (a) ();
-- SQLNESS REPLACE (\d{13}) ID
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
-- SQLNESS REPLACE (\d{13}) REGION_ID
-- SQLNESS REPLACE (\d{1}) PEER_ID
SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
INSERT INTO my_table VALUES
(100, 'a', 1),
(200, 'b', 2),
(1100, 'c', 3),
(1200, 'd', 4),
(2000, 'e', 5),
(2100, 'f', 6),
(2200, 'g', 7),
(2400, 'h', 8);
SELECT * FROM my_table;
DROP TABLE my_table;

View File

@@ -35,11 +35,13 @@ SHOW CREATE TABLE system_metrics;
| | PRIMARY KEY ("id", "host") |
| | ) |
| | PARTITION ON COLUMNS ("id") ( |
| | |
| | id < 5, |
| | id >= 9, |
| | id >= 5 AND id < 9 |
| | ) |
| | ENGINE=mito |
| | WITH( |
| | regions = 1, |
| | regions = 3, |
| | ttl = '7days', |
| | write_buffer_size = '1.0KiB' |
| | ) |