mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
2 Commits
feat/prefi
...
create-vie
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
94409967be | ||
|
|
7503992d61 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -6298,6 +6298,7 @@ dependencies = [
|
||||
"sql",
|
||||
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
|
||||
"store-api",
|
||||
"substrait 0.7.2",
|
||||
"table",
|
||||
"tokio",
|
||||
"tonic 0.11.0",
|
||||
|
||||
@@ -52,6 +52,7 @@ snafu.workspace = true
|
||||
sql.workspace = true
|
||||
sqlparser.workspace = true
|
||||
store-api.workspace = true
|
||||
substrait.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -541,6 +541,12 @@ pub enum Error {
|
||||
end: String,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to convert between logical plan and substrait plan"))]
|
||||
SubstraitCodec {
|
||||
location: Location,
|
||||
source: substrait::error::Error,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -597,6 +603,7 @@ impl ErrorExt for Error {
|
||||
Error::RequestInserts { source, .. } => source.status_code(),
|
||||
Error::RequestRegion { source, .. } => source.status_code(),
|
||||
Error::RequestDeletes { source, .. } => source.status_code(),
|
||||
Error::SubstraitCodec { source, .. } => source.status_code(),
|
||||
|
||||
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
|
||||
source.status_code()
|
||||
|
||||
@@ -164,6 +164,10 @@ impl StatementExecutor {
|
||||
let _ = self.create_external_table(stmt, query_ctx).await?;
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
Statement::CreateView(stmt) => {
|
||||
let _ = self.create_view(stmt, query_ctx).await?;
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
|
||||
Statement::DropTable(stmt) => {
|
||||
let (catalog, schema, table) =
|
||||
@@ -256,6 +260,13 @@ impl StatementExecutor {
|
||||
.context(PlanStatementSnafu)
|
||||
}
|
||||
|
||||
pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||
self.query_engine
|
||||
.planner()
|
||||
.optimize(plan)
|
||||
.context(PlanStatementSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
let plan = self.plan(stmt, query_ctx.clone()).await?;
|
||||
|
||||
@@ -39,16 +39,21 @@ use datatypes::value::Value;
|
||||
use lazy_static::lazy_static;
|
||||
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
|
||||
use partition::partition::{PartitionBound, PartitionDef};
|
||||
use query::parser::QueryStatement;
|
||||
use query::sql::create_table_stmt;
|
||||
use regex::Regex;
|
||||
use session::context::QueryContextRef;
|
||||
use session::table_name::table_idents_to_full_name;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
use sql::statements::alter::AlterTable;
|
||||
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
|
||||
use sql::statements::create::{
|
||||
CreateExternalTable, CreateTable, CreateTableLike, CreateView, Partitions,
|
||||
};
|
||||
use sql::statements::sql_value_to_value;
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
|
||||
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
|
||||
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
|
||||
use table::dist_table::DistTable;
|
||||
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
|
||||
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
|
||||
@@ -60,7 +65,7 @@ use crate::error::{
|
||||
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
|
||||
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
|
||||
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
|
||||
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
|
||||
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
|
||||
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
|
||||
};
|
||||
use crate::expr_factory;
|
||||
@@ -320,6 +325,33 @@ impl StatementExecutor {
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn create_view(
|
||||
&self,
|
||||
create_view: CreateView,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
// convert input into logical plan
|
||||
let logical_plan = match *create_view.input {
|
||||
Statement::Query(query) => {
|
||||
self.plan(QueryStatement::Sql(Statement::Query(query)), ctx)
|
||||
.await?
|
||||
}
|
||||
Statement::Tql(query) => self.plan_tql(query, &ctx).await?,
|
||||
_ => {
|
||||
todo!("throw an error")
|
||||
}
|
||||
};
|
||||
let optimized_plan = self.optimize_logical_plan(logical_plan)?;
|
||||
|
||||
// encode logical plan
|
||||
let encoded_plan = DFLogicalSubstraitConvertor
|
||||
.encode(&optimized_plan.unwrap_df_plan())
|
||||
.context(SubstraitCodecSnafu)?;
|
||||
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
|
||||
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
|
||||
|
||||
@@ -20,6 +20,7 @@ use query::parser::{
|
||||
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
|
||||
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
|
||||
};
|
||||
use query::plan::LogicalPlan;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sql::statements::tql::Tql;
|
||||
@@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
|
||||
use crate::statement::StatementExecutor;
|
||||
|
||||
impl StatementExecutor {
|
||||
/// Plan the given [Tql] query and return the [LogicalPlan].
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
|
||||
let stmt = match tql {
|
||||
Tql::Eval(eval) => {
|
||||
let promql = PromQuery {
|
||||
@@ -86,12 +88,17 @@ impl StatementExecutor {
|
||||
.unwrap()
|
||||
}
|
||||
};
|
||||
let plan = self
|
||||
.query_engine
|
||||
self.query_engine
|
||||
.planner()
|
||||
.plan(stmt, query_ctx.clone())
|
||||
.await
|
||||
.context(PlanStatementSnafu)?;
|
||||
.context(PlanStatementSnafu)
|
||||
}
|
||||
|
||||
/// Execute the given [Tql] query and return the result.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
let plan = self.plan_tql(tql, &query_ctx).await?;
|
||||
self.query_engine
|
||||
.execute(plan, query_ctx)
|
||||
.await
|
||||
|
||||
@@ -87,6 +87,13 @@ impl LogicalPlan {
|
||||
.context(DataFusionSnafu)
|
||||
.map(LogicalPlan::DfPlan)
|
||||
}
|
||||
|
||||
/// Unwrap the logical plan into a DataFusion logical plan
|
||||
pub fn unwrap_df_plan(self) -> DfLogicalPlan {
|
||||
match self {
|
||||
LogicalPlan::DfPlan(plan) => plan,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DfLogicalPlan> for LogicalPlan {
|
||||
|
||||
@@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
|
||||
pub trait LogicalPlanner: Send + Sync {
|
||||
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
|
||||
|
||||
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
|
||||
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
}
|
||||
|
||||
@@ -145,6 +147,14 @@ impl DfLogicalPlanner {
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryPlanSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||
self.engine_state
|
||||
.optimize_logical_plan(plan.unwrap_df_plan())
|
||||
.context(DataFusionSnafu)
|
||||
.map(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner {
|
||||
}
|
||||
}
|
||||
|
||||
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
|
||||
self.optimize_logical_plan(plan)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
@@ -142,6 +142,11 @@ impl QueryEngineState {
|
||||
})
|
||||
}
|
||||
|
||||
/// Run the full logical plan optimize phase for the given plan.
|
||||
pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
|
||||
self.session_state().optimize(&plan)
|
||||
}
|
||||
|
||||
/// Register an udf function.
|
||||
/// Will override if the function with same name is already registered.
|
||||
pub fn register_function(&self, func: FunctionRef) {
|
||||
|
||||
@@ -20,6 +20,7 @@ use sqlparser::ast::Expr;
|
||||
use sqlparser_derive::{Visit, VisitMut};
|
||||
|
||||
use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::statements::OptionMap;
|
||||
|
||||
const LINE_SEP: &str = ",\n";
|
||||
@@ -237,6 +238,17 @@ pub struct CreateTableLike {
|
||||
pub source_name: ObjectName,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
|
||||
pub struct CreateView {
|
||||
/// View name
|
||||
pub view_name: ObjectName,
|
||||
/// The clause after `As` that defines the VIEW.
|
||||
/// Can only be either [Statement::Query] or [Statement::Tql].
|
||||
pub input: Box<Statement>,
|
||||
/// Whether to replace existing VIEW
|
||||
pub or_replace: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
|
||||
@@ -16,22 +16,20 @@ use datafusion_sql::parser::Statement as DfStatement;
|
||||
use sqlparser::ast::Statement as SpStatement;
|
||||
use sqlparser_derive::{Visit, VisitMut};
|
||||
|
||||
use super::drop::DropDatabase;
|
||||
use super::show::ShowVariables;
|
||||
use crate::error::{ConvertToDfStatementSnafu, Error};
|
||||
use crate::statements::alter::AlterTable;
|
||||
use crate::statements::create::{
|
||||
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike,
|
||||
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, CreateView,
|
||||
};
|
||||
use crate::statements::delete::Delete;
|
||||
use crate::statements::describe::DescribeTable;
|
||||
use crate::statements::drop::DropTable;
|
||||
use crate::statements::drop::{DropDatabase, DropTable};
|
||||
use crate::statements::explain::Explain;
|
||||
use crate::statements::insert::Insert;
|
||||
use crate::statements::query::Query;
|
||||
use crate::statements::set_variables::SetVariables;
|
||||
use crate::statements::show::{
|
||||
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables,
|
||||
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
|
||||
};
|
||||
use crate::statements::tql::Tql;
|
||||
use crate::statements::truncate::TruncateTable;
|
||||
@@ -52,6 +50,8 @@ pub enum Statement {
|
||||
CreateExternalTable(CreateExternalTable),
|
||||
// CREATE TABLE ... LIKE
|
||||
CreateTableLike(CreateTableLike),
|
||||
// CREATE VIEW ... AS
|
||||
CreateView(CreateView),
|
||||
// DROP TABLE
|
||||
DropTable(DropTable),
|
||||
// DROP DATABASE
|
||||
|
||||
Reference in New Issue
Block a user