refactor: upgrade DataFusion, Arrow and Sqlparser (#1074)

* refactor: upgrade DataFusion, Arrow and Sqlparser

* fix: resolve PR comments
This commit is contained in:
LFC
2023-02-27 22:20:08 +08:00
committed by GitHub
parent 30287e7e41
commit 11d45e2918
115 changed files with 1368 additions and 1000 deletions

View File

@@ -11,6 +11,7 @@ common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datafusion-sql.workspace = true
datatypes = { path = "../datatypes" }
hex = "0.4"
itertools = "0.10"

View File

@@ -138,6 +138,12 @@ pub enum Error {
#[snafu(display("Unsupported format option: {}", name))]
UnsupportedCopyFormatOption { name: String },
#[snafu(display("Unable to convert statement {} to DataFusion statement", statement))]
ConvertToDfStatement {
statement: String,
backtrace: Backtrace,
},
}
impl ErrorExt for Error {
@@ -167,6 +173,7 @@ impl ErrorExt for Error {
UnsupportedAlterTableStatement { .. } => StatusCode::InvalidSyntax,
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
ConvertToDfStatement { .. } => StatusCode::Internal,
}
}

View File

@@ -16,11 +16,9 @@ use snafu::{ensure, ResultExt};
use sqlparser::dialect::Dialect;
use sqlparser::keywords::Keyword;
use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::{Token, Tokenizer};
use sqlparser::tokenizer::{Token, TokenWithLocation};
use crate::error::{
self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result, SyntaxSnafu, TokenizerSnafu,
};
use crate::error::{self, InvalidDatabaseNameSnafu, InvalidTableNameSnafu, Result, SyntaxSnafu};
use crate::parsers::tql_parser;
use crate::statements::describe::DescribeTable;
use crate::statements::drop::DropTable;
@@ -38,14 +36,11 @@ impl<'a> ParserContext<'a> {
/// Parses SQL with given dialect
pub fn create_with_dialect(sql: &'a str, dialect: &dyn Dialect) -> Result<Vec<Statement>> {
let mut stmts: Vec<Statement> = Vec::new();
let mut tokenizer = Tokenizer::new(dialect, sql);
let tokens: Vec<Token> = tokenizer.tokenize().context(TokenizerSnafu { sql })?;
let mut parser_ctx = ParserContext {
sql,
parser: Parser::new(tokens, dialect),
};
let parser = Parser::new(dialect)
.try_with_sql(sql)
.context(SyntaxSnafu { sql })?;
let mut parser_ctx = ParserContext { sql, parser };
let mut expecting_statement_delimiter = false;
loop {
@@ -71,7 +66,7 @@ impl<'a> ParserContext<'a> {
/// Parses parser context to a set of statements.
pub fn parse_statement(&mut self) -> Result<Statement> {
match self.parser.peek_token() {
match self.parser.peek_token().token {
Token::Word(w) => {
match w.keyword {
Keyword::CREATE => {
@@ -185,7 +180,7 @@ impl<'a> ParserContext<'a> {
}
fn parse_show_tables(&mut self) -> Result<Statement> {
let database = match self.parser.peek_token() {
let database = match self.parser.peek_token().token {
Token::EOF | Token::SemiColon => {
return Ok(Statement::ShowTables(ShowTables {
kind: ShowKind::All,
@@ -220,7 +215,7 @@ impl<'a> ParserContext<'a> {
_ => None,
};
let kind = match self.parser.peek_token() {
let kind = match self.parser.peek_token().token {
Token::EOF | Token::SemiColon => ShowKind::All,
// SHOW TABLES [WHERE | LIKE] [EXPR]
Token::Word(w) => match w.keyword {
@@ -319,7 +314,7 @@ impl<'a> ParserContext<'a> {
}
// Report unexpected token
pub(crate) fn expected<T>(&self, expected: &str, found: Token) -> Result<T> {
pub(crate) fn expected<T>(&self, expected: &str, found: TokenWithLocation) -> Result<T> {
Err(ParserError::ParserError(format!(
"Expected {expected}, found: {found}",
)))
@@ -327,7 +322,7 @@ impl<'a> ParserContext<'a> {
}
pub fn matches_keyword(&mut self, expected: Keyword) -> bool {
match self.parser.peek_token() {
match self.parser.peek_token().token {
Token::Word(w) => w.keyword == expected,
_ => false,
}
@@ -349,7 +344,7 @@ impl<'a> ParserContext<'a> {
/// Parses `SHOW DATABASES` statement.
pub fn parse_show_databases(&mut self) -> Result<Statement> {
let tok = self.parser.next_token();
let tok = self.parser.next_token().token;
match &tok {
Token::EOF | Token::SemiColon => {
Ok(Statement::ShowDatabases(ShowDatabases::new(ShowKind::All)))
@@ -563,7 +558,7 @@ mod tests {
limit: None,
offset: None,
fetch: None,
lock: None,
locks: vec![],
}));
let explain = Explain::try_from(SpStatement::Explain {

View File

@@ -22,7 +22,7 @@ use sqlparser::ast::{ColumnOption, ColumnOptionDef, DataType, Value};
use sqlparser::dialect::keywords::Keyword;
use sqlparser::parser::IsOptional::Mandatory;
use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::{Token, Word};
use sqlparser::tokenizer::{Token, TokenWithLocation, Word};
use crate::ast::{ColumnDef, Ident, TableConstraint, Value as SqlValue};
use crate::error::{
@@ -45,7 +45,7 @@ static THAN: Lazy<Token> = Lazy::new(|| Token::make_keyword("THAN"));
/// Parses create [table] statement
impl<'a> ParserContext<'a> {
pub(crate) fn parse_create(&mut self) -> Result<Statement> {
match self.parser.peek_token() {
match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
Keyword::TABLE => self.parse_create_table(),
@@ -135,7 +135,7 @@ impl<'a> ParserContext<'a> {
let column_list = self
.parser
.parse_parenthesized_column_list(Mandatory)
.parse_parenthesized_column_list(Mandatory, false)
.context(error::SyntaxSnafu { sql: self.sql })?;
let entries = self.parse_comma_separated(Self::parse_partition_entry)?;
@@ -172,7 +172,7 @@ impl<'a> ParserContext<'a> {
}
fn parse_value_list(&mut self) -> Result<SqlValue> {
let token = self.parser.peek_token();
let token = self.parser.peek_token().token;
let value = match token {
Token::Word(Word { value, .. }) if value == MAXVALUE => {
let _ = self.parser.next_token();
@@ -228,7 +228,7 @@ impl<'a> ParserContext<'a> {
loop {
if let Some(constraint) = self.parse_optional_table_constraint()? {
constraints.push(constraint);
} else if let Token::Word(_) = self.parser.peek_token() {
} else if let Token::Word(_) = self.parser.peek_token().token {
self.parse_column(&mut columns, &mut constraints)?;
} else {
return self.expected(
@@ -387,7 +387,10 @@ impl<'a> ParserContext<'a> {
Ok(Some(ColumnOption::NotNull))
} else if parser.parse_keywords(&[Keyword::COMMENT]) {
match parser.next_token() {
Token::SingleQuotedString(value, ..) => Ok(Some(ColumnOption::Comment(value))),
TokenWithLocation {
token: Token::SingleQuotedString(value, ..),
..
} => Ok(Some(ColumnOption::Comment(value))),
unexpected => parser.expected("string", unexpected),
}
} else if parser.parse_keyword(Keyword::NULL) {
@@ -428,7 +431,10 @@ impl<'a> ParserContext<'a> {
None
};
match self.parser.next_token() {
Token::Word(w) if w.keyword == Keyword::PRIMARY => {
TokenWithLocation {
token: Token::Word(w),
..
} if w.keyword == Keyword::PRIMARY => {
self.parser
.expect_keyword(Keyword::KEY)
.context(error::UnexpectedSnafu {
@@ -438,7 +444,7 @@ impl<'a> ParserContext<'a> {
})?;
let columns = self
.parser
.parse_parenthesized_column_list(Mandatory)
.parse_parenthesized_column_list(Mandatory, false)
.context(error::SyntaxSnafu { sql: self.sql })?;
Ok(Some(TableConstraint::Unique {
name,
@@ -446,7 +452,10 @@ impl<'a> ParserContext<'a> {
is_primary: true,
}))
}
Token::Word(w) if w.keyword == Keyword::TIME => {
TokenWithLocation {
token: Token::Word(w),
..
} if w.keyword == Keyword::TIME => {
self.parser
.expect_keyword(Keyword::INDEX)
.context(error::UnexpectedSnafu {
@@ -457,7 +466,7 @@ impl<'a> ParserContext<'a> {
let columns = self
.parser
.parse_parenthesized_column_list(Mandatory)
.parse_parenthesized_column_list(Mandatory, false)
.context(error::SyntaxSnafu { sql: self.sql })?;
ensure!(
@@ -503,9 +512,11 @@ impl<'a> ParserContext<'a> {
actual: self.peek_token_as_string(),
})?;
match self.parser.next_token() {
Token::Word(w) => Ok(w.value),
unexpected => self.expected("Engine is missing", unexpected),
let token = self.parser.next_token();
if let Token::Word(w) = token.token {
Ok(w.value)
} else {
self.expected("'Engine' is missing", token)
}
}
}

View File

@@ -35,7 +35,7 @@ impl<'a> ParserContext<'a> {
pub(crate) fn parse_tql(&mut self) -> Result<Statement> {
self.parser.next_token();
match self.parser.peek_token() {
match self.parser.peek_token().token {
Token::Word(w) => {
let uppercase = w.value.to_uppercase();
match w.keyword {
@@ -80,7 +80,7 @@ impl<'a> ParserContext<'a> {
parser: &mut Parser,
token: Token,
) -> std::result::Result<String, ParserError> {
let value = match parser.next_token() {
let value = match parser.next_token().token {
Token::Number(n, _) => n,
Token::DoubleQuotedString(s) | Token::SingleQuotedString(s) => s,
unexpected => {

View File

@@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion_sql::parser::Statement as DfStatement;
use sqlparser::ast::Statement as SpStatement;
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::alter::AlterTable;
use crate::statements::copy::CopyTable;
use crate::statements::create::{CreateDatabase, CreateTable};
@@ -67,3 +71,21 @@ pub struct Hint {
pub comment: String,
pub prefix: String,
}
/// Conversion from this crate's [`Statement`] into DataFusion's [`DfStatement`].
///
/// Only the `Query` and `Explain` variants can be converted; every other
/// variant is rejected with a `ConvertToDfStatement` error.
impl TryFrom<&Statement> for DfStatement {
    type Error = Error;

    /// Builds a `DfStatement::Statement` wrapping a clone of the underlying
    /// sqlparser statement held by `s`.
    ///
    /// # Errors
    ///
    /// Returns `Error::ConvertToDfStatement`, carrying the `Debug` rendering
    /// of `s`, when `s` is neither a query nor an explain statement.
    fn try_from(s: &Statement) -> Result<Self, Self::Error> {
        let sp_statement = match s {
            // The explain wrapper already holds a complete sqlparser statement.
            Statement::Explain(explain) => explain.inner.clone(),
            // The query wrapper holds a bare query, so it is re-wrapped here.
            Statement::Query(query) => {
                let inner_query = query.inner.clone();
                SpStatement::Query(Box::new(inner_query))
            }
            unsupported => {
                let statement = format!("{unsupported:?}");
                return ConvertToDfStatementSnafu { statement }.fail();
            }
        };
        Ok(DfStatement::Statement(Box::new(sp_statement)))
    }
}