diff --git a/Cargo.lock b/Cargo.lock index b6ff256ba8..c269433a84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4755,6 +4755,8 @@ dependencies = [ "common-error", "common-time", "datatypes", + "itertools", + "once_cell", "snafu", "sqlparser", "table-engine", diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 5365b5d179..07de1893ab 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } +itertools = "0.10" +once_cell = "1.10" snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" table-engine = { path = "../table-engine" } diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 56daa3712e..ad6bfd1701 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -1,16 +1,27 @@ -use snafu::ensure; +use std::cmp::Ordering; + +use itertools::Itertools; +use once_cell::sync::Lazy; use snafu::ResultExt; +use snafu::{ensure, OptionExt}; +use sqlparser::ast::Value; +use sqlparser::dialect::keywords::Keyword; use sqlparser::parser::IsOptional::Mandatory; -use sqlparser::{dialect::keywords::Keyword, tokenizer::Token}; +use sqlparser::tokenizer::{Token, Word}; use table_engine::engine; -use crate::ast::{ColumnDef, Ident, TableConstraint}; +use crate::ast::{ColumnDef, Ident, TableConstraint, Value as SqlValue}; use crate::error::{self, InvalidTimeIndexSnafu, Result, SyntaxSnafu}; use crate::parser::ParserContext; -use crate::statements::create_table::{CreateTable, TIME_INDEX}; +use crate::statements::create_table::{CreateTable, PartitionEntry, Partitions, TIME_INDEX}; use crate::statements::statement::Statement; +use crate::statements::{sql_data_type_to_concrete_data_type, sql_value_to_value}; const ENGINE: &str = "ENGINE"; +const MAXVALUE: &str = "MAXVALUE"; + +static LESS: Lazy = Lazy::new(|| Token::make_keyword("LESS")); +static THAN: Lazy = Lazy::new(|| Token::make_keyword("THAN")); /// Parses create [table] statement impl<'a> ParserContext<'a> { @@ -27,13 +38,16 @@ impl<'a> ParserContext<'a> { .parse_object_name() .context(error::SyntaxSnafu { sql: self.sql })?; let (columns, constraints) = self.parse_columns()?; + + let partitions = self.parse_partitions()?; + let engine = self.parse_table_engine()?; let options = self .parser .parse_options(Keyword::WITH) .context(error::SyntaxSnafu { sql: self.sql })?; - Ok(Statement::Create(CreateTable { + let create_table = CreateTable { if_not_exists, name: table_name, columns, @@ -41,9 +55,95 @@ impl<'a> ParserContext<'a> { constraints, options, table_id: 0, // table id is assigned by catalog manager + partitions, + }; + validate_create(&create_table)?; + + Ok(Statement::Create(create_table)) + } + + // "PARTITION BY ..." syntax: + // https://dev.mysql.com/doc/refman/8.0/en/partitioning-columns-range.html + fn parse_partitions(&mut self) -> Result> { + if !self.parser.parse_keyword(Keyword::PARTITION) { + return Ok(None); + } + self.parser + .expect_keywords(&[Keyword::BY, Keyword::RANGE, Keyword::COLUMNS]) + .context(error::SyntaxSnafu { sql: self.sql })?; + + let column_list = self + .parser + .parse_parenthesized_column_list(Mandatory) + .context(error::SyntaxSnafu { sql: self.sql })?; + + let entries = self.parse_comma_separated(Self::parse_partition_entry)?; + + Ok(Some(Partitions { + column_list, + entries, })) } + fn parse_partition_entry(&mut self) -> Result { + self.parser + .expect_keyword(Keyword::PARTITION) + .context(error::SyntaxSnafu { sql: self.sql })?; + + let name = self + .parser + .parse_identifier() + .context(error::SyntaxSnafu { sql: self.sql })?; + + self.parser + .expect_keyword(Keyword::VALUES) + .and_then(|_| self.parser.expect_token(&LESS)) + .and_then(|_| self.parser.expect_token(&THAN)) + .context(error::SyntaxSnafu { sql: self.sql })?; + + let value_list = self.parse_comma_separated(Self::parse_value_list)?; + + Ok(PartitionEntry { name, value_list }) + } + + fn parse_value_list(&mut self) -> Result { + let token = self.parser.peek_token(); + let value = match token { + Token::Word(Word { value, .. }) if value == MAXVALUE => { + let _ = self.parser.next_token(); + SqlValue::Number(MAXVALUE.to_string(), false) + } + _ => self + .parser + .parse_value() + .context(error::SyntaxSnafu { sql: self.sql })?, + }; + Ok(value) + } + + /// Parse a comma-separated list wrapped by "()", and of which all items accepted by `F` + fn parse_comma_separated(&mut self, mut f: F) -> Result> + where + F: FnMut(&mut ParserContext<'a>) -> Result, + { + self.parser + .expect_token(&Token::LParen) + .context(error::SyntaxSnafu { sql: self.sql })?; + + let mut values = vec![]; + while self.parser.peek_token() != Token::RParen { + values.push(f(self)?); + if !self.parser.consume_token(&Token::Comma) { + break; + } + } + + self.parser + .expect_token(&Token::RParen) + .context(error::SyntaxSnafu { sql: self.sql })?; + Ok(values) + } + fn parse_columns(&mut self) -> Result<(Vec, Vec)> { let mut columns = vec![]; let mut constraints = vec![]; @@ -157,6 +257,161 @@ impl<'a> ParserContext<'a> { } } +fn validate_create(create_table: &CreateTable) -> Result<()> { + if let Some(partitions) = &create_table.partitions { + validate_partitions(&create_table.columns, partitions)?; + } + Ok(()) +} + +fn validate_partitions(columns: &[ColumnDef], partitions: &Partitions) -> Result<()> { + let partition_columns = ensure_partition_columns_defined(columns, partitions)?; + + ensure_partition_names_no_duplicate(partitions)?; + + ensure_value_list_len_matches_columns(partitions, &partition_columns)?; + + let value_lists = ensure_value_lists_strictly_increased(partitions, partition_columns)?; + + ensure_value_lists_bounded_by_maxvalue(value_lists)?; + + Ok(()) +} + +/// Ensure that partition ranges fully cover all values. +// Simply check the last partition is bounded by "MAXVALUE"s. +// MySQL does not have this restriction. However, I think we'd better have it because: +// - It might save user from adding more partitions in the future by hand, which is often +// a tedious task. Why not provide an extra partition at the beginning and leave all +// other partition related jobs to us? I think it's a reasonable argument to user. +// - It might save us from some ugly designs and codings. The "MAXVALUE" bound is natural +// in dealing with values that are unspecified upfront. Without it, we have to store +// and use the user defined max bound everywhere, starting from calculating regions by +// partition rule in Frontend, to automatically split and merge regions in Meta. +fn ensure_value_lists_bounded_by_maxvalue(value_lists: Vec<&Vec>) -> Result<()> { + let is_maxvalue_bound = value_lists.last().map(|v| { + v.iter() + .all(|x| matches!(x, SqlValue::Number(s, _) if s == MAXVALUE)) + }); + ensure!( + matches!(is_maxvalue_bound, Some(true)), + error::InvalidSqlSnafu { + msg: "Please provide an extra partition that is bounded by 'MAXVALUE'." + } + ); + Ok(()) +} + +/// Ensure that value lists of partitions are strictly increasing. +fn ensure_value_lists_strictly_increased<'a>( + partitions: &'a Partitions, + partition_columns: Vec<&'a ColumnDef>, +) -> Result>> { + let value_lists = partitions + .entries + .iter() + .map(|x| &x.value_list) + .collect::>(); + for i in 1..value_lists.len() { + let mut equal_tuples = 0; + for (n, (x, y)) in value_lists[i - 1] + .iter() + .zip(value_lists[i].iter()) + .enumerate() + { + let column = partition_columns[n]; + let is_x_maxvalue = matches!(x, SqlValue::Number(s, _) if s == MAXVALUE); + let is_y_maxvalue = matches!(y, SqlValue::Number(s, _) if s == MAXVALUE); + match (is_x_maxvalue, is_y_maxvalue) { + (true, true) => { + equal_tuples += 1; + } + (false, false) => { + let column_name = &column.name.value; + let cdt = sql_data_type_to_concrete_data_type(&column.data_type)?; + let x = sql_value_to_value(column_name, &cdt, x)?; + let y = sql_value_to_value(column_name, &cdt, y)?; + match x.cmp(&y) { + Ordering::Less => break, + Ordering::Equal => equal_tuples += 1, + Ordering::Greater => return error::InvalidSqlSnafu { + msg: "VALUES LESS THAN value must be strictly increasing for each partition.", + }.fail() + } + } + (true, false) => return error::InvalidSqlSnafu { + msg: "VALUES LESS THAN value must be strictly increasing for each partition.", + } + .fail(), + (false, true) => break, + } + } + ensure!( + equal_tuples < partition_columns.len(), + error::InvalidSqlSnafu { + msg: "VALUES LESS THAN value must be strictly increasing for each partition.", + } + ); + } + Ok(value_lists) +} + +/// Ensure that value list's length matches the column list. +fn ensure_value_list_len_matches_columns( + partitions: &Partitions, + partition_columns: &Vec<&ColumnDef>, +) -> Result<()> { + for entry in partitions.entries.iter() { + ensure!( + entry.value_list.len() == partition_columns.len(), + error::InvalidSqlSnafu { + msg: "Partition value list does not match column list.", + } + ); + } + Ok(()) +} + +/// Ensure that all columns used in "PARTITION BY RANGE COLUMNS" are defined in create table. +fn ensure_partition_columns_defined<'a>( + columns: &'a [ColumnDef], + partitions: &'a Partitions, +) -> Result> { + partitions + .column_list + .iter() + .map(|x| { + // Normally the columns in "create table" won't be too many, + // a linear search to find the target every time is fine. + columns + .iter() + .find(|c| &c.name == x) + .context(error::InvalidSqlSnafu { + msg: format!("Partition column {:?} not defined!", x.value), + }) + }) + .collect::>>() +} + +/// Ensure that partition names do not duplicate. +fn ensure_partition_names_no_duplicate(partitions: &Partitions) -> Result<()> { + let partition_names = partitions + .entries + .iter() + .map(|x| &x.name.value) + .sorted() + .collect::>(); + for w in partition_names.windows(2) { + ensure!( + w[0] != w[1], + error::InvalidSqlSnafu { + msg: format!("Duplicate partition names: {}", w[0]), + } + ) + } + Ok(()) +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -165,6 +420,237 @@ mod tests { use super::*; + #[test] + fn test_validate_create() { + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result.is_ok()); + + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, x) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("Partition column \"x\" not defined!")); + + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r2 VALUES LESS THAN ('sz', 3000), + PARTITION r1 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("Duplicate partition names: r1")); + + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh'), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("Partition value list does not match column list")); + + let cases = vec![ + r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('sh', 1000), + PARTITION r1 VALUES LESS THAN ('hz', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito", + r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 2000), + PARTITION r1 VALUES LESS THAN ('hz', 1000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito", + r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('hz', 1000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito", + r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, 2000), + PARTITION r1 VALUES LESS THAN ('sh', 3000), +) +ENGINE=mito", + ]; + for sql in cases { + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("VALUES LESS THAN value must be strictly increasing for each partition")); + } + + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, 9999), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("Please provide an extra partition that is bounded by 'MAXVALUE'.")); + } + + #[test] + fn test_parse_create_table_with_partitions() { + let sql = r" +CREATE TABLE monitor ( + host_id INT, + idc STRING, + ts TIMESTAMP, + cpu DOUBLE DEFAULT 0, + memory DOUBLE, + TIME INDEX (ts), + PRIMARY KEY (host), +) +PARTITION BY RANGE COLUMNS(idc, host_id) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r2 VALUES LESS THAN ('sh', 3000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(result.len(), 1); + match &result[0] { + Statement::Create(c) => { + assert!(c.partitions.is_some()); + + let partitions = c.partitions.as_ref().unwrap(); + let column_list = partitions + .column_list + .iter() + .map(|x| &x.value) + .collect::>(); + assert_eq!(column_list, vec!["idc", "host_id"]); + + let entries = &partitions.entries; + let partition_names = entries + .iter() + .map(|x| &x.name.value) + .collect::>(); + assert_eq!(partition_names, vec!["r0", "r1", "r2", "r3"]); + + assert_eq!( + entries[0].value_list, + vec![ + SqlValue::SingleQuotedString("hz".to_string()), + SqlValue::Number("1000".to_string(), false) + ] + ); + assert_eq!( + entries[1].value_list, + vec![ + SqlValue::SingleQuotedString("sh".to_string()), + SqlValue::Number("2000".to_string(), false) + ] + ); + assert_eq!( + entries[2].value_list, + vec![ + SqlValue::SingleQuotedString("sh".to_string()), + SqlValue::Number("3000".to_string(), false) + ] + ); + assert_eq!( + entries[3].value_list, + vec![ + SqlValue::Number(MAXVALUE.to_string(), false), + SqlValue::Number(MAXVALUE.to_string(), false) + ] + ); + } + _ => unreachable!(), + } + } + + #[test] + fn test_parse_partitions_with_error_syntax() { + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("sql parser error: Expected BY, found: RANGE")); + + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("sql parser error: Expected LESS, found: THAN")); + + let sql = r" +CREATE TABLE rcx ( a INT, b STRING, c INT ) +PARTITION BY RANGE COLUMNS(b, a) ( + PARTITION r0 VALUES LESS THAN ('hz', 1000), + PARTITION r1 VALUES LESS THAN ('sh', 2000), + PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALU), +) +ENGINE=mito"; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result + .unwrap_err() + .to_string() + .contains("sql parser error: Expected a concrete value, found: MAXVALU")); + } + fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) { assert_eq!(column.name.to_string(), name); assert_eq!(column.data_type.to_string(), data_type); diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index 0229b547a2..53ce293b6b 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -230,7 +230,7 @@ pub fn column_def_to_schema(column_def: &ColumnDef) -> Result { }) } -fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { +pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result { match data_type { SqlDataType::BigInt(_) => Ok(ConcreteDataType::int64_datatype()), SqlDataType::Int(_) => Ok(ConcreteDataType::int32_datatype()), diff --git a/src/sql/src/statements/create_table.rs b/src/sql/src/statements/create_table.rs index ec30e287ca..ba145a6339 100644 --- a/src/sql/src/statements/create_table.rs +++ b/src/sql/src/statements/create_table.rs @@ -1,4 +1,4 @@ -use crate::ast::{ColumnDef, ObjectName, SqlOption, TableConstraint}; +use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue}; /// Time index name, used in table constraints. pub const TIME_INDEX: &str = "__time_index"; @@ -15,4 +15,17 @@ pub struct CreateTable { pub constraints: Vec, /// Table options in `WITH`. pub options: Vec, + pub partitions: Option, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct Partitions { + pub column_list: Vec, + pub entries: Vec, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct PartitionEntry { + pub name: Ident, + pub value_list: Vec, }