From 7503992d6130e7de9589bee204246116185f71a3 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 17 Apr 2024 19:13:54 +0800 Subject: [PATCH] add statement Signed-off-by: Ruihang Xia --- Cargo.lock | 1 + src/operator/Cargo.toml | 1 + src/operator/src/error.rs | 7 ++++++ src/operator/src/statement.rs | 11 +++++++++ src/operator/src/statement/ddl.rs | 36 +++++++++++++++++++++++++++-- src/operator/src/statement/tql.rs | 15 ++++++++---- src/query/src/plan.rs | 7 ++++++ src/query/src/planner.rs | 14 +++++++++++ src/query/src/query_engine/state.rs | 5 ++++ src/sql/src/statements/create.rs | 12 ++++++++++ src/sql/src/statements/statement.rs | 12 ++++++---- 11 files changed, 110 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4f44c1d83..9f7f378c5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6188,6 +6188,7 @@ dependencies = [ "sql", "sqlparser 0.38.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=6a93567ae38d42be5c8d08b13c8ff4dde26502ef)", "store-api", + "substrait 0.7.2", "table", "tokio", "tonic 0.10.2", diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index 83e0892a49..40037c0b11 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -52,6 +52,7 @@ snafu.workspace = true sql.workspace = true sqlparser.workspace = true store-api.workspace = true +substrait.workspace = true table.workspace = true tokio.workspace = true tonic.workspace = true diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index dd8d64f10f..d446aec86f 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -541,6 +541,12 @@ pub enum Error { end: String, location: Location, }, + + #[snafu(display("Failed to convert between logical plan and substrait plan"))] + SubstraitCodec { + location: Location, + source: substrait::error::Error, + }, } pub type Result = std::result::Result; @@ -597,6 +603,7 @@ impl ErrorExt for Error { Error::RequestInserts { source, .. } => source.status_code(), Error::RequestRegion { source, .. } => source.status_code(), Error::RequestDeletes { source, .. } => source.status_code(), + Error::SubstraitCodec { source, .. } => source.status_code(), Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => { source.status_code() diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 2d60fa1f2d..17ef9f81fc 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -160,6 +160,10 @@ impl StatementExecutor { let _ = self.create_external_table(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) } + Statement::CreateView(stmt) => { + let _ = self.create_view(stmt, query_ctx).await?; + Ok(Output::new_with_affected_rows(0)) + } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::DropTable(stmt) => { let (catalog, schema, table) = @@ -252,6 +256,13 @@ impl StatementExecutor { .context(PlanStatementSnafu) } + pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result { + self.query_engine + .planner() + .optimize(plan) + .context(PlanStatementSnafu) + } + #[tracing::instrument(skip_all)] async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result { let plan = self.plan(stmt, query_ctx.clone()).await?; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 2ab87fa451..e9e3c97beb 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -39,16 +39,21 @@ use datatypes::value::Value; use lazy_static::lazy_static; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::partition::{PartitionBound, PartitionDef}; +use query::parser::QueryStatement; use query::sql::create_table_stmt; use regex::Regex; use session::context::QueryContextRef; use session::table_name::table_idents_to_full_name; use snafu::{ensure, IntoError, OptionExt, ResultExt}; use sql::statements::alter::AlterTable; -use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions}; +use sql::statements::create::{ + CreateExternalTable, CreateTable, CreateTableLike, CreateView, Partitions, +}; use sql::statements::sql_value_to_value; +use sql::statements::statement::Statement; use sqlparser::ast::{Expr, Ident, Value as ParserValue}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::requests::{AlterKind, AlterTableRequest, TableOptions}; @@ -60,7 +65,7 @@ use crate::error::{ CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu, DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, - ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu, + ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu, }; use crate::expr_factory; @@ -320,6 +325,33 @@ impl StatementExecutor { .collect()) } + #[tracing::instrument(skip_all)] + pub async fn create_view( + &self, + create_view: CreateView, + ctx: QueryContextRef, + ) -> Result { + // convert input into logical plan + let logical_plan = match *create_view.input { + Statement::Query(query) => { + self.plan(QueryStatement::Sql(Statement::Query(query)), ctx) + .await? + } + Statement::Tql(query) => self.plan_tql(query, &ctx).await?, + _ => { + todo!("throw an error") + } + }; + let optimized_plan = self.optimize_logical_plan(logical_plan)?; + + // encode logical plan + let encoded_plan = DFLogicalSubstraitConvertor + .encode(&optimized_plan.unwrap_df_plan()) + .context(SubstraitCodecSnafu)?; + + todo!() + } + #[tracing::instrument(skip_all)] pub async fn alter_logical_tables(&self, alter_table_exprs: Vec) -> Result { let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer(); diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index 02b739c285..1654005d49 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -20,6 +20,7 @@ use query::parser::{ PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, }; +use query::plan::LogicalPlan; use session::context::QueryContextRef; use snafu::ResultExt; use sql::statements::tql::Tql; @@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re use crate::statement::StatementExecutor; impl StatementExecutor { + /// Plan the given [Tql] query and return the [LogicalPlan]. #[tracing::instrument(skip_all)] - pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { + pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result { let stmt = match tql { Tql::Eval(eval) => { let promql = PromQuery { @@ -84,12 +86,17 @@ impl StatementExecutor { .unwrap() } }; - let plan = self - .query_engine + self.query_engine .planner() .plan(stmt, query_ctx.clone()) .await - .context(PlanStatementSnafu)?; + .context(PlanStatementSnafu) + } + + /// Execute the given [Tql] query and return the result. + #[tracing::instrument(skip_all)] + pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { + let plan = self.plan_tql(tql, &query_ctx).await?; self.query_engine .execute(plan, query_ctx) .await diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index 4462302d98..e81d00a845 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -86,6 +86,13 @@ impl LogicalPlan { .context(DataFusionSnafu) .map(LogicalPlan::DfPlan) } + + /// Unwrap the logical plan into a DataFusion logical plan + pub fn unwrap_df_plan(self) -> DfLogicalPlan { + match self { + LogicalPlan::DfPlan(plan) => plan, + } + } } impl From for LogicalPlan { diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index eb3cb255d6..5f350a638d 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext}; pub trait LogicalPlanner: Send + Sync { async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result; + fn optimize(&self, plan: LogicalPlan) -> Result; + fn as_any(&self) -> &dyn Any; } @@ -145,6 +147,14 @@ impl DfLogicalPlanner { .map_err(BoxedError::new) .context(QueryPlanSnafu) } + + #[tracing::instrument(skip_all)] + fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result { + self.engine_state + .optimize_logical_plan(plan.unwrap_df_plan()) + .context(DataFusionSnafu) + .map(Into::into) + } } #[async_trait] @@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner { } } + fn optimize(&self, plan: LogicalPlan) -> Result { + self.optimize_logical_plan(plan) + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 18af09973e..942ac7c18f 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -147,6 +147,11 @@ impl QueryEngineState { }) } + /// Run the full logical plan optimize phase for the given plan. + pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult { + self.session_state().optimize(&plan) + } + /// Register an udf function. /// Will override if the function with same name is already registered. pub fn register_function(&self, func: FunctionRef) { diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index cfcbd8d682..3542b81ba6 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -20,6 +20,7 @@ use sqlparser::ast::Expr; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue}; +use crate::statements::statement::Statement; use crate::statements::OptionMap; const LINE_SEP: &str = ",\n"; @@ -237,6 +238,17 @@ pub struct CreateTableLike { pub source_name: ObjectName, } +#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)] +pub struct CreateView { + /// View name + pub view_name: ObjectName, + /// The clause after `As` that defines the VIEW. + /// Can only be either [Statement::Query] or [Statement::Tql]. + pub input: Box, + /// Whether to replace existing VIEW + pub or_replace: bool, +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 54ca637283..167531168d 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -16,21 +16,21 @@ use datafusion_sql::parser::Statement as DfStatement; use sqlparser::ast::Statement as SpStatement; use sqlparser_derive::{Visit, VisitMut}; -use super::drop::DropDatabase; -use super::show::ShowVariables; use crate::error::{ConvertToDfStatementSnafu, Error}; use crate::statements::alter::AlterTable; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, + CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, CreateView, }; use crate::statements::delete::Delete; use crate::statements::describe::DescribeTable; -use crate::statements::drop::DropTable; +use crate::statements::drop::{DropDatabase, DropTable}; use crate::statements::explain::Explain; use crate::statements::insert::Insert; use crate::statements::query::Query; use crate::statements::set_variables::SetVariables; -use crate::statements::show::{ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowTables}; +use crate::statements::show::{ + ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowTables, ShowVariables, +}; use crate::statements::tql::Tql; use crate::statements::truncate::TruncateTable; @@ -50,6 +50,8 @@ pub enum Statement { CreateExternalTable(CreateExternalTable), // CREATE TABLE ... LIKE CreateTableLike(CreateTableLike), + // CREATE VIEW ... AS + CreateView(CreateView), // DROP TABLE DropTable(DropTable), // DROP DATABASE