Compare commits

...

2 Commits

Author SHA1 Message Date
Ruihang Xia
94409967be Merge branch 'main' into create-view 2024-04-22 21:08:22 +08:00
Ruihang Xia
7503992d61 add statement
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-04-17 19:13:54 +08:00
11 changed files with 108 additions and 11 deletions

1
Cargo.lock generated
View File

@@ -6298,6 +6298,7 @@ dependencies = [
"sql",
"sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)",
"store-api",
"substrait 0.7.2",
"table",
"tokio",
"tonic 0.11.0",

View File

@@ -52,6 +52,7 @@ snafu.workspace = true
sql.workspace = true
sqlparser.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
tonic.workspace = true

View File

@@ -541,6 +541,12 @@ pub enum Error {
end: String,
location: Location,
},
#[snafu(display("Failed to convert between logical plan and substrait plan"))]
SubstraitCodec {
location: Location,
source: substrait::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -597,6 +603,7 @@ impl ErrorExt for Error {
Error::RequestInserts { source, .. } => source.status_code(),
Error::RequestRegion { source, .. } => source.status_code(),
Error::RequestDeletes { source, .. } => source.status_code(),
Error::SubstraitCodec { source, .. } => source.status_code(),
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
source.status_code()

View File

@@ -164,6 +164,10 @@ impl StatementExecutor {
let _ = self.create_external_table(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::CreateView(stmt) => {
let _ = self.create_view(stmt, query_ctx).await?;
Ok(Output::new_with_affected_rows(0))
}
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
@@ -256,6 +260,13 @@ impl StatementExecutor {
.context(PlanStatementSnafu)
}
pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.query_engine
.planner()
.optimize(plan)
.context(PlanStatementSnafu)
}
#[tracing::instrument(skip_all)]
async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan(stmt, query_ctx.clone()).await?;

View File

@@ -39,16 +39,21 @@ use datatypes::value::Value;
use lazy_static::lazy_static;
use partition::expr::{Operand, PartitionExpr, RestrictedOp};
use partition::partition::{PartitionBound, PartitionDef};
use query::parser::QueryStatement;
use query::sql::create_table_stmt;
use regex::Regex;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{ensure, IntoError, OptionExt, ResultExt};
use sql::statements::alter::AlterTable;
use sql::statements::create::{CreateExternalTable, CreateTable, CreateTableLike, Partitions};
use sql::statements::create::{
CreateExternalTable, CreateTable, CreateTableLike, CreateView, Partitions,
};
use sql::statements::sql_value_to_value;
use sql::statements::statement::Statement;
use sqlparser::ast::{Expr, Ident, Value as ParserValue};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::dist_table::DistTable;
use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType};
use table::requests::{AlterKind, AlterTableRequest, TableOptions};
@@ -60,7 +65,7 @@ use crate::error::{
CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu,
DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu,
InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu,
ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, SubstraitCodecSnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
@@ -320,6 +325,33 @@ impl StatementExecutor {
.collect())
}
#[tracing::instrument(skip_all)]
pub async fn create_view(
&self,
create_view: CreateView,
ctx: QueryContextRef,
) -> Result<TableRef> {
// convert input into logical plan
let logical_plan = match *create_view.input {
Statement::Query(query) => {
self.plan(QueryStatement::Sql(Statement::Query(query)), ctx)
.await?
}
Statement::Tql(query) => self.plan_tql(query, &ctx).await?,
_ => {
todo!("throw an error")
}
};
let optimized_plan = self.optimize_logical_plan(logical_plan)?;
// encode logical plan
let encoded_plan = DFLogicalSubstraitConvertor
.encode(&optimized_plan.unwrap_df_plan())
.context(SubstraitCodecSnafu)?;
todo!()
}
#[tracing::instrument(skip_all)]
pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();

View File

@@ -20,6 +20,7 @@ use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use query::plan::LogicalPlan;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::tql::Tql;
@@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re
use crate::statement::StatementExecutor;
impl StatementExecutor {
/// Plan the given [Tql] query and return the [LogicalPlan].
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result<LogicalPlan> {
let stmt = match tql {
Tql::Eval(eval) => {
let promql = PromQuery {
@@ -86,12 +88,17 @@ impl StatementExecutor {
.unwrap()
}
};
let plan = self
.query_engine
self.query_engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
.context(PlanStatementSnafu)
}
/// Execute the given [Tql] query and return the result.
#[tracing::instrument(skip_all)]
pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result<Output> {
let plan = self.plan_tql(tql, &query_ctx).await?;
self.query_engine
.execute(plan, query_ctx)
.await

View File

@@ -87,6 +87,13 @@ impl LogicalPlan {
.context(DataFusionSnafu)
.map(LogicalPlan::DfPlan)
}
/// Unwrap the logical plan into a DataFusion logical plan
pub fn unwrap_df_plan(self) -> DfLogicalPlan {
match self {
LogicalPlan::DfPlan(plan) => plan,
}
}
}
impl From<DfLogicalPlan> for LogicalPlan {

View File

@@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext};
pub trait LogicalPlanner: Send + Sync {
async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<LogicalPlan>;
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan>;
fn as_any(&self) -> &dyn Any;
}
@@ -145,6 +147,14 @@ impl DfLogicalPlanner {
.map_err(BoxedError::new)
.context(QueryPlanSnafu)
}
#[tracing::instrument(skip_all)]
fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.engine_state
.optimize_logical_plan(plan.unwrap_df_plan())
.context(DataFusionSnafu)
.map(Into::into)
}
}
#[async_trait]
@@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner {
}
}
fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.optimize_logical_plan(plan)
}
fn as_any(&self) -> &dyn Any {
self
}

View File

@@ -142,6 +142,11 @@ impl QueryEngineState {
})
}
/// Run the full logical plan optimize phase for the given plan.
pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult<DfLogicalPlan> {
self.session_state().optimize(&plan)
}
/// Register an udf function.
/// Will override if the function with same name is already registered.
pub fn register_function(&self, func: FunctionRef) {

View File

@@ -20,6 +20,7 @@ use sqlparser::ast::Expr;
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{ColumnDef, Ident, ObjectName, SqlOption, TableConstraint, Value as SqlValue};
use crate::statements::statement::Statement;
use crate::statements::OptionMap;
const LINE_SEP: &str = ",\n";
@@ -237,6 +238,17 @@ pub struct CreateTableLike {
pub source_name: ObjectName,
}
#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)]
pub struct CreateView {
/// View name
pub view_name: ObjectName,
/// The clause after `As` that defines the VIEW.
/// Can only be either [Statement::Query] or [Statement::Tql].
pub input: Box<Statement>,
/// Whether to replace existing VIEW
pub or_replace: bool,
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;

View File

@@ -16,22 +16,20 @@ use datafusion_sql::parser::Statement as DfStatement;
use sqlparser::ast::Statement as SpStatement;
use sqlparser_derive::{Visit, VisitMut};
use super::drop::DropDatabase;
use super::show::ShowVariables;
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::alter::AlterTable;
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike,
CreateDatabase, CreateExternalTable, CreateTable, CreateTableLike, CreateView,
};
use crate::statements::delete::Delete;
use crate::statements::describe::DescribeTable;
use crate::statements::drop::DropTable;
use crate::statements::drop::{DropDatabase, DropTable};
use crate::statements::explain::Explain;
use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables;
use crate::statements::show::{
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables,
ShowColumns, ShowCreateTable, ShowDatabases, ShowIndex, ShowKind, ShowTables, ShowVariables,
};
use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable;
@@ -52,6 +50,8 @@ pub enum Statement {
CreateExternalTable(CreateExternalTable),
// CREATE TABLE ... LIKE
CreateTableLike(CreateTableLike),
// CREATE VIEW ... AS
CreateView(CreateView),
// DROP TABLE
DropTable(DropTable),
// DROP DATABASE