diff --git a/Cargo.lock b/Cargo.lock index 5bf9194052..4d392ef35f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3460,6 +3460,7 @@ dependencies = [ "common-error", "snafu", "sqlparser", + "table-engine", ] [[package]] diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 498751ecc3..77f7252696 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -49,7 +49,7 @@ where todo!("Currently not supported") } Statement::Query(qb) => self.query_to_plan(qb), - Statement::Insert(_) => unreachable!(), + Statement::Create(_) | Statement::Insert(_) => unreachable!(), } } } diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 98365426a0..e6bff657cc 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" common-error = { path = "../common/error" } snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" +table-engine = { path = "../table-engine" } diff --git a/src/sql/src/ast.rs b/src/sql/src/ast.rs index 317527540b..335c84040e 100644 --- a/src/sql/src/ast.rs +++ b/src/sql/src/ast.rs @@ -1,2 +1 @@ -pub use sqlparser::ast::Expr; -pub use sqlparser::ast::Value; +pub use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName, SqlOption, TableConstraint, Value}; diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 9188d8432b..9498452f1d 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -33,6 +33,12 @@ pub enum Error { #[snafu(display("Tokenizer error, sql: {}, source: {}", sql, source))] Tokenizer { sql: String, source: TokenizerError }, + + #[snafu(display( + "Invalid time index, it should contains only one column, sql: {}.", + sql + ))] + InvalidTimeIndex { sql: String, backtrace: Backtrace }, } impl ErrorExt for Error { @@ -41,7 +47,9 @@ impl ErrorExt for Error { match self { Unsupported { .. } => StatusCode::Unsupported, - Unexpected { .. } | Syntax { .. } | Tokenizer { .. } => StatusCode::InvalidSyntax, + Unexpected { .. } | Syntax { .. } | InvalidTimeIndex { .. } | Tokenizer { .. } => { + StatusCode::InvalidSyntax + } } } diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 419d0a44d3..da5c607809 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -2,9 +2,10 @@ use snafu::ResultExt; use sqlparser::dialect::Dialect; use sqlparser::keywords::Keyword; use sqlparser::parser::Parser; +use sqlparser::parser::ParserError; use sqlparser::tokenizer::{Token, Tokenizer}; -use crate::error::{self, Error, TokenizerSnafu}; +use crate::error::{self, Error, SyntaxSnafu, TokenizerSnafu}; use crate::statements::show_database::SqlShowDatabase; use crate::statements::show_kind::ShowKind; use crate::statements::statement::Statement; @@ -108,8 +109,13 @@ impl<'a> ParserContext<'a> { todo!() } - fn parse_create(&mut self) -> Result { - todo!() + // Report unexpected token + pub(crate) fn expected(&self, expected: &str, found: Token) -> Result { + Err(ParserError::ParserError(format!( + "Expected {}, found: {}", + expected, found + ))) + .context(SyntaxSnafu { sql: self.sql }) } pub fn consume_token(&mut self, expected: &str) -> bool { diff --git a/src/sql/src/parsers.rs b/src/sql/src/parsers.rs index d290d58818..71117e627e 100644 --- a/src/sql/src/parsers.rs +++ b/src/sql/src/parsers.rs @@ -1,2 +1,3 @@ +pub(crate) mod create_parser; pub(crate) mod insert_parser; pub(crate) mod query_parser; diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs new file mode 100644 index 0000000000..bcb0b3106c --- /dev/null +++ b/src/sql/src/parsers/create_parser.rs @@ -0,0 +1,237 @@ +use snafu::ensure; +use snafu::ResultExt; +use sqlparser::parser::IsOptional::Mandatory; +use sqlparser::{dialect::keywords::Keyword, tokenizer::Token}; +use table_engine::engine; + +use crate::ast::{ColumnDef, Ident, TableConstraint}; +use crate::error; +use crate::error::{InvalidTimeIndexSnafu, SyntaxSnafu}; +use crate::parser::ParserContext; +use crate::parser::Result; +use crate::statements::create_table::{CreateTable, TIME_INDEX}; +use crate::statements::statement::Statement; + +const ENGINE: &str = "ENGINE"; + +/// Pasre create [table] statement +impl<'a> ParserContext<'a> { + pub(crate) fn parse_create(&mut self) -> Result { + self.parser + .expect_keyword(Keyword::TABLE) + .context(error::SyntaxSnafu { sql: self.sql })?; + let if_not_exists = + self.parser + .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); + + let table_name = self + .parser + .parse_object_name() + .context(error::SyntaxSnafu { sql: self.sql })?; + let (columns, constraints) = self.parse_columns()?; + let engine = self.parse_table_engine()?; + let options = self + .parser + .parse_options(Keyword::WITH) + .context(error::SyntaxSnafu { sql: self.sql })?; + + Ok(Statement::Create(CreateTable { + if_not_exists, + name: table_name, + columns, + engine, + constraints, + options, + })) + } + + fn parse_columns(&mut self) -> Result<(Vec, Vec)> { + let mut columns = vec![]; + let mut constraints = vec![]; + if !self.parser.consume_token(&Token::LParen) || self.parser.consume_token(&Token::RParen) { + return Ok((columns, constraints)); + } + + loop { + if let Some(constraint) = self.parse_optional_table_constraint()? { + constraints.push(constraint); + } else if let Token::Word(_) = self.parser.peek_token() { + columns.push( + self.parser + .parse_column_def() + .context(SyntaxSnafu { sql: self.sql })?, + ); + } else { + return self.expected( + "column name or constraint definition", + self.parser.peek_token(), + ); + } + let comma = self.parser.consume_token(&Token::Comma); + if self.parser.consume_token(&Token::RParen) { + // allow a trailing comma, even though it's not in standard + break; + } else if !comma { + return self.expected( + "',' or ')' after column definition", + self.parser.peek_token(), + ); + } + } + + Ok((columns, constraints)) + } + + // Copy from sqlparser by boyan + fn parse_optional_table_constraint(&mut self) -> Result> { + let name = if self.parser.parse_keyword(Keyword::CONSTRAINT) { + Some( + self.parser + .parse_identifier() + .context(error::SyntaxSnafu { sql: self.sql })?, + ) + } else { + None + }; + match self.parser.next_token() { + Token::Word(w) if w.keyword == Keyword::PRIMARY => { + self.parser + .expect_keyword(Keyword::KEY) + .context(error::SyntaxSnafu { sql: self.sql })?; + let columns = self + .parser + .parse_parenthesized_column_list(Mandatory) + .context(error::SyntaxSnafu { sql: self.sql })?; + Ok(Some(TableConstraint::Unique { + name, + columns, + is_primary: true, + })) + } + Token::Word(w) if w.keyword == Keyword::TIME => { + self.parser + .expect_keyword(Keyword::INDEX) + .context(error::SyntaxSnafu { sql: self.sql })?; + let columns = self + .parser + .parse_parenthesized_column_list(Mandatory) + .context(error::SyntaxSnafu { sql: self.sql })?; + + ensure!(columns.len() == 1, InvalidTimeIndexSnafu { sql: self.sql }); + + // TODO(dennis): TableConstraint doesn't support dialect right now, + // so we use unique constraint with special key to represent TIME INDEX. + Ok(Some(TableConstraint::Unique { + name: Some(Ident { + value: TIME_INDEX.to_owned(), + quote_style: None, + }), + columns, + is_primary: false, + })) + } + unexpected => { + if name.is_some() { + self.expected("PRIMARY, TIME", unexpected) + } else { + self.parser.prev_token(); + Ok(None) + } + } + } + } + + /// Parses the set of valid formats + fn parse_table_engine(&mut self) -> Result { + if !self.consume_token(ENGINE) { + return Ok(engine::DEFAULT_ENGINE.to_string()); + } + + self.parser + .expect_token(&Token::Eq) + .context(error::SyntaxSnafu { sql: self.sql })?; + + match self.parser.next_token() { + Token::Word(w) => Ok(w.value), + unexpected => self.expected("Engine is missing", unexpected), + } + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use sqlparser::dialect::GenericDialect; + + use super::*; + + fn assert_column_def(column: &ColumnDef, name: &str, data_type: &str) { + assert_eq!(column.name.to_string(), name); + assert_eq!(column.data_type.to_string(), data_type); + } + + #[test] + pub fn test_parse_create_table() { + let sql = r"create table demo( + host string, + ts int64, + cpu float64 default 0, + memory float64, + TIME INDEX (ts), + PRIMARY KEY(ts, host)) engine=mito + with(regions=1); + "; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}).unwrap(); + assert_eq!(1, result.len()); + match &result[0] { + Statement::Create(c) => { + assert!(!c.if_not_exists); + assert_eq!("demo", c.name.to_string()); + assert_eq!("mito", c.engine); + assert_eq!(4, c.columns.len()); + let columns = &c.columns; + assert_column_def(&columns[0], "host", "STRING"); + assert_column_def(&columns[1], "ts", "int64"); + assert_column_def(&columns[2], "cpu", "float64"); + assert_column_def(&columns[3], "memory", "float64"); + let constraints = &c.constraints; + assert_matches!( + &constraints[0], + TableConstraint::Unique { + is_primary: false, + .. + } + ); + assert_matches!( + &constraints[1], + TableConstraint::Unique { + is_primary: true, + .. + } + ); + let options = &c.options; + assert_eq!(1, options.len()); + assert_eq!("regions", &options[0].name.to_string()); + assert_eq!("1", &options[0].value.to_string()); + } + _ => unreachable!(), + } + } + + #[test] + fn test_invalid_index_keys() { + let sql = r"create table demo( + host string, + ts int64, + cpu float64 default 0, + memory float64, + TIME INDEX (ts, host), + PRIMARY KEY(ts, host)) engine=mito + with(regions=1); + "; + let result = ParserContext::create_with_dialect(sql, &GenericDialect {}); + assert!(result.is_err()); + assert_matches!(result, Err(crate::error::Error::InvalidTimeIndex { .. })); + } +} diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs index d5b332eeaf..603328fd71 100644 --- a/src/sql/src/parsers/query_parser.rs +++ b/src/sql/src/parsers/query_parser.rs @@ -48,7 +48,7 @@ mod tests { Statement::Insert(_) => { panic!("Not expected to be a show database statement") } - Statement::Query(_) => {} + Statement::Create(_) | Statement::Query(_) => {} } } } diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs index eea41d1e38..20230ce292 100644 --- a/src/sql/src/statements.rs +++ b/src/sql/src/statements.rs @@ -1,3 +1,4 @@ +pub mod create_table; pub mod insert; pub mod query; pub mod show_database; diff --git a/src/sql/src/statements/create_table.rs b/src/sql/src/statements/create_table.rs new file mode 100644 index 0000000000..2d3dd61002 --- /dev/null +++ b/src/sql/src/statements/create_table.rs @@ -0,0 +1,17 @@ +use crate::ast::{ColumnDef, ObjectName, SqlOption, TableConstraint}; + +/// Time index name, used in table constraints. +pub const TIME_INDEX: &str = "__time_index"; + +#[derive(Debug, PartialEq, Clone)] +pub struct CreateTable { + /// Create if not exists + pub if_not_exists: bool, + /// Table name + pub name: ObjectName, + pub columns: Vec, + pub engine: String, + pub constraints: Vec, + /// Table options in `WITH`. + pub options: Vec, +} diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index ff7d3c8e43..379414c35d 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -1,6 +1,7 @@ use sqlparser::ast::Statement as SpStatement; use sqlparser::parser::ParserError; +use crate::statements::create_table::CreateTable; use crate::statements::insert::Insert; use crate::statements::query::Query; use crate::statements::show_database::SqlShowDatabase; @@ -16,6 +17,9 @@ pub enum Statement { // Insert Insert(Box), + + /// CREATE TABLE + Create(CreateTable), } /// Converts Statement to sqlparser statement @@ -29,6 +33,7 @@ impl TryFrom for SpStatement { )), Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))), Statement::Insert(i) => Ok(i.inner), + Statement::Create(_) => unimplemented!(), } } }