Compare commits

...

2 Commits

Author SHA1 Message Date
Ruihang Xia
94409967be Merge branch 'main' into create-view 2024-04-22 21:08:22 +08:00
Ruihang Xia
7503992d61 add statement
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-04-17 19:13:54 +08:00
11 changed files with 108 additions and 11 deletions

1
Cargo.lock generated
View File

@@ -6298,6 +6298,7 @@ dependencies = [
"sql", "sql",
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
"store-api", "store-api",
"substrait 0.7.2",
"table", "table",
"tokio", "tokio",
"tonic 0.11.0", "tonic 0.11.0",

View File

@@ -52,6 +52,7 @@ snafu.workspace = true
sql.workspace = true sql.workspace = true
sqlparser.workspace = true sqlparser.workspace = true
store-api.workspace = true store-api.workspace = true
substrait.workspace = true
table.workspace = true table.workspace = true
tokio.workspace = true tokio.workspace = true
tonic.workspace = true tonic.workspace = true

View File

@@ -541,6 +541,12 @@ pub enum Error {
end: String, end: String,
location: Location, location: Location,
}, },
#[snafu(display("Failed to convert between logical plan and substrait plan"))]
SubstraitCodec {
location: Location,
source: substrait::error::Error,
},
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
@@ -597,6 +603,7 @@ impl ErrorExt for Error {
Error::RequestInserts { source, .. } => source.status_code(), Error::RequestInserts { source, .. } => source.status_code(),
Error::RequestRegion { source, .. } => source.status_code(), Error::RequestRegion { source, .. } => source.status_code(),
Error::RequestDeletes { source, .. } => source.status_code(), Error::RequestDeletes { source, .. } => source.status_code(),
Error::SubstraitCodec { source, .. } => source.status_code(),
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => { Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
source.status_code() source.status_code()

View File

@@ -164,6 +164,10 @@ impl StatementExecutor {
let _ = self.create_external_table(stmt, query_ctx).await?; let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0)) Ok(Output::new_with_affected_rows(0))
} }
Statement::CreateView(stmt) => {
let _ = self.create_view(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => { Statement::DropTable(stmt) => {
let (catalog, schema, table) = let (catalog, schema, table) =
@@ -256,6 +260,13 @@ impl StatementExecutor {
.context(PlanStatementSnafu) .context(PlanStatementSnafu)
} }
pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.query_engine
.planner()
.optimize(plan)
.context(PlanStatementSnafu)
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> { async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?; let plan = self.plan(stmt, query_ctx.clone()).await?;

View File

@@ -39,16 +39,21 @@ use datatypes::value::Value;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef}; use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::sql::create_table_stmt; use query::sql::create_table_stmt;
use regex::Regex; use regex::Regex;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name; use session::table_name::table_idents_to_full_name;
use snafu::{ensure, IntoError, OptionExt, ResultExt}; use snafu::{ensure, IntoError, OptionExt, ResultExt};
use sql::statements::alter::AlterTable; use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions}; use sql::statements::create::{
CreateExternalTable, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value; use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, Value as ParserValue}; use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable; use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions}; use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -60,7 +65,7 @@ use crate::error::{
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu, ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
}; };
use crate::expr_factory; use crate::expr_factory;
@@ -320,6 +325,33 @@ impl StatementExecutor {
.collect()) .collect())
} }
#[tracing::instrument(skip_all)]
pub async fn create_view(
&self,
create_view: CreateView,
ctx: QueryContextRef,
) -> Result<TableRef> {
// convert input into logical plan
let logical_plan = match *create_view.input {
Statement::Query(query) => {
self.plan(QueryStatement::Sql(Statement::Query(query)), ctx)
.await?
}
Statement::Tql(query) => self.plan_tql(query, &ctx).await?,
_ => {
todo!("throw an error")
}
};
let optimized_plan = self.optimize_logical_plan(logical_plan)?;
// encode logical plan
let encoded_plan = DFLogicalSubstraitConvertor
.encode(&optimized_plan.unwrap_df_plan())
.context(SubstraitCodecSnafu)?;
todo!()
}
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> { pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer(); let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();

View File

@@ -20,6 +20,7 @@ use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
}; };
use query::plan::LogicalPlan;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use sql::statements::tql::Tql; use sql::statements::tql::Tql;
@@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
use crate::statement::StatementExecutor; use crate::statement::StatementExecutor;
impl StatementExecutor { impl StatementExecutor {
/// Plan the given [Tql] query and return the [LogicalPlan].
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> { pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
let stmt = match tql { let stmt = match tql {
Tql::Eval(eval) => { Tql::Eval(eval) => {
let promql = PromQuery { let promql = PromQuery {
@@ -86,12 +88,17 @@ impl StatementExecutor {
.unwrap() .unwrap()
} }
}; };
let plan = self self.query_engine
.query_engine
.planner() .planner()
.plan(stmt, query_ctx.clone()) .plan(stmt, query_ctx.clone())
.await .await
.context(PlanStatementSnafu)?; .context(PlanStatementSnafu)
}
/// Execute the given [Tql] query and return the result.
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan_tql(tql, &query_ctx).await?;
self.query_engine self.query_engine
.execute(plan, query_ctx) .execute(plan, query_ctx)
.await .await

View File

@@ -87,6 +87,13 @@ impl LogicalPlan {
.context(DataFusionSnafu) .context(DataFusionSnafu)
.map(LogicalPlan::DfPlan) .map(LogicalPlan::DfPlan)
} }
/// Unwrap the logical plan into a DataFusion logical plan
pub fn unwrap_df_plan(self) -> DfLogicalPlan {
match self {
LogicalPlan::DfPlan(plan) => plan,
}
}
} }
impl From<DfLogicalPlan> for LogicalPlan { impl From<DfLogicalPlan> for LogicalPlan {

View File

@@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
pub trait LogicalPlanner: Send + Sync { pub trait LogicalPlanner: Send + Sync {
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>; async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
fn as_any(&self) -> &dyn Any; fn as_any(&self) -> &dyn Any;
} }
@@ -145,6 +147,14 @@ impl DfLogicalPlanner {
.map_err(BoxedError::new) .map_err(BoxedError::new)
.context(QueryPlanSnafu) .context(QueryPlanSnafu)
} }
#[tracing::instrument(skip_all)]
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.engine_state
.optimize_logical_plan(plan.unwrap_df_plan())
.context(DataFusionSnafu)
.map(Into::into)
}
} }
#[async_trait] #[async_trait]
@@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner {
} }
} }
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.optimize_logical_plan(plan)
}
fn as_any(&self) -> &dyn Any { fn as_any(&self) -> &dyn Any {
self self
} }

View File

@@ -142,6 +142,11 @@ impl QueryEngineState {
}) })
} }
/// Run the full logical plan optimize phase for the given plan.
pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
self.session_state().optimize(&plan)
}
/// Register an udf function. /// Register an udf function.
/// Will override if the function with same name is already registered. /// Will override if the function with same name is already registered.
pub fn register_function(&self, func: FunctionRef) { pub fn register_function(&self, func: FunctionRef) {

View File

@@ -20,6 +20,7 @@ use sqlparser::ast::Expr;
use sqlparser_derive::{Visit, VisitMut}; use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue}; use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue};
use crate::statements::statement::Statement;
use crate::statements::OptionMap; use crate::statements::OptionMap;
const LINE_SEP: &str = ",\n"; const LINE_SEP: &str = ",\n";
@@ -237,6 +238,17 @@ pub struct CreateTableLike {
pub source_name: ObjectName, pub source_name: ObjectName,
} }
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct CreateView {
/// View name
pub view_name: ObjectName,
/// The clause after `As` that defines the VIEW.
/// Can only be either [Statement::Query] or [Statement::Tql].
pub input: Box<Statement>,
/// Whether to replace existing VIEW
pub or_replace: bool,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::assert_matches::assert_matches; use std::assert_matches::assert_matches;

View File

@@ -16,22 +16,20 @@ use datafusion_sql::parser::Statement as DfStatement;
use sqlparser::ast::Statement as SpStatement; use sqlparser::ast::Statement as SpStatement;
use sqlparser_derive::{Visit, VisitMut}; use sqlparser_derive::{Visit, VisitMut};
use super::drop::DropDatabase;
use super::show::ShowVariables;
use crate::error::{ConvertToDfStatementSnafu, Error}; use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::alter::AlterTable; use crate::statements::alter::AlterTable;
use crate::statements::create::{ use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, CreateView,
}; };
use crate::statements::delete::Delete; use crate::statements::delete::Delete;
use crate::statements::describe::DescribeTable; use crate::statements::describe::DescribeTable;
use crate::statements::drop::DropTable; use crate::statements::drop::{DropDatabase, DropTable};
use crate::statements::explain::Explain; use crate::statements::explain::Explain;
use crate::statements::insert::Insert; use crate::statements::insert::Insert;
use crate::statements::query::Query; use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables; use crate::statements::set_variables::SetVariables;
use crate::statements::show::{ use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
}; };
use crate::statements::tql::Tql; use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable; use crate::statements::truncate::TruncateTable;
@@ -52,6 +50,8 @@ pub enum Statement {
CreateExternalTable(CreateExternalTable), CreateExternalTable(CreateExternalTable),
// CREATE TABLE ... LIKE // CREATE TABLE ... LIKE
CreateTableLike(CreateTableLike), CreateTableLike(CreateTableLike),
// CREATE VIEW ... AS
CreateView(CreateView),
// DROP TABLE // DROP TABLE
DropTable(DropTable), DropTable(DropTable),
// DROP DATABASE // DROP DATABASE