mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
2 Commits
v0.15.2
...
create-vie
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94409967be | ||
|
|
7503992d61 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6298,6 +6298,7 @@ dependencies = [
|
|||||||
"sql",
|
"sql",
|
||||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||||
"store-api",
|
"store-api",
|
||||||
|
"substrait 0.7.2",
|
||||||
"table",
|
"table",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tonic 0.11.0",
|
"tonic 0.11.0",
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ snafu.workspace = true
|
|||||||
sql.workspace = true
|
sql.workspace = true
|
||||||
sqlparser.workspace = true
|
sqlparser.workspace = true
|
||||||
store-api.workspace = true
|
store-api.workspace = true
|
||||||
|
substrait.workspace = true
|
||||||
table.workspace = true
|
table.workspace = true
|
||||||
tokio.workspace = true
|
tokio.workspace = true
|
||||||
tonic.workspace = true
|
tonic.workspace = true
|
||||||
|
|||||||
@@ -541,6 +541,12 @@ pub enum Error {
|
|||||||
end: String,
|
end: String,
|
||||||
location: Location,
|
location: Location,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
#[snafu(display("Failed to convert between logical plan and substrait plan"))]
|
||||||
|
SubstraitCodec {
|
||||||
|
location: Location,
|
||||||
|
source: substrait::error::Error,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
@@ -597,6 +603,7 @@ impl ErrorExt for Error {
|
|||||||
Error::RequestInserts { source, .. } => source.status_code(),
|
Error::RequestInserts { source, .. } => source.status_code(),
|
||||||
Error::RequestRegion { source, .. } => source.status_code(),
|
Error::RequestRegion { source, .. } => source.status_code(),
|
||||||
Error::RequestDeletes { source, .. } => source.status_code(),
|
Error::RequestDeletes { source, .. } => source.status_code(),
|
||||||
|
Error::SubstraitCodec { source, .. } => source.status_code(),
|
||||||
|
|
||||||
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
|
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
|
||||||
source.status_code()
|
source.status_code()
|
||||||
|
|||||||
@@ -164,6 +164,10 @@ impl StatementExecutor {
|
|||||||
let _ = self.create_external_table(stmt, query_ctx).await?;
|
let _ = self.create_external_table(stmt, query_ctx).await?;
|
||||||
Ok(Output::new_with_affected_rows(0))
|
Ok(Output::new_with_affected_rows(0))
|
||||||
}
|
}
|
||||||
|
Statement::CreateView(stmt) => {
|
||||||
|
let _ = self.create_view(stmt, query_ctx).await?;
|
||||||
|
Ok(Output::new_with_affected_rows(0))
|
||||||
|
}
|
||||||
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
|
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
|
||||||
Statement::DropTable(stmt) => {
|
Statement::DropTable(stmt) => {
|
||||||
let (catalog, schema, table) =
|
let (catalog, schema, table) =
|
||||||
@@ -256,6 +260,13 @@ impl StatementExecutor {
|
|||||||
.context(PlanStatementSnafu)
|
.context(PlanStatementSnafu)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||||
|
self.query_engine
|
||||||
|
.planner()
|
||||||
|
.optimize(plan)
|
||||||
|
.context(PlanStatementSnafu)
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
|
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
|
||||||
let plan = self.plan(stmt, query_ctx.clone()).await?;
|
let plan = self.plan(stmt, query_ctx.clone()).await?;
|
||||||
|
|||||||
@@ -39,16 +39,21 @@ use datatypes::value::Value;
|
|||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
|
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
|
||||||
use partition::partition::{PartitionBound, PartitionDef};
|
use partition::partition::{PartitionBound, PartitionDef};
|
||||||
|
use query::parser::QueryStatement;
|
||||||
use query::sql::create_table_stmt;
|
use query::sql::create_table_stmt;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use session::table_name::table_idents_to_full_name;
|
use session::table_name::table_idents_to_full_name;
|
||||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||||
use sql::statements::alter::AlterTable;
|
use sql::statements::alter::AlterTable;
|
||||||
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
|
use sql::statements::create::{
|
||||||
|
CreateExternalTable, CreateTable, CreateTableLike, CreateView, Partitions,
|
||||||
|
};
|
||||||
use sql::statements::sql_value_to_value;
|
use sql::statements::sql_value_to_value;
|
||||||
|
use sql::statements::statement::Statement;
|
||||||
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
|
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
|
||||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
||||||
|
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||||
use table::dist_table::DistTable;
|
use table::dist_table::DistTable;
|
||||||
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
|
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
|
||||||
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
|
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
|
||||||
@@ -60,7 +65,7 @@ use crate::error::{
|
|||||||
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
|
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
|
||||||
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
|
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
|
||||||
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
|
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
|
||||||
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
|
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
|
||||||
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
|
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
|
||||||
};
|
};
|
||||||
use crate::expr_factory;
|
use crate::expr_factory;
|
||||||
@@ -320,6 +325,33 @@ impl StatementExecutor {
|
|||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub async fn create_view(
|
||||||
|
&self,
|
||||||
|
create_view: CreateView,
|
||||||
|
ctx: QueryContextRef,
|
||||||
|
) -> Result<TableRef> {
|
||||||
|
// convert input into logical plan
|
||||||
|
let logical_plan = match *create_view.input {
|
||||||
|
Statement::Query(query) => {
|
||||||
|
self.plan(QueryStatement::Sql(Statement::Query(query)), ctx)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
Statement::Tql(query) => self.plan_tql(query, &ctx).await?,
|
||||||
|
_ => {
|
||||||
|
todo!("throw an error")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let optimized_plan = self.optimize_logical_plan(logical_plan)?;
|
||||||
|
|
||||||
|
// encode logical plan
|
||||||
|
let encoded_plan = DFLogicalSubstraitConvertor
|
||||||
|
.encode(&optimized_plan.unwrap_df_plan())
|
||||||
|
.context(SubstraitCodecSnafu)?;
|
||||||
|
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
|
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
|
||||||
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
|
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ use query::parser::{
|
|||||||
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
|
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
|
||||||
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
|
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
|
||||||
};
|
};
|
||||||
|
use query::plan::LogicalPlan;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
use sql::statements::tql::Tql;
|
use sql::statements::tql::Tql;
|
||||||
@@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
|
|||||||
use crate::statement::StatementExecutor;
|
use crate::statement::StatementExecutor;
|
||||||
|
|
||||||
impl StatementExecutor {
|
impl StatementExecutor {
|
||||||
|
/// Plan the given [Tql] query and return the [LogicalPlan].
|
||||||
#[tracing::instrument(skip_all)]
|
#[tracing::instrument(skip_all)]
|
||||||
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
|
pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
|
||||||
let stmt = match tql {
|
let stmt = match tql {
|
||||||
Tql::Eval(eval) => {
|
Tql::Eval(eval) => {
|
||||||
let promql = PromQuery {
|
let promql = PromQuery {
|
||||||
@@ -86,12 +88,17 @@ impl StatementExecutor {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let plan = self
|
self.query_engine
|
||||||
.query_engine
|
|
||||||
.planner()
|
.planner()
|
||||||
.plan(stmt, query_ctx.clone())
|
.plan(stmt, query_ctx.clone())
|
||||||
.await
|
.await
|
||||||
.context(PlanStatementSnafu)?;
|
.context(PlanStatementSnafu)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute the given [Tql] query and return the result.
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
|
||||||
|
let plan = self.plan_tql(tql, &query_ctx).await?;
|
||||||
self.query_engine
|
self.query_engine
|
||||||
.execute(plan, query_ctx)
|
.execute(plan, query_ctx)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -87,6 +87,13 @@ impl LogicalPlan {
|
|||||||
.context(DataFusionSnafu)
|
.context(DataFusionSnafu)
|
||||||
.map(LogicalPlan::DfPlan)
|
.map(LogicalPlan::DfPlan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Unwrap the logical plan into a DataFusion logical plan
|
||||||
|
pub fn unwrap_df_plan(self) -> DfLogicalPlan {
|
||||||
|
match self {
|
||||||
|
LogicalPlan::DfPlan(plan) => plan,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<DfLogicalPlan> for LogicalPlan {
|
impl From<DfLogicalPlan> for LogicalPlan {
|
||||||
|
|||||||
@@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
|
|||||||
pub trait LogicalPlanner: Send + Sync {
|
pub trait LogicalPlanner: Send + Sync {
|
||||||
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
|
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
|
||||||
|
|
||||||
|
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
|
||||||
|
|
||||||
fn as_any(&self) -> &dyn Any;
|
fn as_any(&self) -> &dyn Any;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,6 +147,14 @@ impl DfLogicalPlanner {
|
|||||||
.map_err(BoxedError::new)
|
.map_err(BoxedError::new)
|
||||||
.context(QueryPlanSnafu)
|
.context(QueryPlanSnafu)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all)]
|
||||||
|
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||||
|
self.engine_state
|
||||||
|
.optimize_logical_plan(plan.unwrap_df_plan())
|
||||||
|
.context(DataFusionSnafu)
|
||||||
|
.map(Into::into)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||||
|
self.optimize_logical_plan(plan)
|
||||||
|
}
|
||||||
|
|
||||||
fn as_any(&self) -> &dyn Any {
|
fn as_any(&self) -> &dyn Any {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -142,6 +142,11 @@ impl QueryEngineState {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run the full logical plan optimize phase for the given plan.
|
||||||
|
pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
|
||||||
|
self.session_state().optimize(&plan)
|
||||||
|
}
|
||||||
|
|
||||||
/// Register an udf function.
|
/// Register an udf function.
|
||||||
/// Will override if the function with same name is already registered.
|
/// Will override if the function with same name is already registered.
|
||||||
pub fn register_function(&self, func: FunctionRef) {
|
pub fn register_function(&self, func: FunctionRef) {
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ use sqlparser::ast::Expr;
|
|||||||
use sqlparser_derive::{Visit, VisitMut};
|
use sqlparser_derive::{Visit, VisitMut};
|
||||||
|
|
||||||
use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue};
|
use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue};
|
||||||
|
use crate::statements::statement::Statement;
|
||||||
use crate::statements::OptionMap;
|
use crate::statements::OptionMap;
|
||||||
|
|
||||||
const LINE_SEP: &str = ",\n";
|
const LINE_SEP: &str = ",\n";
|
||||||
@@ -237,6 +238,17 @@ pub struct CreateTableLike {
|
|||||||
pub source_name: ObjectName,
|
pub source_name: ObjectName,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
|
||||||
|
pub struct CreateView {
|
||||||
|
/// View name
|
||||||
|
pub view_name: ObjectName,
|
||||||
|
/// The clause after `As` that defines the VIEW.
|
||||||
|
/// Can only be either [Statement::Query] or [Statement::Tql].
|
||||||
|
pub input: Box<Statement>,
|
||||||
|
/// Whether to replace existing VIEW
|
||||||
|
pub or_replace: bool,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::assert_matches::assert_matches;
|
use std::assert_matches::assert_matches;
|
||||||
|
|||||||
@@ -16,22 +16,20 @@ use datafusion_sql::parser::Statement as DfStatement;
|
|||||||
use sqlparser::ast::Statement as SpStatement;
|
use sqlparser::ast::Statement as SpStatement;
|
||||||
use sqlparser_derive::{Visit, VisitMut};
|
use sqlparser_derive::{Visit, VisitMut};
|
||||||
|
|
||||||
use super::drop::DropDatabase;
|
|
||||||
use super::show::ShowVariables;
|
|
||||||
use crate::error::{ConvertToDfStatementSnafu, Error};
|
use crate::error::{ConvertToDfStatementSnafu, Error};
|
||||||
use crate::statements::alter::AlterTable;
|
use crate::statements::alter::AlterTable;
|
||||||
use crate::statements::create::{
|
use crate::statements::create::{
|
||||||
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike,
|
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, CreateView,
|
||||||
};
|
};
|
||||||
use crate::statements::delete::Delete;
|
use crate::statements::delete::Delete;
|
||||||
use crate::statements::describe::DescribeTable;
|
use crate::statements::describe::DescribeTable;
|
||||||
use crate::statements::drop::DropTable;
|
use crate::statements::drop::{DropDatabase, DropTable};
|
||||||
use crate::statements::explain::Explain;
|
use crate::statements::explain::Explain;
|
||||||
use crate::statements::insert::Insert;
|
use crate::statements::insert::Insert;
|
||||||
use crate::statements::query::Query;
|
use crate::statements::query::Query;
|
||||||
use crate::statements::set_variables::SetVariables;
|
use crate::statements::set_variables::SetVariables;
|
||||||
use crate::statements::show::{
|
use crate::statements::show::{
|
||||||
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables,
|
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
|
||||||
};
|
};
|
||||||
use crate::statements::tql::Tql;
|
use crate::statements::tql::Tql;
|
||||||
use crate::statements::truncate::TruncateTable;
|
use crate::statements::truncate::TruncateTable;
|
||||||
@@ -52,6 +50,8 @@ pub enum Statement {
|
|||||||
CreateExternalTable(CreateExternalTable),
|
CreateExternalTable(CreateExternalTable),
|
||||||
// CREATE TABLE ... LIKE
|
// CREATE TABLE ... LIKE
|
||||||
CreateTableLike(CreateTableLike),
|
CreateTableLike(CreateTableLike),
|
||||||
|
// CREATE VIEW ... AS
|
||||||
|
CreateView(CreateView),
|
||||||
// DROP TABLE
|
// DROP TABLE
|
||||||
DropTable(DropTable),
|
DropTable(DropTable),
|
||||||
// DROP DATABASE
|
// DROP DATABASE
|
||||||
|
|||||||
Reference in New Issue
Block a user