feat!: new partition grammar - parser part (#3347)

* parser part

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix test in sql

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* comment out and ignore some logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness cases

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update region migration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* temporary disable region migration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* allow dead code

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-02-27 15:20:16 +08:00
committed by GitHub
parent 492a00969d
commit 3544c9334c
17 changed files with 264 additions and 516 deletions

View File

@@ -37,11 +37,8 @@ use partition::partition::{PartitionBound, PartitionDef};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, Partitions};
use sql::statements::sql_value_to_value;
use sql::MAXVALUE;
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -52,8 +49,8 @@ use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
CreateLogicalTablesSnafu, CreateTableInfoSnafu, CreateTableWithMultiCatalogsSnafu,
CreateTableWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyCreateTableExprSnafu,
InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, ParseSqlSnafu, Result,
SchemaNotFoundSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
InvalidPartitionColumnsSnafu, InvalidTableNameSnafu, Result, SchemaNotFoundSnafu,
TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
@@ -564,6 +561,7 @@ fn validate_partition_columns(
Ok(())
}
/// Parse partition statement [Partitions] into [MetaPartition] and partition columns.
fn parse_partitions(
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
@@ -690,9 +688,9 @@ fn find_partition_entries(
create_table: &CreateTableExpr,
partitions: &Option<Partitions>,
partition_columns: &[String],
query_ctx: &QueryContextRef,
_query_ctx: &QueryContextRef,
) -> Result<Vec<Vec<PartitionBound>>> {
let entries = if let Some(partitions) = partitions {
let entries = if let Some(_partitions) = partitions {
let column_defs = partition_columns
.iter()
.map(|pc| {
@@ -714,24 +712,8 @@ fn find_partition_entries(
column_name_and_type.push((column_name, data_type));
}
let mut entries = Vec::with_capacity(partitions.entries.len());
for e in partitions.entries.iter() {
let mut values = Vec::with_capacity(e.value_list.len());
for (i, v) in e.value_list.iter().enumerate() {
// indexing is safe here because we have checked that "value_list" and "column_list" are matched in size
let (column_name, data_type) = &column_name_and_type[i];
let v = match v {
SqlValue::Number(n, _) if n == MAXVALUE => PartitionBound::MaxValue,
_ => PartitionBound::Value(
sql_value_to_value(column_name, data_type, v, Some(&query_ctx.timezone()))
.context(ParseSqlSnafu)?,
),
};
values.push(v);
}
entries.push(values);
}
entries
// TODO(ruihang): implement the partition value parser.
vec![vec![PartitionBound::MaxValue]]
} else {
vec![vec![PartitionBound::MaxValue]]
};
@@ -786,6 +768,7 @@ mod test {
}
#[tokio::test]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_parse_partitions() {
common_telemetry::init_default_ut_logging();
let cases = [

View File

@@ -16,13 +16,11 @@ use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::tracing;
use partition::manager::PartitionInfo;
use partition::partition::PartitionBound;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::ast::Ident;
use sql::statements::create::Partitions;
use sql::statements::show::{ShowDatabases, ShowTables, ShowVariables};
use sql::{statements, MAXVALUE};
use table::TableRef;
use crate::error::{self, ExecuteStatementSnafu, Result};
@@ -90,30 +88,10 @@ fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Parti
.map(|name| name[..].into())
.collect();
let entries = partitions
.into_iter()
.map(|info| {
// Generated the partition name from id
let name = &format!("r{}", info.id.region_number());
let bounds = info.partition.partition_bounds();
let value_list = bounds
.iter()
.map(|b| match b {
PartitionBound::Value(v) => statements::value_to_sql_value(v)
.with_context(|_| error::ConvertSqlValueSnafu { value: v.clone() }),
PartitionBound::MaxValue => Ok(SqlValue::Number(MAXVALUE.to_string(), false)),
})
.collect::<Result<Vec<_>>>()?;
Ok(PartitionEntry {
name: name[..].into(),
value_list,
})
})
.collect::<Result<Vec<_>>>()?;
// TODO(ruihang): convert partition info back to partition expr
Ok(Some(Partitions {
column_list,
entries,
exprs: vec![],
}))
}

View File

@@ -12,15 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp::Ordering;
use std::collections::HashMap;
use common_catalog::consts::default_engine;
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use once_cell::sync::Lazy;
use snafu::{ensure, OptionExt, ResultExt};
use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Value};
use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Expr};
use sqlparser::dialect::keywords::Keyword;
use sqlparser::keywords::ALL_KEYWORDS;
use sqlparser::parser::IsOptional::Mandatory;
@@ -28,27 +25,22 @@ use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::{Token, TokenWithLocation, Word};
use table::requests::valid_table_option;
use crate::ast::{ColumnDef, Ident, TableConstraint, Value as SqlValue};
use crate::ast::{ColumnDef, Ident, TableConstraint};
use crate::error::{
self, InvalidColumnOptionSnafu, InvalidTableOptionSnafu, InvalidTimeIndexSnafu,
MissingTimeIndexSnafu, Result, SyntaxSnafu,
};
use crate::parser::ParserContext;
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateTable, PartitionEntry, Partitions, TIME_INDEX,
CreateDatabase, CreateExternalTable, CreateTable, Partitions, TIME_INDEX,
};
use crate::statements::get_data_type_by_alias_name;
use crate::statements::statement::Statement;
use crate::statements::{
get_data_type_by_alias_name, sql_data_type_to_concrete_data_type, sql_value_to_value,
};
use crate::util::parse_option_string;
pub const ENGINE: &str = "ENGINE";
pub const MAXVALUE: &str = "MAXVALUE";
static LESS: Lazy<Token> = Lazy::new(|| Token::make_keyword("LESS"));
static THAN: Lazy<Token> = Lazy::new(|| Token::make_keyword("THAN"));
/// Parses create [table] statement
impl<'a> ParserContext<'a> {
pub(crate) fn parse_create(&mut self) -> Result<Statement> {
@@ -188,17 +180,17 @@ impl<'a> ParserContext<'a> {
Ok(Statement::CreateTable(create_table))
}
// "PARTITION BY ..." syntax:
// https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html
/// "PARTITION BY ..." syntax:
// TODO(ruihang): docs
fn parse_partitions(&mut self) -> Result<Option<Partitions>> {
if !self.parser.parse_keyword(Keyword::PARTITION) {
return Ok(None);
}
self.parser
.expect_keywords(&[Keyword::BY, Keyword::RANGE, Keyword::COLUMNS])
.expect_keywords(&[Keyword::ON, Keyword::COLUMNS])
.context(error::UnexpectedSnafu {
sql: self.sql,
expected: "BY, RANGE, COLUMNS",
expected: "ON, COLUMNS",
actual: self.peek_token_as_string(),
})?;
@@ -211,46 +203,13 @@ impl<'a> ParserContext<'a> {
.map(Self::canonicalize_identifier)
.collect();
let entries = self.parse_comma_separated(Self::parse_partition_entry)?;
let exprs = self.parse_comma_separated(Self::parse_partition_entry)?;
Ok(Some(Partitions {
column_list,
entries,
}))
Ok(Some(Partitions { column_list, exprs }))
}
fn parse_partition_entry(&mut self) -> Result<PartitionEntry> {
self.parser
.expect_keyword(Keyword::PARTITION)
.context(error::UnexpectedSnafu {
sql: self.sql,
expected: "PARTITION",
actual: self.peek_token_as_string(),
})?;
let name = self.parser.parse_identifier().context(error::SyntaxSnafu)?;
self.parser
.expect_keyword(Keyword::VALUES)
.and_then(|_| self.parser.expect_token(&LESS))
.and_then(|_| self.parser.expect_token(&THAN))
.context(error::SyntaxSnafu)?;
let value_list = self.parse_comma_separated(Self::parse_value_list)?;
Ok(PartitionEntry { name, value_list })
}
fn parse_value_list(&mut self) -> Result<SqlValue> {
let token = self.parser.peek_token().token;
let value = match token {
Token::Word(Word { value, .. }) if value == MAXVALUE => {
let _ = self.parser.next_token();
SqlValue::Number(MAXVALUE.to_string(), false)
}
_ => self.parser.parse_value().context(error::SyntaxSnafu)?,
};
Ok(value)
fn parse_partition_entry(&mut self) -> Result<Expr> {
self.parser.parse_expr().context(error::SyntaxSnafu)
}
/// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F`
@@ -692,146 +651,60 @@ fn get_real_timestamp_type(data_type: &DataType) -> DataType {
fn validate_partitions(columns: &[ColumnDef], partitions: &Partitions) -> Result<()> {
let partition_columns = ensure_partition_columns_defined(columns, partitions)?;
ensure_partition_names_no_duplicate(partitions)?;
ensure_value_list_len_matches_columns(partitions, &partition_columns)?;
let value_lists = ensure_value_lists_strictly_increased(partitions, partition_columns)?;
ensure_value_lists_bounded_by_maxvalue(value_lists)?;
ensure_exprs_are_binary(&partitions.exprs, &partition_columns)?;
Ok(())
}
/// Ensure that partition ranges fully cover all values.
// Simply check the last partition is bounded by "MAXVALUE"s.
// MySQL does not have this restriction. However, I think we'd better have it because:
// - It might save user from adding more partitions in the future by hand, which is often
// a tedious task. Why not provide an extra partition at the beginning and leave all
// other partition related jobs to us? I think it's a reasonable argument to user.
// - It might save us from some ugly designs and codings. The "MAXVALUE" bound is natural
// in dealing with values that are unspecified upfront. Without it, we have to store
// and use the user defined max bound everywhere, starting from calculating regions by
// partition rule in Frontend, to automatically split and merge regions in Meta.
fn ensure_value_lists_bounded_by_maxvalue(value_lists: Vec<&Vec<Value>>) -> Result<()> {
let is_maxvalue_bound = value_lists.last().map(|v| {
v.iter()
.all(|x| matches!(x, SqlValue::Number(s, _) if s == MAXVALUE))
});
ensure!(
matches!(is_maxvalue_bound, Some(true)),
error::InvalidSqlSnafu {
msg: "Please provide an extra partition that is bounded by 'MAXVALUE'."
/// Ensure all exprs are binary expr and all the columns are defined in the column list.
fn ensure_exprs_are_binary(exprs: &[Expr], columns: &[&ColumnDef]) -> Result<()> {
for expr in exprs {
// The first level must be binary expr
if let Expr::BinaryOp { left, op: _, right } = expr {
ensure_one_expr(left, columns)?;
ensure_one_expr(right, columns)?;
} else {
return error::InvalidSqlSnafu {
msg: format!("Partition rule expr {:?} is not a binary expr!", expr),
}
.fail();
}
);
Ok(())
}
fn is_string_value(v: &SqlValue) -> bool {
matches!(
v,
SqlValue::DoubleQuotedString(_) | SqlValue::SingleQuotedString(_)
)
}
/// Ensure that value lists of partitions are strictly increasing.
fn ensure_value_lists_strictly_increased<'a>(
partitions: &'a Partitions,
partition_columns: Vec<&'a ColumnDef>,
) -> Result<Vec<&'a Vec<Value>>> {
let value_lists = partitions
.entries
.iter()
.map(|x| &x.value_list)
.collect::<Vec<_>>();
for i in 1..value_lists.len() {
let mut equal_tuples = 0;
for (n, (x, y)) in value_lists[i - 1]
.iter()
.zip(value_lists[i].iter())
.enumerate()
{
let column = partition_columns[n];
let is_x_maxvalue = matches!(x, SqlValue::Number(s, _) if s == MAXVALUE);
let is_y_maxvalue = matches!(y, SqlValue::Number(s, _) if s == MAXVALUE);
match (is_x_maxvalue, is_y_maxvalue) {
(true, true) => {
equal_tuples += 1;
}
(false, false) => {
let column_name = &column.name.value;
let cdt = sql_data_type_to_concrete_data_type(&column.data_type)?;
if matches!(
cdt,
ConcreteDataType::Timestamp(_)
| ConcreteDataType::Date(_)
| ConcreteDataType::DateTime(_)
) {
// Date/Timestamp/Datetime need to be aware of timezone information
// when converting from a string to a specific type.
// If x and y have only one value type as a string,
// comparison is not allowed.
match (is_string_value(x), is_string_value(y)) {
(true, false) | (false, true) => {
return error::InvalidSqlSnafu {
msg: format!(
"Can't compare {:?} with {:?} in partition rules",
x, y
),
}
.fail();
}
_ => {}
}
}
// We only want to comnpare the `x` and `y` values,
// so the `timezone` can be ignored.
let x = sql_value_to_value(column_name, &cdt, x, None)?;
let y = sql_value_to_value(column_name, &cdt, y, None)?;
match x.cmp(&y) {
Ordering::Less => break,
Ordering::Equal => equal_tuples += 1,
Ordering::Greater => return error::InvalidSqlSnafu {
msg: "VALUES LESS THAN value must be strictly increasing for each partition.",
}.fail()
}
}
(true, false) => return error::InvalidSqlSnafu {
msg: "VALUES LESS THAN value must be strictly increasing for each partition.",
}
.fail(),
(false, true) => break,
}
}
ensure!(
equal_tuples < partition_columns.len(),
error::InvalidSqlSnafu {
msg: "VALUES LESS THAN value must be strictly increasing for each partition.",
}
);
}
Ok(value_lists)
}
/// Ensure that value list's length matches the column list.
fn ensure_value_list_len_matches_columns(
partitions: &Partitions,
partition_columns: &[&ColumnDef],
) -> Result<()> {
for entry in partitions.entries.iter() {
ensure!(
entry.value_list.len() == partition_columns.len(),
error::InvalidSqlSnafu {
msg: "Partition value list does not match column list.",
}
);
}
Ok(())
}
/// Ensure that all columns used in "PARTITION BY RANGE COLUMNS" are defined in create table.
/// Check if the expr is a binary expr, an ident or a literal value.
/// If is ident, then check it is in the column list.
/// This recursive function is intended to be used by [ensure_exprs_are_binary].
fn ensure_one_expr(expr: &Expr, columns: &[&ColumnDef]) -> Result<()> {
match expr {
Expr::BinaryOp { left, op: _, right } => {
ensure_one_expr(left, columns)?;
ensure_one_expr(right, columns)?;
Ok(())
}
Expr::Identifier(ident) => {
let column_name = &ident.value;
ensure!(
columns.iter().any(|c| &c.name.value == column_name),
error::InvalidSqlSnafu {
msg: format!(
"Column {:?} in rule expr is not referenced in PARTITION ON!",
column_name
),
}
);
Ok(())
}
Expr::Value(_) => Ok(()),
_ => error::InvalidSqlSnafu {
msg: format!("Partition rule expr {:?} is not a binary expr!", expr),
}
.fail(),
}
}
/// Ensure that all columns used in "PARTITION ON COLUMNS" are defined in create table.
fn ensure_partition_columns_defined<'a>(
columns: &'a [ColumnDef],
partitions: &'a Partitions,
@@ -852,25 +725,6 @@ fn ensure_partition_columns_defined<'a>(
.collect::<Result<Vec<&ColumnDef>>>()
}
/// Ensure that partition names do not duplicate.
fn ensure_partition_names_no_duplicate(partitions: &Partitions) -> Result<()> {
let partition_names = partitions
.entries
.iter()
.map(|x| &x.name.value)
.sorted()
.collect::<Vec<&String>>();
for w in partition_names.windows(2) {
ensure!(
w[0] != w[1],
error::InvalidSqlSnafu {
msg: format!("Duplicate partition names: {}", w[0]),
}
)
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
@@ -879,6 +733,7 @@ mod tests {
use common_catalog::consts::FILE_ENGINE;
use common_error::ext::ErrorExt;
use sqlparser::ast::ColumnOption::NotNull;
use sqlparser::ast::{BinaryOperator, Value};
use super::*;
use crate::dialect::GreptimeDbDialect;
@@ -1050,10 +905,11 @@ mod tests {
fn test_validate_create() {
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp TIME INDEX)
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
PARTITION ON COLUMNS(c, a) (
a < 10,
a > 10 AND a < 20,
a > 20 AND c < 100,
a > 20 AND c >= 100
)
ENGINE=mito";
let result =
@@ -1062,11 +918,7 @@ ENGINE=mito";
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, x) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
PARTITION ON COLUMNS(x) ()
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
@@ -1074,98 +926,6 @@ ENGINE=mito";
.unwrap_err()
.to_string()
.contains("Partition column \"x\" not defined!"));
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r2 VALUES LESS THAN ('sz', 3000),
PARTITION r1 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result
.unwrap_err()
.to_string()
.contains("Duplicate partition names: r1"));
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh'),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result
.unwrap_err()
.to_string()
.contains("Partition value list does not match column list"));
let cases = vec![
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('sh', 1000),
PARTITION r1 VALUES LESS THAN ('hz', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito",
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 2000),
PARTITION r1 VALUES LESS THAN ('hz', 1000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito",
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('hz', 1000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
)
ENGINE=mito",
r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, 2000),
PARTITION r1 VALUES LESS THAN ('sh', 3000),
)
ENGINE=mito",
];
for sql in cases {
let result = ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default(),
);
assert!(result
.unwrap_err()
.to_string()
.contains("VALUES LESS THAN value must be strictly increasing for each partition"));
}
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, 9999),
)
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result
.unwrap_err()
.to_string()
.contains("Please provide an extra partition that is bounded by 'MAXVALUE'."));
}
#[test]
@@ -1180,11 +940,11 @@ CREATE TABLE monitor (
TIME INDEX (ts),
PRIMARY KEY (host),
)
PARTITION BY RANGE COLUMNS(idc, host_id) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r2 VALUES LESS THAN ('sh', 3000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
PARTITION ON COLUMNS(idc, host_id) (
idc <= 'hz' AND host_id < 1000,
idc > 'hz' AND idc <= 'sh' AND host_id < 2000,
idc > 'sh' AND host_id < 3000,
idc > 'sh' AND host_id >= 3000,
)
ENGINE=mito";
let result =
@@ -1203,40 +963,89 @@ ENGINE=mito";
.collect::<Vec<&String>>();
assert_eq!(column_list, vec!["idc", "host_id"]);
let entries = &partitions.entries;
let partition_names = entries
.iter()
.map(|x| &x.name.value)
.collect::<Vec<&String>>();
assert_eq!(partition_names, vec!["r0", "r1", "r2", "r3"]);
let exprs = &partitions.exprs;
assert_eq!(
entries[0].value_list,
vec![
SqlValue::SingleQuotedString("hz".to_string()),
SqlValue::Number("1000".to_string(), false)
]
exprs[0],
Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("idc".into())),
op: BinaryOperator::LtEq,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"hz".to_string()
)))
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("host_id".into())),
op: BinaryOperator::Lt,
right: Box::new(Expr::Value(Value::Number("1000".to_string(), false)))
})
}
);
assert_eq!(
entries[1].value_list,
vec![
SqlValue::SingleQuotedString("sh".to_string()),
SqlValue::Number("2000".to_string(), false)
]
exprs[1],
Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("idc".into())),
op: BinaryOperator::Gt,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"hz".to_string()
)))
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("idc".into())),
op: BinaryOperator::LtEq,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"sh".to_string()
)))
})
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("host_id".into())),
op: BinaryOperator::Lt,
right: Box::new(Expr::Value(Value::Number("2000".to_string(), false)))
})
}
);
assert_eq!(
entries[2].value_list,
vec![
SqlValue::SingleQuotedString("sh".to_string()),
SqlValue::Number("3000".to_string(), false)
]
exprs[2],
Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("idc".into())),
op: BinaryOperator::Gt,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"sh".to_string()
)))
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("host_id".into())),
op: BinaryOperator::Lt,
right: Box::new(Expr::Value(Value::Number("3000".to_string(), false)))
})
}
);
assert_eq!(
entries[3].value_list,
vec![
SqlValue::Number(MAXVALUE.to_string(), false),
SqlValue::Number(MAXVALUE.to_string(), false)
]
exprs[3],
Expr::BinaryOp {
left: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("idc".into())),
op: BinaryOperator::Gt,
right: Box::new(Expr::Value(Value::SingleQuotedString(
"sh".to_string()
)))
}),
op: BinaryOperator::And,
right: Box::new(Expr::BinaryOp {
left: Box::new(Expr::Identifier("host_id".into())),
op: BinaryOperator::GtEq,
right: Box::new(Expr::Value(Value::Number("3000".to_string(), false)))
})
}
);
}
_ => unreachable!(),
@@ -1493,31 +1302,15 @@ ENGINE=mito";
}
}
#[test]
fn test_parse_partitions_with_invalid_comparison() {
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT, ts timestamp time index)
PARTITION BY RANGE COLUMNS(ts) (
PARTITION r0 VALUES LESS THAN (1000),
PARTITION r1 VALUES LESS THAN ('2024-01-19 00:00:00'),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let err_msg = result.unwrap_err().output_msg();
assert!(err_msg.contains("Invalid SQL, error: Can't compare"));
}
#[test]
fn test_parse_partitions_with_error_syntax() {
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
PARTITION COLUMNS(c, a) (
a < 10,
a > 10 AND a < 20,
a > 20 AND c < 100,
a > 20 AND c >= 100
)
ENGINE=mito";
let result =
@@ -1525,37 +1318,49 @@ ENGINE=mito";
assert!(result
.unwrap_err()
.output_msg()
.contains("sql parser error: Expected BY, found: RANGE"));
.contains("sql parser error: Expected ON, found: COLUMNS"));
}
#[test]
fn test_parse_partitions_without_rule() {
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT, d TIMESTAMP TIME INDEX )
PARTITION ON COLUMNS(c, a) ()
ENGINE=mito";
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
}
#[test]
fn test_parse_partitions_unreferenced_column() {
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE),
PARTITION ON COLUMNS(c, a) (
b = 'foo'
)
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result
.unwrap_err()
.output_msg()
.contains("sql parser error: Expected LESS, found: THAN"));
assert_eq!(
result.unwrap_err().output_msg(),
"Invalid SQL, error: Column \"b\" in rule expr is not referenced in PARTITION ON!"
);
}
#[test]
fn test_parse_partitions_not_binary_expr() {
let sql = r"
CREATE TABLE rcx ( a INT, b STRING, c INT )
PARTITION BY RANGE COLUMNS(b, a) (
PARTITION r0 VALUES LESS THAN ('hz', 1000),
PARTITION r1 VALUES LESS THAN ('sh', 2000),
PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALU),
PARTITION ON COLUMNS(c, a) (
b
)
ENGINE=mito";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
assert!(result
.unwrap_err()
.output_msg()
.contains("Expected a concrete value, found: MAXVALU"));
assert_eq!(
result.unwrap_err().output_msg(),
"Invalid SQL, error: Partition rule expr Identifier(Ident { value: \"b\", quote_style: None }) is not a binary expr!"
);
}
fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) {

View File

@@ -16,6 +16,7 @@ use std::fmt::{Display, Formatter};
use common_catalog::consts::FILE_ENGINE;
use itertools::Itertools;
use sqlparser::ast::Expr;
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue};
@@ -128,7 +129,7 @@ impl CreateTable {
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct Partitions {
pub column_list: Vec<Ident>,
pub entries: Vec<PartitionEntry>,
pub exprs: Vec<Expr>,
}
impl Partitions {
@@ -162,9 +163,9 @@ impl Display for Partitions {
if !self.column_list.is_empty() {
write!(
f,
"PARTITION BY RANGE COLUMNS ({}) (\n{}\n)",
"PARTITION ON COLUMNS ({}) (\n{}\n)",
format_list_comma!(self.column_list),
format_list_indent!(self.entries),
format_list_indent!(self.exprs),
)
} else {
write!(f, "")
@@ -233,12 +234,11 @@ mod tests {
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (ts) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (host) (
host = 'a',
host > 'a',
)
engine=mito
with(regions=1, ttl='7d', storage='File');
@@ -259,12 +259,11 @@ CREATE TABLE IF NOT EXISTS demo (
cpu DOUBLE DEFAULT 0,
memory DOUBLE,
TIME INDEX (ts),
PRIMARY KEY (ts, host)
PRIMARY KEY (host)
)
PARTITION BY RANGE COLUMNS (ts) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE)
PARTITION ON COLUMNS (host) (
host = 'a',
host > 'a'
)
ENGINE=mito
WITH(
@@ -341,13 +340,9 @@ ENGINE=mito
cpu double default 0,
memory double,
TIME INDEX (ts),
PRIMARY KEY(ts, host)
)
PARTITION BY RANGE COLUMNS (ts) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PRIMARY KEY(host)
)
PARTITION ON COLUMNS (host) ()
engine=mito
with(regions=1, ttl='7d', hello='world');
";

View File

@@ -186,6 +186,7 @@ mod test {
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_distributed_insert_delete_and_query() {
common_telemetry::init_default_ut_logging();

View File

@@ -67,6 +67,7 @@ mod tests {
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_distributed_exec_sql() {
common_telemetry::init_default_ut_logging();

View File

@@ -83,6 +83,7 @@ async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>)
}
#[apply(both_instances_cases)]
#[ignore = "TODO(ruihang): WIP new partition rule"]
async fn test_show_create_table(instance: Arc<dyn MockInstance>) {
let frontend = instance.frontend();
let sql = if instance.is_distributed_mode() {
@@ -1872,12 +1873,7 @@ async fn test_custom_storage(instance: Arc<dyn MockInstance>) {
a int null primary key,
ts timestamp time index,
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION r0 VALUES LESS THAN (1),
PARTITION r1 VALUES LESS THAN (10),
PARTITION r2 VALUES LESS THAN (100),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
)
PARTITION ON COLUMNS (a) ()
with(storage='{storage_name}')
"#
)
@@ -1926,15 +1922,12 @@ async fn test_custom_storage(instance: Arc<dyn MockInstance>) {
TIME INDEX ("ts"),
PRIMARY KEY ("a")
)
PARTITION BY RANGE COLUMNS ("a") (
PARTITION r0 VALUES LESS THAN (1),
PARTITION r1 VALUES LESS THAN (10),
PARTITION r2 VALUES LESS THAN (100),
PARTITION r3 VALUES LESS THAN (MAXVALUE)
PARTITION ON COLUMNS ("a") (
)
ENGINE=mito
WITH(
regions = 4,
regions = 1,
storage = '{storage_name}'
)"#
)

View File

@@ -19,6 +19,7 @@ mod http;
#[macro_use]
mod sql;
#[macro_use]
#[allow(dead_code)]
mod region_migration;
// #[macro_use]
// mod region_failover;
@@ -28,6 +29,7 @@ http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
// region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
sql_tests!(File);
region_migration_tests!(File);
// TODO(ruihang): re-enable this when the new partition rule is ready
// region_migration_tests!(File);
// TODO(niebayes): add integration tests for remote wal.

View File

@@ -816,10 +816,10 @@ async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId {
CREATE TABLE {TEST_TABLE_NAME} (
i INT PRIMARY KEY,
ts TIMESTAMP TIME INDEX,
) PARTITION BY RANGE COLUMNS (i) (
PARTITION r0 VALUES LESS THAN (10),
PARTITION r1 VALUES LESS THAN (50),
PARTITION r3 VALUES LESS THAN (MAXVALUE),
) PARTITION ON COLUMNS (i) (
i <= 10,
i > 10 AND i <= 50,
i > 50
)"
);
let mut result = cluster.frontend.do_query(&sql, QueryContext::arc()).await;

View File

@@ -3,10 +3,10 @@ CREATE TABLE my_table (
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
Affected Rows: 0
@@ -17,9 +17,7 @@ SELECT table_catalog, table_schema, table_name, partition_name, partition_expres
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
| greptime | public | my_table | p0 | (a) VALUES LESS THAN (10) | ID |
| greptime | public | my_table | p1 | (a) VALUES LESS THAN (20) | ID |
| greptime | public | my_table | p2 | (a) VALUES LESS THAN (MAXVALUE) | ID |
| greptime | public | my_table | p0 | (a) VALUES LESS THAN (MAXVALUE) | ID |
+---------------+--------------+------------+----------------+---------------------------------+-----------------------+
-- SQLNESS REPLACE (\d{13}) REGION_ID
@@ -30,8 +28,6 @@ SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_re
| region_id | peer_id | is_leader | status |
+---------------+---------+-----------+--------+
| REGION_ID | PEER_ID | Yes | ALIVE |
| REGION_ID | PEER_ID | Yes | ALIVE |
| REGION_ID | PEER_ID | Yes | ALIVE |
+---------------+---------+-----------+--------+
DROP TABLE my_table;

View File

@@ -3,10 +3,10 @@ CREATE TABLE my_table (
b STRING,
ts TIMESTAMP TIME INDEX,
)
PARTITION BY RANGE COLUMNS (a) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (a) (
a < 1000,
a >= 1000 AND a < 2000,
a >= 2000
);
-- SQLNESS REPLACE (\d{13}) ID

View File

@@ -7,10 +7,10 @@ CREATE TABLE demo(
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A' AND host < '550-W',
host >= '550-W'
);
Affected Rows: 0

View File

@@ -7,10 +7,10 @@ CREATE TABLE demo(
TIME INDEX (ts),
PRIMARY KEY(host)
)
PARTITION BY RANGE COLUMNS (host) (
PARTITION r0 VALUES LESS THAN ('550-A'),
PARTITION r1 VALUES LESS THAN ('550-W'),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (host) (
host < '550-A',
host >= '550-A' AND host < '550-W',
host >= '550-W'
);
-- SQLNESS REPLACE (-+) -

View File

@@ -72,9 +72,7 @@ Affected Rows: 0
create table AbCdEfGe(
CoLA string PRIMARY KEY,
tS timestamp time index
) PARTITION BY RANGE COLUMNS (cOlA) (
PARTITION p0 VALUES LESS THAN (MAXVALUE)
);
) PARTITION ON COLUMNS (cOlA) ();
Affected Rows: 0

View File

@@ -29,9 +29,7 @@ drop table aBcDeFgE;
create table AbCdEfGe(
CoLA string PRIMARY KEY,
tS timestamp time index
) PARTITION BY RANGE COLUMNS (cOlA) (
PARTITION p0 VALUES LESS THAN (MAXVALUE)
);
) PARTITION ON COLUMNS (cOlA) ();
drop table abcdefge;

View File

@@ -7,10 +7,10 @@ CREATE TABLE system_metrics (
TIME INDEX (ts),
PRIMARY KEY (id, host)
)
PARTITION BY RANGE COLUMNS (id) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (id) (
id < 5,
id >= 5 AND id < 9,
id >= 9
)
ENGINE=mito
WITH(
@@ -34,14 +34,12 @@ SHOW CREATE TABLE system_metrics;
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("id", "host") |
| | ) |
| | PARTITION BY RANGE COLUMNS ("id") ( |
| | PARTITION r0 VALUES LESS THAN (5), |
| | PARTITION r1 VALUES LESS THAN (9), |
| | PARTITION r2 VALUES LESS THAN (MAXVALUE) |
| | PARTITION ON COLUMNS ("id") ( |
| | |
| | ) |
| | ENGINE=mito |
| | WITH( |
| | regions = 3, |
| | regions = 1, |
| | ttl = '7days', |
| | write_buffer_size = '1.0KiB' |
| | ) |
@@ -86,10 +84,10 @@ CREATE TABLE not_supported_table_options_keys (
TIME INDEX (ts),
PRIMARY KEY (id, host)
)
PARTITION BY RANGE COLUMNS (id) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (id) (
id < 5,
id >= 5 AND id < 9,
id >= 9
)
ENGINE=mito
WITH(
@@ -109,10 +107,10 @@ CREATE TABLE not_supported_table_storage_option (
TIME INDEX (ts),
PRIMARY KEY (id, host)
)
PARTITION BY RANGE COLUMNS (id) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (id) (
id < 5,
id >= 5 AND id < 9,
id >= 9
)
ENGINE=mito
WITH(

View File

@@ -7,10 +7,10 @@ CREATE TABLE system_metrics (
TIME INDEX (ts),
PRIMARY KEY (id, host)
)
PARTITION BY RANGE COLUMNS (id) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (id) (
id < 5,
id >= 5 AND id < 9,
id >= 9
)
ENGINE=mito
WITH(
@@ -39,10 +39,10 @@ CREATE TABLE not_supported_table_options_keys (
TIME INDEX (ts),
PRIMARY KEY (id, host)
)
PARTITION BY RANGE COLUMNS (id) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (id) (
id < 5,
id >= 5 AND id < 9,
id >= 9
)
ENGINE=mito
WITH(
@@ -59,10 +59,10 @@ CREATE TABLE not_supported_table_storage_option (
TIME INDEX (ts),
PRIMARY KEY (id, host)
)
PARTITION BY RANGE COLUMNS (id) (
PARTITION r0 VALUES LESS THAN (5),
PARTITION r1 VALUES LESS THAN (9),
PARTITION r2 VALUES LESS THAN (MAXVALUE),
PARTITION ON COLUMNS (id) (
id < 5,
id >= 5 AND id < 9,
id >= 9
)
ENGINE=mito
WITH(