feat: INSERT statement and planner implementation (#16)

* feat: Add SQL parser definition and SHOW DATABASE implementation

* chores: Eliminate clippy warnings and errors.

* chores: remove Gt prefix in some structs; rename some mod; remove print(s) in unit tests; refactor crate layout;
feat: wrap sqlparser error;

* chores: reorder cargo dependency

* chores: fix code style

* chores: add #[cfg(test)] to unit tests

* style: fix test mod style

* feat: implement select query parser

* chores: remove unused dependency

* feat: implement TryInto<sqlparser::ast::Statement> for Statement

* chore: fix style issues

* refactor: wrap sqlparser Query inside Query statement variant to reduce complexity

* refactor: replace TryInto to TryFrom

* refactor: use [Rust 2018 mod convention](https://doc.rust-lang.org/edition-guide/rust-2018/path-changes.html#no-more-modrs)

* refactor: remove unnecessary file prefix in statement mod

* feat: implement INSERT parser (currently without INSERT validation)

* feat: wrap DataFusion planner and add simple query planner implementation

* refactor: move planner mod to query crate

* fix: styles and conventions
This commit is contained in:
Lei, Huang
2022-05-05 16:28:38 +08:00
committed by GitHub
parent bf331ec4ac
commit f889ed5488
18 changed files with 198 additions and 44 deletions

19
Cargo.lock generated
View File

@@ -402,7 +402,7 @@ dependencies = [
"pin-project-lite",
"rand",
"smallvec",
"sqlparser 0.15.0",
"sqlparser",
"tempfile",
"tokio",
"tokio-stream",
@@ -416,7 +416,7 @@ dependencies = [
"arrow2",
"ordered-float 2.10.0",
"parquet2",
"sqlparser 0.15.0",
"sqlparser",
]
[[package]]
@@ -427,7 +427,7 @@ dependencies = [
"ahash",
"arrow2",
"datafusion-common",
"sqlparser 0.15.0",
"sqlparser",
]
[[package]]
@@ -1060,6 +1060,7 @@ dependencies = [
"futures",
"futures-util",
"snafu",
"sql",
"table",
"tokio",
"tokio-stream",
@@ -1256,9 +1257,8 @@ checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451"
name = "sql"
version = "0.1.0"
dependencies = [
"query",
"snafu",
"sqlparser 0.16.0",
"sqlparser",
]
[[package]]
@@ -1270,15 +1270,6 @@ dependencies = [
"log",
]
[[package]]
name = "sqlparser"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e9a527b68048eb95495a1508f6c8395c8defcff5ecdbe8ad4106d08a2ef2a3c"
dependencies = [
"log",
]
[[package]]
name = "static_assertions"
version = "1.1.0"

View File

@@ -18,6 +18,7 @@ futures-util = "0.3.21"
snafu = "0.7.0"
table = { path = "../table" }
tokio = "1.0"
sql = { path = "../sql" }
[dev-dependencies]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }

View File

@@ -21,3 +21,13 @@ impl From<Error> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum PlannerError {
#[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
DfPlan {
sql: String,
source: DataFusionError,
},
}

View File

@@ -6,4 +6,5 @@ pub mod logical_optimizer;
pub mod physical_optimizer;
pub mod physical_planner;
pub mod plan;
pub mod planner;
pub mod query_engine;

56
src/query/src/planner.rs Normal file
View File

@@ -0,0 +1,56 @@
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use snafu::ResultExt;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use crate::error;
use crate::error::PlannerError;
use crate::plan::LogicalPlan;
pub trait Planner {
fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan>;
}
type Result<T> = std::result::Result<T, PlannerError>;
pub struct DfPlanner<'a, S: ContextProvider> {
sql_to_rel: SqlToRel<'a, S>,
}
impl<'a, S: ContextProvider> DfPlanner<'a, S> {
/// Creates a DataFusion planner instance
pub fn new(schema_provider: &'a S) -> Self {
let rel = SqlToRel::new(schema_provider);
Self { sql_to_rel: rel }
}
/// Converts QUERY statement to logical plan.
pub fn query_to_plan(&self, query: Box<Query>) -> Result<LogicalPlan> {
// todo(hl): original SQL should be provided as an argument
let sql = query.inner.to_string();
let result = self
.sql_to_rel
.query_to_plan(query.inner)
.context(error::DfPlanSnafu { sql })?;
Ok(LogicalPlan::DfPlan(result))
}
}
impl<'a, S> Planner for DfPlanner<'a, S>
where
S: ContextProvider,
{
/// Converts statement to logical plan using datafusion planner
fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
match statement {
Statement::ShowDatabases(_) => {
todo!("Currently not supported")
}
Statement::Query(qb) => self.query_to_plan(qb),
Statement::Insert(_) => {
todo!()
}
}
}
}

View File

@@ -20,10 +20,14 @@ use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider};
use crate::error::{self, Result};
use crate::executor::Runtime;
const DEFAULT_CATALOG_NAME: &str = "greptime";
const DEFAULT_SCHEMA_NAME: &str = "public";
/// Query engine global state
#[derive(Clone)]
pub struct QueryEngineState {
df_context: ExecutionContext,
catalog_list: Arc<dyn CatalogList>,
}
impl fmt::Debug for QueryEngineState {
@@ -35,7 +39,8 @@ impl fmt::Debug for QueryEngineState {
impl QueryEngineState {
pub(crate) fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
let config = ExecutionConfig::new().with_default_catalog_and_schema("greptime", "public");
let config = ExecutionConfig::new()
.with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
let df_context = ExecutionContext::with_config(config);
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
@@ -43,7 +48,10 @@ impl QueryEngineState {
runtime: df_context.runtime_env(),
});
Self { df_context }
Self {
df_context,
catalog_list,
}
}
#[inline]
@@ -55,6 +63,13 @@ impl QueryEngineState {
pub(crate) fn runtime(&self) -> Runtime {
self.df_context.runtime_env().into()
}
#[allow(dead_code)]
pub(crate) fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.catalog_list
.catalog(DEFAULT_CATALOG_NAME)
.and_then(|c| c.schema(schema_name))
}
}
/// Adapters between datafusion and greptime query engine.

View File

@@ -6,6 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
query = { path = "../query" }
snafu = "0.7.0"
sqlparser = "0.16.0"
sqlparser = "0.15.0"

View File

@@ -7,5 +7,4 @@ pub mod dialect;
pub mod errors;
pub mod parser;
pub mod parsers;
pub mod planner;
pub mod statements;

View File

@@ -5,9 +5,11 @@ use sqlparser::parser::Parser;
use sqlparser::tokenizer::{Token, Tokenizer};
use crate::errors;
use crate::statements::show_database::SqlShowDatabase;
use crate::statements::show_kind::ShowKind;
use crate::statements::statement::Statement;
use crate::statements::statement_show_database::SqlShowDatabase;
pub type Result<T> = std::result::Result<T, errors::ParserError>;
/// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser.
pub struct ParserContext<'a> {
@@ -17,10 +19,7 @@ pub struct ParserContext<'a> {
impl<'a> ParserContext<'a> {
/// Parses SQL with given dialect
pub fn create_with_dialect(
sql: &'a str,
dialect: &dyn Dialect,
) -> Result<Vec<Statement>, errors::ParserError> {
pub fn create_with_dialect(sql: &'a str, dialect: &dyn Dialect) -> Result<Vec<Statement>> {
let mut stmts: Vec<Statement> = Vec::new();
let mut tokenizer = Tokenizer::new(dialect, sql);
@@ -54,7 +53,7 @@ impl<'a> ParserContext<'a> {
}
/// Parses parser context to a set of statements.
pub fn parse_statement(&mut self) -> Result<Statement, errors::ParserError> {
pub fn parse_statement(&mut self) -> Result<Statement> {
match self.parser.peek_token() {
Token::Word(w) => {
match w.keyword {
@@ -87,7 +86,7 @@ impl<'a> ParserContext<'a> {
}
/// Raises an "unsupported statement" error.
pub fn unsupported<T>(&self, keyword: String) -> Result<T, errors::ParserError> {
pub fn unsupported<T>(&self, keyword: String) -> Result<T> {
Err(errors::ParserError::Unsupported {
sql: self.sql.to_string(),
keyword,
@@ -96,7 +95,7 @@ impl<'a> ParserContext<'a> {
/// Parses SHOW statements
/// todo(hl) support `show table`/`show settings`/`show create`/`show users` ect.
fn parse_show(&mut self) -> Result<Statement, errors::ParserError> {
fn parse_show(&mut self) -> Result<Statement> {
if self.consume_token("DATABASES") || self.consume_token("SCHEMAS") {
Ok(self.parse_show_databases()?)
} else {
@@ -104,15 +103,11 @@ impl<'a> ParserContext<'a> {
}
}
fn parse_explain(&mut self) -> Result<Statement, errors::ParserError> {
fn parse_explain(&mut self) -> Result<Statement> {
todo!()
}
fn parse_insert(&mut self) -> Result<Statement, errors::ParserError> {
todo!()
}
fn parse_create(&mut self) -> Result<Statement, errors::ParserError> {
fn parse_create(&mut self) -> Result<Statement> {
todo!()
}
@@ -131,7 +126,7 @@ impl<'a> ParserContext<'a> {
}
/// Parses `SHOW DATABASES` statement.
pub fn parse_show_databases(&mut self) -> Result<Statement, errors::ParserError> {
pub fn parse_show_databases(&mut self) -> Result<Statement> {
let tok = self.parser.next_token();
match &tok {
Token::EOF | Token::SemiColon => Ok(Statement::ShowDatabases(SqlShowDatabase::new(

View File

@@ -1 +1,2 @@
pub(crate) mod insert_parser;
pub(crate) mod query_parser;

View File

@@ -0,0 +1,30 @@
use snafu::ResultExt;
use sqlparser::ast::Statement as SpStatement;
use crate::errors;
use crate::parser::ParserContext;
use crate::parser::Result;
use crate::statements::insert::Insert;
use crate::statements::statement::Statement;
/// INSERT statement parser implementation
impl<'a> ParserContext<'a> {
pub(crate) fn parse_insert(&mut self) -> Result<Statement> {
self.parser.next_token();
let spstatement = self
.parser
.parse_insert()
.context(errors::InnerSnafu { sql: self.sql })?;
match spstatement {
SpStatement::Insert { .. } => {
Ok(Statement::Insert(Box::new(Insert { inner: spstatement })))
}
unexp => errors::UnsupportedSnafu {
sql: self.sql.to_string(),
keyword: unexp.to_string(),
}
.fail(),
}
}
}

View File

@@ -1,14 +1,14 @@
use errors::ParserError;
use snafu::prelude::*;
use crate::errors;
use crate::parser::ParserContext;
use crate::parser::Result;
use crate::statements::query::Query;
use crate::statements::statement::Statement;
use crate::statements::statement_query::Query;
impl<'a> ParserContext<'a> {
/// Parses select and it's variants.
pub(crate) fn parse_query(&mut self) -> Result<Statement, ParserError> {
pub(crate) fn parse_query(&mut self) -> Result<Statement> {
let spquery = self
.parser
.parse_query()
@@ -45,6 +45,9 @@ mod tests {
Statement::ShowDatabases(_) => {
panic!("Not expected to be a show database statement")
}
Statement::Insert(_) => {
panic!("Not expected to be a show database statement")
}
Statement::Query(_) => {}
}
}

View File

@@ -1 +0,0 @@

View File

@@ -1,4 +1,5 @@
pub mod insert;
pub mod query;
pub mod show_database;
pub mod show_kind;
pub mod statement;
pub mod statement_query;
pub mod statement_show_database;

View File

@@ -0,0 +1,22 @@
use sqlparser::ast::Statement;
use sqlparser::parser::ParserError;
#[derive(Debug, Clone, PartialEq)]
pub struct Insert {
// Can only be sqlparser::ast::Statement::Insert variant
pub inner: Statement,
}
impl TryFrom<Statement> for Insert {
type Error = ParserError;
fn try_from(value: Statement) -> Result<Self, Self::Error> {
match value {
Statement::Insert { .. } => Ok(Insert { inner: value }),
unexp => Err(ParserError::ParserError(format!(
"Not expected to be {}",
unexp
))),
}
}
}

View File

@@ -0,0 +1,26 @@
use sqlparser::ast::Query as SpQuery;
use crate::errors::ParserError;
/// Query statement instance.
#[derive(Debug, Clone, PartialEq)]
pub struct Query {
pub inner: SpQuery,
}
/// Automatically converts from sqlparser Query instance to SqlQuery.
impl TryFrom<SpQuery> for Query {
type Error = ParserError;
fn try_from(q: SpQuery) -> Result<Self, Self::Error> {
Ok(Query { inner: q })
}
}
impl TryFrom<Query> for SpQuery {
type Error = ParserError;
fn try_from(value: Query) -> Result<Self, Self::Error> {
Ok(value.inner)
}
}

View File

@@ -1,8 +1,9 @@
use sqlparser::ast::Statement as SpStatement;
use sqlparser::parser::ParserError::ParserError;
use sqlparser::parser::ParserError;
use crate::statements::statement_query::Query;
use crate::statements::statement_show_database::SqlShowDatabase;
use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::show_database::SqlShowDatabase;
/// Tokens parsed by `DFParser` are converted into these values.
#[derive(Debug, Clone, PartialEq)]
@@ -12,6 +13,9 @@ pub enum Statement {
// Query
Query(Box<Query>),
// Insert
Insert(Box<Insert>),
}
/// Converts Statement to sqlparser statement
@@ -20,10 +24,11 @@ impl TryFrom<Statement> for SpStatement {
fn try_from(value: Statement) -> Result<Self, Self::Error> {
match value {
Statement::ShowDatabases(_) => Err(ParserError(
Statement::ShowDatabases(_) => Err(ParserError::ParserError(
"sqlparser does not support SHOW DATABASE query.".to_string(),
)),
Statement::Query(s) => Ok(SpStatement::Query(Box::new(s.inner))),
Statement::Insert(i) => Ok(i.inner),
}
}
}