From 020635063c3e4b8ab4ffcc6a0cc0b09e2c3f43d6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 5 Mar 2024 19:39:15 +0800 Subject: [PATCH] feat: implement multi-dim partition rule (#3409) * generate expr rule Signed-off-by: Ruihang Xia * implement show create for new partition rule Signed-off-by: Ruihang Xia * implement row spliter Signed-off-by: Ruihang Xia * fix: fix failed tests Signed-off-by: WenyXu * chore: fix lint issues Signed-off-by: WenyXu * chore: ignore tests for deprecated partition rule * chore: remove unused partition rule tests setup * test(sqlness): add basic partition tests * test(multi_dim): add basic find region test * address CR comments Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Signed-off-by: WenyXu Co-authored-by: WenyXu --- Cargo.lock | 2 + src/operator/src/error.rs | 13 +- src/operator/src/statement/ddl.rs | 119 +++++++++-- src/operator/src/statement/show.rs | 23 +- src/operator/src/tests/partition_manager.rs | 101 +++------ src/partition/Cargo.toml | 2 + src/partition/src/expr.rs | 117 ++++++++++ src/partition/src/lib.rs | 2 + src/partition/src/manager.rs | 66 +++--- src/partition/src/multi_dim.rs | 201 ++++++++++++++++++ src/partition/src/partition.rs | 2 + .../src/translator/mysql/create_expr.rs | 1 + tests-integration/src/grpc.rs | 11 +- tests-integration/src/instance.rs | 11 +- tests-integration/src/tests/instance_test.rs | 51 +++-- tests-integration/tests/main.rs | 3 +- .../cases/distributed/create/partition.result | 36 ---- tests/cases/distributed/create/partition.sql | 19 -- .../cases/standalone/common/partition.result | 170 +++++++++++++++ tests/cases/standalone/common/partition.sql | 71 +++++++ .../standalone/common/show/show_create.result | 6 +- 21 files changed, 806 insertions(+), 221 deletions(-) create mode 100644 src/partition/src/expr.rs create mode 100644 src/partition/src/multi_dim.rs delete mode 100644 tests/cases/distributed/create/partition.result delete mode 100644 tests/cases/distributed/create/partition.sql create mode 100644 tests/cases/standalone/common/partition.result create mode 100644 tests/cases/standalone/common/partition.sql diff --git a/Cargo.lock b/Cargo.lock index a9f95086af..7e4a95c319 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6476,6 +6476,8 @@ dependencies = [ "serde", "serde_json", "snafu", + "sql", + "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", "store-api", "table", ] diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index f6826a3281..55b4c79238 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -454,6 +454,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse sql value"))] + ParseSqlValue { + source: sql::error::Error, + location: Location, + }, + #[snafu(display("Failed to build default value, column: {}", column))] ColumnDefaultValue { column: String, @@ -522,6 +528,9 @@ pub enum Error { #[snafu(display("Failed to create logical tables: {}", reason))] CreateLogicalTables { reason: String, location: Location }, + + #[snafu(display("Invalid partition rule: {}", reason))] + InvalidPartitionRule { reason: String, location: Location }, } pub type Result = std::result::Result; @@ -643,7 +652,9 @@ impl ErrorExt for Error { Error::CreateTableWithMultiCatalogs { .. } | Error::CreateTableWithMultiSchemas { .. } - | Error::EmptyCreateTableExpr { .. } => StatusCode::InvalidArguments, + | Error::EmptyCreateTableExpr { .. } + | Error::InvalidPartitionRule { .. } + | Error::ParseSqlValue { .. } => StatusCode::InvalidArguments, Error::CreateLogicalTables { .. } => StatusCode::Unexpected, } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 22c6b44b78..8c76b0618b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -31,9 +31,12 @@ use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::table_name::TableName; use common_query::Output; use common_telemetry::{info, tracing}; +use common_time::Timezone; use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; +use datatypes::value::Value; use lazy_static::lazy_static; +use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::partition::{PartitionBound, PartitionDef}; use query::sql::create_table_stmt; use regex::Regex; @@ -42,6 +45,8 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, IntoError, OptionExt, ResultExt}; use sql::statements::alter::AlterTable; use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions}; +use sql::statements::sql_value_to_value; +use sqlparser::ast::{Expr, Ident, Value as ParserValue}; use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::requests::{AlterKind, AlterTableRequest, TableOptions}; @@ -52,9 +57,9 @@ use crate::error::{ self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu, CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu, - InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, Result, SchemaNotFoundSnafu, - TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, - UnrecognizedTableOptionSnafu, + InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, + ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu, + TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; use crate::statement::show::create_partitions_stmt; @@ -730,13 +735,17 @@ fn find_partition_columns(partitions: &Option) -> Result Ok(columns) } +/// Parse [Partitions] into a group of partition entries. +/// +/// Returns a list of [PartitionBound], each of which defines a partition. fn find_partition_entries( create_table: &CreateTableExpr, partitions: &Option, partition_columns: &[String], - _query_ctx: &QueryContextRef, + query_ctx: &QueryContextRef, ) -> Result>> { - let entries = if let Some(_partitions) = partitions { + let entries = if let Some(partitions) = partitions { + // extract concrete data type of partition columns let column_defs = partition_columns .iter() .map(|pc| { @@ -748,24 +757,103 @@ fn find_partition_entries( .unwrap() }) .collect::>(); - let mut column_name_and_type = Vec::with_capacity(column_defs.len()); + let mut column_name_and_type = HashMap::with_capacity(column_defs.len()); for column in column_defs { let column_name = &column.name; let data_type = ConcreteDataType::from( ColumnDataTypeWrapper::try_new(column.data_type, column.datatype_extension.clone()) .context(ColumnDataTypeSnafu)?, ); - column_name_and_type.push((column_name, data_type)); + column_name_and_type.insert(column_name, data_type); } - // TODO(ruihang): implement the partition value parser. - vec![vec![PartitionBound::MaxValue]] + // Transform parser expr to partition expr + let mut partition_exprs = Vec::with_capacity(partitions.exprs.len()); + for partition in &partitions.exprs { + let partition_expr = + convert_one_expr(partition, &column_name_and_type, &query_ctx.timezone())?; + partition_exprs.push(vec![PartitionBound::Expr(partition_expr)]); + } + + // fallback for no expr + if partition_exprs.is_empty() { + partition_exprs.push(vec![PartitionBound::MaxValue]); + } + + partition_exprs } else { vec![vec![PartitionBound::MaxValue]] }; Ok(entries) } +fn convert_one_expr( + expr: &Expr, + column_name_and_type: &HashMap<&String, ConcreteDataType>, + timezone: &Timezone, +) -> Result { + let Expr::BinaryOp { left, op, right } = expr else { + return InvalidPartitionRuleSnafu { + reason: "partition rule must be a binary expression", + } + .fail(); + }; + + let op = + RestrictedOp::try_from_parser(&op.clone()).with_context(|| InvalidPartitionRuleSnafu { + reason: format!("unsupported operator in partition expr {op}"), + })?; + + // convert leaf node. + let (lhs, op, rhs) = match (left.as_ref(), right.as_ref()) { + (Expr::Identifier(ident), Expr::Value(value)) => { + let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?; + let value = convert_value(value, data_type, timezone)?; + (Operand::Column(column_name), op, Operand::Value(value)) + } + (Expr::Value(value), Expr::Identifier(ident)) => { + let (column_name, data_type) = convert_identifier(ident, column_name_and_type)?; + let value = convert_value(value, data_type, timezone)?; + (Operand::Value(value), op, Operand::Column(column_name)) + } + (Expr::BinaryOp { .. }, Expr::BinaryOp { .. }) => { + // sub-expr must against another sub-expr + let lhs = convert_one_expr(left, column_name_and_type, timezone)?; + let rhs = convert_one_expr(right, column_name_and_type, timezone)?; + (Operand::Expr(lhs), op, Operand::Expr(rhs)) + } + _ => { + return InvalidPartitionRuleSnafu { + reason: format!("invalid partition expr {expr}"), + } + .fail(); + } + }; + + Ok(PartitionExpr::new(lhs, op, rhs)) +} + +fn convert_identifier( + ident: &Ident, + column_name_and_type: &HashMap<&String, ConcreteDataType>, +) -> Result<(String, ConcreteDataType)> { + let column_name = ident.value.clone(); + let data_type = column_name_and_type + .get(&column_name) + .cloned() + .with_context(|| ColumnNotFoundSnafu { msg: &column_name })?; + Ok((column_name, data_type)) +} + +fn convert_value( + value: &ParserValue, + data_type: ConcreteDataType, + timezone: &Timezone, +) -> Result { + sql_value_to_value("", &data_type, value, Some(timezone)).context(ParseSqlValueSnafu) +} + +/// Merge table level table options with schema level table options. fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions { table_opts.ttl = table_opts.ttl.or(schema_opts.ttl); table_opts @@ -821,10 +909,10 @@ mod test { ( r" CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) ) -PARTITION BY RANGE COLUMNS (b) ( - PARTITION r0 VALUES LESS THAN ('hz'), - PARTITION r1 VALUES LESS THAN ('sh'), - PARTITION r2 VALUES LESS THAN (MAXVALUE), +PARTITION ON COLUMNS (b) ( + b < 'hz', + b >= 'hz' AND b < 'sh', + b >= 'sh' ) ENGINE=mito", r#"[{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"hz\"}}"]},{"column_list":["b"],"value_list":["{\"Value\":{\"String\":\"sh\"}}"]},{"column_list":["b"],"value_list":["\"MaxValue\""]}]"#, @@ -834,8 +922,9 @@ ENGINE=mito", CREATE TABLE rcx ( a INT, b STRING, c TIMESTAMP, TIME INDEX (c) ) PARTITION BY RANGE COLUMNS (b, a) ( PARTITION r0 VALUES LESS THAN ('hz', 10), - PARTITION r1 VALUES LESS THAN ('sh', 20), - PARTITION r2 VALUES LESS THAN (MAXVALUE, MAXVALUE), + b < 'hz' AND a < 10, + b >= 'hz' AND b < 'sh' AND a >= 10 AND a < 20, + b >= 'sh' AND a >= 20 ) ENGINE=mito", r#"[{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"hz\"}}","{\"Value\":{\"Int32\":10}}"]},{"column_list":["b","a"],"value_list":["{\"Value\":{\"String\":\"sh\"}}","{\"Value\":{\"Int32\":20}}"]},{"column_list":["b","a"],"value_list":["\"MaxValue\"","\"MaxValue\""]}]"#, diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs index a768a07c76..85bc439751 100644 --- a/src/operator/src/statement/show.rs +++ b/src/operator/src/statement/show.rs @@ -16,6 +16,7 @@ use common_meta::table_name::TableName; use common_query::Output; use common_telemetry::tracing; use partition::manager::PartitionInfo; +use partition::partition::PartitionBound; use session::context::QueryContextRef; use snafu::ResultExt; use sql::ast::Ident; @@ -88,10 +89,22 @@ pub(crate) fn create_partitions_stmt(partitions: Vec) -> Result, + pub(crate) op: RestrictedOp, + pub(crate) rhs: Box, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum Operand { + Column(String), + Value(Value), + Expr(PartitionExpr), +} + +/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in +/// partition expressions. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub enum RestrictedOp { + // Evaluate to binary + Eq, + NotEq, + Lt, + LtEq, + Gt, + GtEq, + + // Conjunction + And, + Or, +} + +impl RestrictedOp { + pub fn try_from_parser(op: &ParserBinaryOperator) -> Option { + match op { + ParserBinaryOperator::Eq => Some(Self::Eq), + ParserBinaryOperator::NotEq => Some(Self::NotEq), + ParserBinaryOperator::Lt => Some(Self::Lt), + ParserBinaryOperator::LtEq => Some(Self::LtEq), + ParserBinaryOperator::Gt => Some(Self::Gt), + ParserBinaryOperator::GtEq => Some(Self::GtEq), + ParserBinaryOperator::And => Some(Self::And), + ParserBinaryOperator::Or => Some(Self::Or), + _ => None, + } + } + + pub fn to_parser_op(&self) -> ParserBinaryOperator { + match self { + Self::Eq => ParserBinaryOperator::Eq, + Self::NotEq => ParserBinaryOperator::NotEq, + Self::Lt => ParserBinaryOperator::Lt, + Self::LtEq => ParserBinaryOperator::LtEq, + Self::Gt => ParserBinaryOperator::Gt, + Self::GtEq => ParserBinaryOperator::GtEq, + Self::And => ParserBinaryOperator::And, + Self::Or => ParserBinaryOperator::Or, + } + } +} + +impl PartitionExpr { + pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self { + Self { + lhs: Box::new(lhs), + op, + rhs: Box::new(rhs), + } + } + + /// Convert [Self] back to sqlparser's [Expr] + /// + /// [Expr]: ParserExpr + pub fn to_parser_expr(&self) -> ParserExpr { + // Safety: Partition rule won't contains unsupported value type. + // Otherwise it will be rejected by the parser. + let lhs = match &*self.lhs { + Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())), + Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()), + Operand::Expr(e) => e.to_parser_expr(), + }; + + let rhs = match &*self.rhs { + Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())), + Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()), + Operand::Expr(e) => e.to_parser_expr(), + }; + + ParserExpr::BinaryOp { + left: Box::new(lhs), + op: self.op.to_parser_op(), + right: Box::new(rhs), + } + } +} diff --git a/src/partition/src/lib.rs b/src/partition/src/lib.rs index 33d156c7be..1d64951e10 100644 --- a/src/partition/src/lib.rs +++ b/src/partition/src/lib.rs @@ -16,8 +16,10 @@ pub mod columns; pub mod error; +pub mod expr; pub mod manager; pub mod metrics; +pub mod multi_dim; pub mod partition; pub mod range; pub mod splitter; diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index b42db7e700..eb413c6341 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -30,6 +30,7 @@ use table::metadata::TableId; use crate::columns::RangeColumnsPartitionRule; use crate::error::{FindLeaderSnafu, Result}; +use crate::multi_dim::MultiDimPartitionRule; use crate::partition::{PartitionBound, PartitionDef, PartitionExpr}; use crate::range::RangePartitionRule; use crate::splitter::RowSplitter; @@ -122,12 +123,41 @@ impl PartitionRuleManager { Ok(results) } - /// Get partition rule of given table. pub async fn find_table_partition_rule(&self, table_id: TableId) -> Result { let partitions = self.find_table_partitions(table_id).await?; let partition_columns = partitions[0].partition.partition_columns(); + let regions = partitions + .iter() + .map(|x| x.id.region_number()) + .collect::>(); + + let exprs = partitions + .iter() + .filter_map(|x| match &x.partition.partition_bounds()[0] { + PartitionBound::Expr(e) => Some(e.clone()), + _ => None, + }) + .collect::>(); + Ok(Arc::new(MultiDimPartitionRule::new( + partition_columns.clone(), + regions, + exprs, + )) as _) + } + + /// Get partition rule of given table. + pub async fn find_table_partition_rule_deprecated( + &self, + table_id: TableId, + ) -> Result { + let partitions = self.find_table_partitions(table_id).await?; + + debug_assert!(!partitions.is_empty()); + + let partition_columns = partitions[0].partition.partition_columns(); + let regions = partitions .iter() .map(|x| x.id.region_number()) @@ -142,6 +172,7 @@ impl PartitionRuleManager { .filter_map(|info| match &info.partition.partition_bounds()[0] { PartitionBound::Value(v) => Some(v.clone()), PartitionBound::MaxValue => None, + PartitionBound::Expr(_) => None, }) .collect::>(); Arc::new(RangePartitionRule::new( @@ -266,10 +297,15 @@ fn create_partitions_from_region_routes( fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result> { let expr = filter.df_expr(); match expr { - DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if is_compare_op(op) => { + DfExpr::BinaryExpr(BinaryExpr { left, op, right }) if op.is_comparison_operator() => { let column_op_value = match (left.as_ref(), right.as_ref()) { (DfExpr::Column(c), DfExpr::Literal(v)) => Some((&c.name, *op, v)), - (DfExpr::Literal(v), DfExpr::Column(c)) => Some((&c.name, reverse_operator(op), v)), + (DfExpr::Literal(v), DfExpr::Column(c)) => Some(( + &c.name, + // Safety: previous branch ensures this is a comparison operator + op.swap().unwrap(), + v, + )), _ => None, }; if let Some((column, op, scalar)) = column_op_value { @@ -311,27 +347,3 @@ fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result>()) } - -#[inline] -fn is_compare_op(op: &Operator) -> bool { - matches!( - *op, - Operator::Eq - | Operator::NotEq - | Operator::Lt - | Operator::LtEq - | Operator::Gt - | Operator::GtEq - ) -} - -#[inline] -fn reverse_operator(op: &Operator) -> Operator { - match *op { - Operator::Lt => Operator::Gt, - Operator::Gt => Operator::Lt, - Operator::LtEq => Operator::GtEq, - Operator::GtEq => Operator::LtEq, - _ => *op, - } -} diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs new file mode 100644 index 0000000000..0d7d9587de --- /dev/null +++ b/src/partition/src/multi_dim.rs @@ -0,0 +1,201 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::cmp::Ordering; +use std::collections::HashMap; + +use datatypes::prelude::Value; +use serde::{Deserialize, Serialize}; +use snafu::ensure; +use store_api::storage::RegionNumber; + +use crate::error::{self, Result}; +use crate::expr::{Operand, PartitionExpr, RestrictedOp}; +use crate::PartitionRule; + +#[derive(Debug, Serialize, Deserialize)] +pub struct MultiDimPartitionRule { + partition_columns: Vec, + // name to index of `partition_columns` + name_to_index: HashMap, + regions: Vec, + exprs: Vec, +} + +impl MultiDimPartitionRule { + pub fn new( + partition_columns: Vec, + regions: Vec, + exprs: Vec, + ) -> Self { + let name_to_index = partition_columns + .iter() + .enumerate() + .map(|(i, name)| (name.clone(), i)) + .collect::>(); + + Self { + partition_columns, + name_to_index, + regions, + exprs, + } + } + + fn find_region(&self, values: &[Value]) -> Result { + ensure!( + values.len() == self.partition_columns.len(), + error::RegionKeysSizeSnafu { + expect: self.partition_columns.len(), + actual: values.len(), + } + ); + + for (region_index, expr) in self.exprs.iter().enumerate() { + if self.evaluate_expr(expr, values)? { + return Ok(self.regions[region_index]); + } + } + + // return the default region number + Ok(0) + } + + fn evaluate_expr(&self, expr: &PartitionExpr, values: &[Value]) -> Result { + match (expr.lhs.as_ref(), expr.rhs.as_ref()) { + (Operand::Column(name), Operand::Value(r)) => { + let index = self.name_to_index.get(name).unwrap(); + let l = &values[*index]; + Self::perform_op(l, &expr.op, r) + } + (Operand::Value(l), Operand::Column(name)) => { + let index = self.name_to_index.get(name).unwrap(); + let r = &values[*index]; + Self::perform_op(l, &expr.op, r) + } + (Operand::Expr(lhs), Operand::Expr(rhs)) => { + let lhs = self.evaluate_expr(lhs, values)?; + let rhs = self.evaluate_expr(rhs, values)?; + match expr.op { + RestrictedOp::And => Ok(lhs && rhs), + RestrictedOp::Or => Ok(lhs || rhs), + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + + fn perform_op(lhs: &Value, op: &RestrictedOp, rhs: &Value) -> Result { + let result = match op { + RestrictedOp::Eq => lhs.eq(rhs), + RestrictedOp::NotEq => lhs.ne(rhs), + RestrictedOp::Lt => lhs.partial_cmp(rhs) == Some(Ordering::Less), + RestrictedOp::LtEq => { + let result = lhs.partial_cmp(rhs); + result == Some(Ordering::Less) || result == Some(Ordering::Equal) + } + RestrictedOp::Gt => lhs.partial_cmp(rhs) == Some(Ordering::Greater), + RestrictedOp::GtEq => { + let result = lhs.partial_cmp(rhs); + result == Some(Ordering::Greater) || result == Some(Ordering::Equal) + } + RestrictedOp::And | RestrictedOp::Or => unreachable!(), + }; + + Ok(result) + } +} + +impl PartitionRule for MultiDimPartitionRule { + fn as_any(&self) -> &dyn Any { + self + } + + fn partition_columns(&self) -> Vec { + self.partition_columns.clone() + } + + fn find_region(&self, values: &[Value]) -> Result { + self.find_region(values) + } + + fn find_regions_by_exprs( + &self, + _exprs: &[crate::partition::PartitionExpr], + ) -> Result> { + Ok(self.regions.clone()) + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + use crate::error; + + #[test] + fn test_find_region() { + // PARTITION ON COLUMNS (b) ( + // b < 'hz', + // b >= 'hz' AND b < 'sh', + // b >= 'sh' + // ) + let rule = MultiDimPartitionRule::new( + vec!["b".to_string()], + vec![1, 2, 3], + vec![ + PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::String("hz".into())), + ), + PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::String("hz".into())), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::Lt, + Operand::Value(datatypes::value::Value::String("sh".into())), + )), + ), + PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::GtEq, + Operand::Value(datatypes::value::Value::String("sh".into())), + ), + ], + ); + assert_matches!( + rule.find_region(&["foo".into(), 1000_i32.into()]), + Err(error::Error::RegionKeysSize { + expect: 1, + actual: 2, + .. + }) + ); + assert_matches!(rule.find_region(&["foo".into()]), Ok(1)); + assert_matches!(rule.find_region(&["bar".into()]), Ok(1)); + assert_matches!(rule.find_region(&["hz".into()]), Ok(2)); + assert_matches!(rule.find_region(&["hzz".into()]), Ok(2)); + assert_matches!(rule.find_region(&["sh".into()]), Ok(3)); + assert_matches!(rule.find_region(&["zzzz".into()]), Ok(3)); + } +} diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index f25ca63da2..9720c2c52b 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -49,6 +49,7 @@ pub trait PartitionRule: Sync + Send { pub enum PartitionBound { Value(Value), MaxValue, + Expr(crate::expr::PartitionExpr), } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -62,6 +63,7 @@ impl Display for PartitionBound { match self { Self::Value(v) => write!(f, "{}", v), Self::MaxValue => write!(f, "MAXVALUE"), + Self::Expr(e) => write!(f, "{:?}", e), } } } diff --git a/tests-fuzz/src/translator/mysql/create_expr.rs b/tests-fuzz/src/translator/mysql/create_expr.rs index c5fa04d0ae..9feeb8e973 100644 --- a/tests-fuzz/src/translator/mysql/create_expr.rs +++ b/tests-fuzz/src/translator/mysql/create_expr.rs @@ -98,6 +98,7 @@ impl CreateTableExprTranslator { _ => format!("{v}"), }, PartitionBound::MaxValue => "MAXVALUE".to_string(), + PartitionBound::Expr(expr) => expr.to_parser_expr().to_string(), } } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index a7572066f7..db2de37f16 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -186,7 +186,6 @@ mod test { } #[tokio::test(flavor = "multi_thread")] - #[ignore = "TODO(ruihang): WIP new partition rule"] async fn test_distributed_insert_delete_and_query() { common_telemetry::init_default_ut_logging(); @@ -204,11 +203,11 @@ CREATE TABLE {table_name} ( ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b) -) PARTITION BY RANGE COLUMNS(a) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), +) PARTITION ON COLUMNS(a) ( + a < 10, + a >= 10 AND a < 20, + a >= 20 AND a < 50, + a >= 50 )" ); create_table(frontend, sql).await; diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index d1f4c8929d..965d571d5a 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -67,7 +67,6 @@ mod tests { } #[tokio::test(flavor = "multi_thread")] - #[ignore = "TODO(ruihang): WIP new partition rule"] async fn test_distributed_exec_sql() { common_telemetry::init_default_ut_logging(); @@ -85,11 +84,11 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(host) ) - PARTITION BY RANGE COLUMNS (host) ( - PARTITION r0 VALUES LESS THAN ('550-A'), - PARTITION r1 VALUES LESS THAN ('550-W'), - PARTITION r2 VALUES LESS THAN ('MOSS'), - PARTITION r3 VALUES LESS THAN (MAXVALUE), + PARTITION ON COLUMNS (host) ( + host < '550-A', + host >= '550-A' AND host < '550-W', + host >= '550-W' AND host < 'MOSS', + host >= 'MOSS' ) engine=mito"#; create_table(instance, sql).await; diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index fae444ba7c..78653c1d15 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -83,7 +83,6 @@ async fn test_create_database_and_insert_query(instance: Arc) } #[apply(both_instances_cases)] -#[ignore = "TODO(ruihang): WIP new partition rule"] async fn test_show_create_table(instance: Arc) { let frontend = instance.frontend(); let sql = if instance.is_distributed_mode() { @@ -92,11 +91,11 @@ async fn test_show_create_table(instance: Arc) { ts timestamp, TIME INDEX(ts) ) -PARTITION BY RANGE COLUMNS (n) ( - PARTITION r0 VALUES LESS THAN (1), - PARTITION r1 VALUES LESS THAN (10), - PARTITION r2 VALUES LESS THAN (100), - PARTITION r3 VALUES LESS THAN (MAXVALUE), +PARTITION ON COLUMNS (n) ( + n < 1, + n >= 1 AND n < 10, + n >= 10 AND n < 100, + n >= 100 )"# } else { r#"create table demo( @@ -113,26 +112,26 @@ PARTITION BY RANGE COLUMNS (n) ( let output = execute_sql(&frontend, "show create table demo").await; let expected = if instance.is_distributed_mode() { - r#"+-------+--------------------------------------------+ -| Table | Create Table | -+-------+--------------------------------------------+ -| demo | CREATE TABLE IF NOT EXISTS "demo" ( | -| | "n" INT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("n") | -| | ) | -| | PARTITION BY RANGE COLUMNS ("n") ( | -| | PARTITION r0 VALUES LESS THAN (1), | -| | PARTITION r1 VALUES LESS THAN (10), | -| | PARTITION r2 VALUES LESS THAN (100), | -| | PARTITION r3 VALUES LESS THAN (MAXVALUE) | -| | ) | -| | ENGINE=mito | -| | WITH( | -| | regions = 4 | -| | ) | -+-------+--------------------------------------------+"# + r#"+-------+-------------------------------------+ +| Table | Create Table | ++-------+-------------------------------------+ +| demo | CREATE TABLE IF NOT EXISTS "demo" ( | +| | "n" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("n") | +| | ) | +| | PARTITION ON COLUMNS ("n") ( | +| | n < 1, | +| | n >= 100, | +| | n >= 1 AND n < 10, | +| | n >= 10 AND n < 100 | +| | ) | +| | ENGINE=mito | +| | WITH( | +| | regions = 4 | +| | ) | ++-------+-------------------------------------+"# } else { r#"+-------+-------------------------------------+ | Table | Create Table | diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index d9bf85844f..42560e46f4 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -29,7 +29,6 @@ http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); // region_failover_tests!(File, S3, S3WithCache, Oss, Azblob); sql_tests!(File); -// TODO(ruihang): re-enable this when the new partition rule is ready -// region_migration_tests!(File); +region_migration_tests!(File); // TODO(niebayes): add integration tests for remote wal. diff --git a/tests/cases/distributed/create/partition.result b/tests/cases/distributed/create/partition.result deleted file mode 100644 index bc18c94ae7..0000000000 --- a/tests/cases/distributed/create/partition.result +++ /dev/null @@ -1,36 +0,0 @@ -CREATE TABLE my_table ( - a INT PRIMARY KEY, - b STRING, - ts TIMESTAMP TIME INDEX, -) -PARTITION ON COLUMNS (a) ( - a < 1000, - a >= 1000 AND a < 2000, - a >= 2000 -); - -Affected Rows: 0 - --- SQLNESS REPLACE (\d{13}) ID -SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; - -+---------------+--------------+------------+----------------+---------------------------------+-----------------------+ -| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | -+---------------+--------------+------------+----------------+---------------------------------+-----------------------+ -| greptime | public | my_table | p0 | (a) VALUES LESS THAN (MAXVALUE) | ID | -+---------------+--------------+------------+----------------+---------------------------------+-----------------------+ - --- SQLNESS REPLACE (\d{13}) REGION_ID --- SQLNESS REPLACE (\d{1}) PEER_ID -SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; - -+---------------+---------+-----------+--------+ -| region_id | peer_id | is_leader | status | -+---------------+---------+-----------+--------+ -| REGION_ID | PEER_ID | Yes | ALIVE | -+---------------+---------+-----------+--------+ - -DROP TABLE my_table; - -Affected Rows: 0 - diff --git a/tests/cases/distributed/create/partition.sql b/tests/cases/distributed/create/partition.sql deleted file mode 100644 index 963b548a99..0000000000 --- a/tests/cases/distributed/create/partition.sql +++ /dev/null @@ -1,19 +0,0 @@ -CREATE TABLE my_table ( - a INT PRIMARY KEY, - b STRING, - ts TIMESTAMP TIME INDEX, -) -PARTITION ON COLUMNS (a) ( - a < 1000, - a >= 1000 AND a < 2000, - a >= 2000 -); - --- SQLNESS REPLACE (\d{13}) ID -SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; - --- SQLNESS REPLACE (\d{13}) REGION_ID --- SQLNESS REPLACE (\d{1}) PEER_ID -SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; - -DROP TABLE my_table; diff --git a/tests/cases/standalone/common/partition.result b/tests/cases/standalone/common/partition.result new file mode 100644 index 0000000000..3d1c1359d8 --- /dev/null +++ b/tests/cases/standalone/common/partition.result @@ -0,0 +1,170 @@ +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (\d{13}) ID +SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; + ++---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | ++---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+ +| greptime | public | my_table | p0 | (a) VALUES LESS THAN (PartitionExpr { lhs: Column("a"), op: Lt, rhs: Value(Int32(1000)) }) | ID | +| greptime | public | my_table | p1 | (a) VALUES LESS THAN (PartitionExpr { lhs: Column("a"), op: GtEq, rhs: Value(Int32(2000)) }) | ID | +| greptime | public | my_table | p2 | (a) VALUES LESS THAN (PartitionExpr { lhs: Expr(PartitionExpr { lhs: Column("a"), op: GtEq, rhs: Value(Int32(1000)) }), op: And, rhs: Expr(PartitionExpr { lhs: Column("a"), op: Lt, rhs: Value(Int32(2000)) }) }) | ID | ++---------------+--------------+------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+ + +-- SQLNESS REPLACE (\d{13}) REGION_ID +-- SQLNESS REPLACE (\d{1}) PEER_ID +SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; + ++---------------+---------+-----------+--------+ +| region_id | peer_id | is_leader | status | ++---------------+---------+-----------+--------+ +| REGION_ID | PEER_ID | Yes | ALIVE | +| REGION_ID | PEER_ID | Yes | ALIVE | +| REGION_ID | PEER_ID | Yes | ALIVE | ++---------------+---------+-----------+--------+ + +INSERT INTO my_table VALUES + (100, 'a', 1), + (200, 'b', 2), + (1100, 'c', 3), + (1200, 'd', 4), + (2000, 'e', 5), + (2100, 'f', 6), + (2200, 'g', 7), + (2400, 'h', 8); + +Affected Rows: 8 + +SELECT * FROM my_table; + ++------+---+-------------------------+ +| a | b | ts | ++------+---+-------------------------+ +| 100 | a | 1970-01-01T00:00:00.001 | +| 200 | b | 1970-01-01T00:00:00.002 | +| 1100 | c | 1970-01-01T00:00:00.003 | +| 1200 | d | 1970-01-01T00:00:00.004 | +| 2000 | e | 1970-01-01T00:00:00.005 | +| 2100 | f | 1970-01-01T00:00:00.006 | +| 2200 | g | 1970-01-01T00:00:00.007 | +| 2400 | h | 1970-01-01T00:00:00.008 | ++------+---+-------------------------+ + +DELETE FROM my_table WHERE a < 150; + +Affected Rows: 1 + +SELECT * FROM my_table; + ++------+---+-------------------------+ +| a | b | ts | ++------+---+-------------------------+ +| 200 | b | 1970-01-01T00:00:00.002 | +| 1100 | c | 1970-01-01T00:00:00.003 | +| 1200 | d | 1970-01-01T00:00:00.004 | +| 2000 | e | 1970-01-01T00:00:00.005 | +| 2100 | f | 1970-01-01T00:00:00.006 | +| 2200 | g | 1970-01-01T00:00:00.007 | +| 2400 | h | 1970-01-01T00:00:00.008 | ++------+---+-------------------------+ + +DELETE FROM my_table WHERE a < 2200 AND a > 1500; + +Affected Rows: 2 + +SELECT * FROM my_table; + ++------+---+-------------------------+ +| a | b | ts | ++------+---+-------------------------+ +| 200 | b | 1970-01-01T00:00:00.002 | +| 1100 | c | 1970-01-01T00:00:00.003 | +| 1200 | d | 1970-01-01T00:00:00.004 | +| 2200 | g | 1970-01-01T00:00:00.007 | +| 2400 | h | 1970-01-01T00:00:00.008 | ++------+---+-------------------------+ + +DELETE FROM my_table WHERE a < 2500; + +Affected Rows: 5 + +SELECT * FROM my_table; + +++ +++ + +DROP TABLE my_table; + +Affected Rows: 0 + +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (a) (); + +Affected Rows: 0 + +-- SQLNESS REPLACE (\d{13}) ID +SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; + ++---------------+--------------+------------+----------------+---------------------------------+-----------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | ++---------------+--------------+------------+----------------+---------------------------------+-----------------------+ +| greptime | public | my_table | p0 | (a) VALUES LESS THAN (MAXVALUE) | ID | ++---------------+--------------+------------+----------------+---------------------------------+-----------------------+ + +-- SQLNESS REPLACE (\d{13}) REGION_ID +-- SQLNESS REPLACE (\d{1}) PEER_ID +SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; + ++---------------+---------+-----------+--------+ +| region_id | peer_id | is_leader | status | ++---------------+---------+-----------+--------+ +| REGION_ID | PEER_ID | Yes | ALIVE | ++---------------+---------+-----------+--------+ + +INSERT INTO my_table VALUES + (100, 'a', 1), + (200, 'b', 2), + (1100, 'c', 3), + (1200, 'd', 4), + (2000, 'e', 5), + (2100, 'f', 6), + (2200, 'g', 7), + (2400, 'h', 8); + +Affected Rows: 8 + + +SELECT * FROM my_table; + ++------+---+-------------------------+ +| a | b | ts | ++------+---+-------------------------+ +| 100 | a | 1970-01-01T00:00:00.001 | +| 200 | b | 1970-01-01T00:00:00.002 | +| 1100 | c | 1970-01-01T00:00:00.003 | +| 1200 | d | 1970-01-01T00:00:00.004 | +| 2000 | e | 1970-01-01T00:00:00.005 | +| 2100 | f | 1970-01-01T00:00:00.006 | +| 2200 | g | 1970-01-01T00:00:00.007 | +| 2400 | h | 1970-01-01T00:00:00.008 | ++------+---+-------------------------+ + +DROP TABLE my_table; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/partition.sql b/tests/cases/standalone/common/partition.sql new file mode 100644 index 0000000000..cc0c962743 --- /dev/null +++ b/tests/cases/standalone/common/partition.sql @@ -0,0 +1,71 @@ +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (a) ( + a < 1000, + a >= 1000 AND a < 2000, + a >= 2000 +); + +-- SQLNESS REPLACE (\d{13}) ID +SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; + +-- SQLNESS REPLACE (\d{13}) REGION_ID +-- SQLNESS REPLACE (\d{1}) PEER_ID +SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; + +INSERT INTO my_table VALUES + (100, 'a', 1), + (200, 'b', 2), + (1100, 'c', 3), + (1200, 'd', 4), + (2000, 'e', 5), + (2100, 'f', 6), + (2200, 'g', 7), + (2400, 'h', 8); + +SELECT * FROM my_table; + +DELETE FROM my_table WHERE a < 150; + +SELECT * FROM my_table; + +DELETE FROM my_table WHERE a < 2200 AND a > 1500; + +SELECT * FROM my_table; + +DELETE FROM my_table WHERE a < 2500; + +SELECT * FROM my_table; + +DROP TABLE my_table; + +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION ON COLUMNS (a) (); + +-- SQLNESS REPLACE (\d{13}) ID +SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; + +-- SQLNESS REPLACE (\d{13}) REGION_ID +-- SQLNESS REPLACE (\d{1}) PEER_ID +SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; + +INSERT INTO my_table VALUES + (100, 'a', 1), + (200, 'b', 2), + (1100, 'c', 3), + (1200, 'd', 4), + (2000, 'e', 5), + (2100, 'f', 6), + (2200, 'g', 7), + (2400, 'h', 8); + +SELECT * FROM my_table; + +DROP TABLE my_table; diff --git a/tests/cases/standalone/common/show/show_create.result b/tests/cases/standalone/common/show/show_create.result index 5db34bb81d..dbc714849b 100644 --- a/tests/cases/standalone/common/show/show_create.result +++ b/tests/cases/standalone/common/show/show_create.result @@ -35,11 +35,13 @@ SHOW CREATE TABLE system_metrics; | | PRIMARY KEY ("id", "host") | | | ) | | | PARTITION ON COLUMNS ("id") ( | -| | | +| | id < 5, | +| | id >= 9, | +| | id >= 5 AND id < 9 | | | ) | | | ENGINE=mito | | | WITH( | -| | regions = 1, | +| | regions = 3, | | | ttl = '7days', | | | write_buffer_size = '1.0KiB' | | | ) |