diff --git a/docs/how-to/how-to-implement-sql-statement.md b/docs/how-to/how-to-implement-sql-statement.md
new file mode 100644
index 0000000000..e7c9bc8322
--- /dev/null
+++ b/docs/how-to/how-to-implement-sql-statement.md
@@ -0,0 +1,74 @@
+This document introduces how to implement SQL statements in GreptimeDB.
+
+The execution entry point for SQL statements is located in the Frontend `Instance`. You can see it
+implements `SqlQueryHandler`:
+
+```rust
+impl SqlQueryHandler for Instance {
+    type Error = Error;
+
+    async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
+        // ...
+    }
+}
+```
+
+Normally, when a SQL query arrives at GreptimeDB, the `do_query` method is called. After some parsing work, the SQL
+statement is fed into `StatementExecutor`:
+
+```rust
+// in Frontend Instance:
+self.statement_executor.execute_sql(stmt, query_ctx).await
+```
+
+That's where we handle our SQL statements. You can simply create a new match arm for your statement there, and the
+statement is then implemented for both GreptimeDB Standalone and Cluster. See how `DESCRIBE TABLE` is implemented for
+an example.
+
+Now, what if a statement should be handled differently in GreptimeDB Standalone and Cluster? Note that there's
+a `SqlStatementExecutor` field in `StatementExecutor`, and GreptimeDB Standalone and Cluster each have their own
+implementation of `SqlStatementExecutor`. If you are going to implement a statement differently in the two modes
+(like `CREATE TABLE`), you have to implement it in both `SqlStatementExecutor`s.
+
+This is summarized in the diagram below:
+
+```text
+                    SQL query
+                        |
+                        v
+          +---------------------------+
+          | SqlQueryHandler::do_query |
+          +---------------------------+
+                        |
+                        | SQL parsing
+                        v
+       +--------------------------------+
+       | StatementExecutor::execute_sql |
+       +--------------------------------+
+                        |
+                        | SQL execution
+                        v
+      +----------------------------------+
+      | commonly handled statements like |
+      | "plan_exec" for selection or     |
+      +----------------------------------+
+               |                              |
+For Standalone |                              | For Cluster
+               v                              v
+ +---------------------------+  +---------------------------+
+ | SqlStatementExecutor impl |  | SqlStatementExecutor impl |
+ | in Datanode Instance      |  | in Frontend DistInstance  |
+ +---------------------------+  +---------------------------+
+```
+
+Note that some SQL statements can be executed in our QueryEngine, in the form of a `LogicalPlan`. You can follow the
+invocation path down to the `QueryEngine` implementation from `StatementExecutor::plan_exec`. For now, there's only
+one `DatafusionQueryEngine` for both GreptimeDB Standalone and Cluster. That lone query engine works for both modes
+because GreptimeDB reads and writes data through the `Table` trait, and each mode has its own `Table` implementation.
+
+We don't have a bias towards handling statements in the query engine versus `StatementExecutor`; you can implement a
+statement in either place. For example, `INSERT` with selection is handled in the query engine, because we can easily
+do the query part there, while plain `INSERT` without selection is not, because the cost of parsing the statement into
+a `LogicalPlan` is not negligible. So generally, if a SQL statement is simple enough, you can handle it
+in `StatementExecutor`; if it is complex or contains a selection, it should be planned as a `LogicalPlan`
+and handled in the query engine.
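To make the "add a match arm" step above concrete, here is a minimal, self-contained sketch of the dispatch pattern that `StatementExecutor::execute_sql` uses (the real implementation appears later in this diff, in `src/frontend/src/statement.rs`). The `Statement` and `Output` enums and the `ShowStatus` variant below are hypothetical stand-ins, not GreptimeDB types:

```rust
// A simplified model of StatementExecutor dispatch; all types here are stand-ins.
#[derive(Debug)]
enum Statement {
    Query(String),
    DescribeTable(String),
    ShowStatus, // the hypothetical new statement being added
}

#[derive(Debug)]
enum Output {
    Text(String),
}

struct StatementExecutor;

impl StatementExecutor {
    fn execute_sql(&self, stmt: Statement) -> Output {
        match stmt {
            // Statements that go through the query engine as a LogicalPlan.
            Statement::Query(sql) => self.plan_exec(sql),
            // Statements handled uniformly for Standalone and Cluster.
            Statement::DescribeTable(table) => Output::Text(format!("columns of {table}")),
            // Supporting a new statement is just another match arm here.
            Statement::ShowStatus => Output::Text("server is running".to_string()),
        }
    }

    fn plan_exec(&self, sql: String) -> Output {
        // In the real code this plans the statement and executes the LogicalPlan
        // in the query engine; here it just echoes the input.
        Output::Text(format!("executed plan for: {sql}"))
    }
}

fn main() {
    let executor = StatementExecutor;
    println!("{:?}", executor.execute_sql(Statement::ShowStatus));
    println!("{:?}", executor.execute_sql(Statement::DescribeTable("my_table".to_string())));
    println!("{:?}", executor.execute_sql(Statement::Query("SELECT 1".to_string())));
}
```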
diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 7bef31dc3e..5656ef5a9d 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use common_query::Output; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; +use query::query_engine::SqlStatementExecutor; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContext, QueryContextRef}; use snafu::prelude::*; @@ -65,7 +66,7 @@ impl Instance { match stmt { // TODO(LFC): Remove SQL execution branch here. // Keep this because substrait can't handle much of SQLs now. - QueryStatement::Sql(Statement::Query(_)) => { + QueryStatement::Sql(Statement::Query(_)) | QueryStatement::Promql(_) => { let plan = self .query_engine .planner() @@ -77,7 +78,9 @@ impl Instance { .await .context(ExecuteLogicalPlanSnafu) } - _ => self.execute_stmt(stmt, ctx).await, + QueryStatement::Sql(stmt) => { + self.execute_sql(stmt, ctx).await.context(ExecuteSqlSnafu) + } } } Query::LogicalPlan(plan) => self.execute_logical(plan).await, @@ -242,12 +245,11 @@ mod test { let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); assert!(matches!(output, Output::AffectedRows(0))); - let stmt = QueryLanguageParser::parse_sql( + let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql( "INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)", - ) - .unwrap(); + ) else { unreachable!() }; let output = instance - .execute_stmt(stmt, QueryContext::arc()) + .execute_sql(stmt, QueryContext::arc()) .await .unwrap(); assert!(matches!(output, Output::AffectedRows(1))); diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 01553b29c4..cf1a461940 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -21,7 +21,7 @@ use common_telemetry::logging::info; use common_telemetry::timer; use query::error::QueryExecutionSnafu; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; -use query::query_engine::StatementHandler; +use query::query_engine::SqlStatementExecutor; use session::context::QueryContextRef; use snafu::prelude::*; use sql::ast::ObjectName; @@ -31,27 +31,23 @@ use table::engine::TableReference; use table::requests::{CopyDirection, CopyTableRequest, CreateDatabaseRequest, DropTableRequest}; use crate::error::{ - self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, PlanStatementSnafu, Result, - TableIdProviderNotFoundSnafu, + self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, NotSupportSqlSnafu, + PlanStatementSnafu, Result, TableIdProviderNotFoundSnafu, }; use crate::instance::Instance; use crate::metrics; use crate::sql::{SqlHandler, SqlRequest}; impl Instance { - pub async fn execute_stmt( - &self, - stmt: QueryStatement, - query_ctx: QueryContextRef, - ) -> Result { + async fn do_execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { match stmt { - QueryStatement::Sql(Statement::Insert(insert)) => { + Statement::Insert(insert) => { let request = SqlHandler::insert_to_request(self.catalog_manager.clone(), *insert, query_ctx) .await?; self.sql_handler.insert(request).await } - QueryStatement::Sql(Statement::CreateDatabase(create_database)) => { + Statement::CreateDatabase(create_database) => { let request = CreateDatabaseRequest { db_name: create_database.name.to_string(), create_if_not_exists: create_database.if_not_exists, @@ 
-64,7 +60,7 @@ impl Instance { .await } - QueryStatement::Sql(Statement::CreateTable(create_table)) => { + Statement::CreateTable(create_table) => { let table_id = self .table_id_provider .as_ref() @@ -88,10 +84,7 @@ impl Instance { .execute(SqlRequest::CreateTable(request), query_ctx) .await } - QueryStatement::Sql(Statement::CreateExternalTable(_create_external_table)) => { - unimplemented!() - } - QueryStatement::Sql(Statement::Alter(alter_table)) => { + Statement::Alter(alter_table) => { let name = alter_table.table_name().clone(); let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?; let table_ref = TableReference::full(&catalog, &schema, &table); @@ -100,7 +93,7 @@ impl Instance { .execute(SqlRequest::Alter(req), query_ctx) .await } - QueryStatement::Sql(Statement::DropTable(drop_table)) => { + Statement::DropTable(drop_table) => { let (catalog_name, schema_name, table_name) = table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?; let req = DropTableRequest { @@ -112,20 +105,7 @@ impl Instance { .execute(SqlRequest::DropTable(req), query_ctx) .await } - QueryStatement::Sql(Statement::ShowDatabases(show_databases)) => { - self.sql_handler - .execute(SqlRequest::ShowDatabases(show_databases), query_ctx) - .await - } - QueryStatement::Sql(Statement::ShowTables(show_tables)) => { - self.sql_handler - .execute(SqlRequest::ShowTables(show_tables), query_ctx) - .await - } - QueryStatement::Sql(Statement::ShowCreateTable(_show_create_table)) => { - unimplemented!("SHOW CREATE TABLE is unimplemented yet"); - } - QueryStatement::Sql(Statement::Copy(copy_table)) => { + Statement::Copy(copy_table) => { let req = match copy_table { CopyTable::To(copy_table) => { let CopyTableArgument { @@ -173,13 +153,10 @@ impl Instance { .execute(SqlRequest::CopyTable(req), query_ctx) .await } - QueryStatement::Sql(Statement::Query(_)) - | QueryStatement::Sql(Statement::Explain(_)) - | QueryStatement::Sql(Statement::Use(_)) - | QueryStatement::Sql(Statement::Tql(_)) - | QueryStatement::Sql(Statement::Delete(_)) - | QueryStatement::Sql(Statement::DescribeTable(_)) - | QueryStatement::Promql(_) => unreachable!(), + _ => NotSupportSqlSnafu { + msg: format!("not supported to execute {stmt:?}"), + } + .fail(), } } @@ -276,13 +253,13 @@ pub fn table_idents_to_full_name( } #[async_trait] -impl StatementHandler for Instance { - async fn handle_statement( +impl SqlStatementExecutor for Instance { + async fn execute_sql( &self, - stmt: QueryStatement, + stmt: Statement, query_ctx: QueryContextRef, ) -> query::error::Result { - self.execute_stmt(stmt, query_ctx) + self.do_execute_sql(stmt, query_ctx) .await .map_err(BoxedError::new) .context(QueryExecutionSnafu) diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index c5f611aa5d..67547d7e9a 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -19,18 +19,15 @@ use common_error::prelude::BoxedError; use common_procedure::ProcedureManagerRef; use common_query::Output; use common_telemetry::error; -use query::sql::{show_databases, show_tables}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; -use sql::statements::show::{ShowDatabases, ShowTables}; use table::engine::manager::TableEngineManagerRef; use table::engine::{TableEngineProcedureRef, TableEngineRef, TableReference}; use table::requests::*; use table::{Table, TableRef}; use crate::error::{ - self, CloseTableEngineSnafu, ExecuteSqlSnafu, Result, TableEngineNotFoundSnafu, - TableNotFoundSnafu, + self, 
CloseTableEngineSnafu, Result, TableEngineNotFoundSnafu, TableNotFoundSnafu, }; use crate::instance::sql::table_idents_to_full_name; @@ -49,8 +46,6 @@ pub enum SqlRequest { Alter(AlterTableRequest), DropTable(DropTableRequest), FlushTable(FlushTableRequest), - ShowDatabases(ShowDatabases), - ShowTables(ShowTables), CopyTable(CopyTableRequest), } @@ -92,13 +87,6 @@ impl SqlHandler { CopyDirection::Export => self.copy_table_to(req).await, CopyDirection::Import => self.copy_table_from(req).await, }, - SqlRequest::ShowDatabases(req) => { - show_databases(req, self.catalog_manager.clone()).context(ExecuteSqlSnafu) - } - SqlRequest::ShowTables(req) => { - show_tables(req, self.catalog_manager.clone(), query_ctx.clone()) - .context(ExecuteSqlSnafu) - } SqlRequest::FlushTable(req) => self.flush_table(req).await, }; if let Err(e) = &result { diff --git a/src/datanode/src/sql/create.rs b/src/datanode/src/sql/create.rs index ced3ef89ac..ca1ad2106e 100644 --- a/src/datanode/src/sql/create.rs +++ b/src/datanode/src/sql/create.rs @@ -326,7 +326,8 @@ mod tests { use common_base::readable_size::ReadableSize; use datatypes::prelude::ConcreteDataType; use datatypes::schema::Schema; - use query::parser::QueryLanguageParser; + use query::parser::{QueryLanguageParser, QueryStatement}; + use query::query_engine::SqlStatementExecutor; use session::context::QueryContext; use sql::dialect::GenericDialect; use sql::parser::ParserContext; @@ -560,10 +561,10 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(host) ) engine=mito with(regions=1);"#; - let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); + let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() }; let output = instance .inner() - .execute_stmt(stmt, QueryContext::arc()) + .execute_sql(stmt, QueryContext::arc()) .await .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); @@ -577,10 +578,10 @@ mod tests { TIME INDEX (ts), PRIMARY KEY(host) ) engine=mito with(regions=1);"#; - let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); + let Ok(QueryStatement::Sql(stmt)) = QueryLanguageParser::parse_sql(sql) else { unreachable!() }; let output = instance .inner() - .execute_stmt(stmt, QueryContext::arc()) + .execute_sql(stmt, QueryContext::arc()) .await .unwrap(); assert!(matches!(output, Output::AffectedRows(0))); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 246b7c721b..5b3a036493 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -36,7 +36,6 @@ use common_catalog::consts::MITO_ENGINE; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_query::Output; -use common_recordbatch::RecordBatches; use common_telemetry::logging::{debug, info}; use common_telemetry::timer; use datafusion::sql::sqlparser::ast::ObjectName; @@ -50,7 +49,6 @@ use partition::manager::PartitionRuleManager; use partition::route::TableRoutes; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; -use query::query_engine::StatementHandlerRef; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error as server_error; use servers::error::{ExecuteQuerySnafu, ParsePromQLSnafu}; @@ -66,17 +64,13 @@ use snafu::prelude::*; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::copy::CopyTable; -use sql::statements::describe::DescribeTable; use sql::statements::statement::Statement; -use 
sql::statements::tql::Tql; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ - self, CatalogSnafu, DescribeStatementSnafu, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, - ExecuteStatementSnafu, ExternalSnafu, InvalidInsertRequestSnafu, MissingMetasrvOptsSnafu, - NotSupportedSnafu, ParseQuerySnafu, ParseSqlSnafu, PlanStatementSnafu, Result, - SqlExecInterceptedSnafu, TableNotFoundSnafu, + self, Error, ExecutePromqlSnafu, ExternalSnafu, InvalidInsertRequestSnafu, + MissingMetasrvOptsSnafu, ParseSqlSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; @@ -84,6 +78,7 @@ use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metric; use crate::script::ScriptExecutor; use crate::server::{start_server, ServerHandlers, Services}; +use crate::statement::StatementExecutor; #[async_trait] pub trait FrontendInstance: @@ -107,7 +102,7 @@ pub type FrontendInstanceRef = Arc; pub struct Instance { catalog_manager: CatalogManagerRef, script_executor: Arc, - statement_handler: StatementHandlerRef, + statement_executor: Arc, query_engine: QueryEngineRef, grpc_query_handler: GrpcQueryHandlerRef, @@ -154,11 +149,17 @@ impl Instance { let script_executor = Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + let statement_executor = Arc::new(StatementExecutor::new( + catalog_manager.clone(), + query_engine.clone(), + dist_instance.clone(), + )); + Ok(Instance { catalog_manager, script_executor, create_expr_factory: Arc::new(DefaultCreateExprFactory), - statement_handler: dist_instance.clone(), + statement_executor, query_engine, grpc_query_handler: dist_instance, plugins: plugins.clone(), @@ -201,11 +202,18 @@ impl Instance { let query_engine = dn_instance.query_engine(); let script_executor = Arc::new(ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?); + + let statement_executor = Arc::new(StatementExecutor::new( + catalog_manager.clone(), + query_engine.clone(), + dn_instance.clone(), + )); + Ok(Instance { catalog_manager: catalog_manager.clone(), script_executor, create_expr_factory: Arc::new(DefaultCreateExprFactory), - statement_handler: dn_instance.clone(), + statement_executor, query_engine, grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), plugins: Default::default(), @@ -235,10 +243,17 @@ impl Instance { .await .unwrap(), ); + + let statement_executor = Arc::new(StatementExecutor::new( + catalog_manager.clone(), + query_engine.clone(), + dist_instance.clone(), + )); + Instance { catalog_manager, script_executor, - statement_handler: dist_instance.clone(), + statement_executor, query_engine, create_expr_factory: Arc::new(DefaultCreateExprFactory), grpc_query_handler: dist_instance, @@ -389,21 +404,6 @@ impl Instance { .await } - fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { - let catalog = &query_ctx.current_catalog(); - ensure!( - self.catalog_manager - .schema(catalog, &db) - .context(error::CatalogSnafu)? 
- .is_some(), - error::SchemaNotFoundSnafu { schema_info: &db } - ); - - query_ctx.set_current_schema(&db); - - Ok(Output::RecordBatches(RecordBatches::empty())) - } - pub fn set_plugins(&mut self, map: Arc) { self.plugins = map; } @@ -418,6 +418,11 @@ impl Instance { .context(error::ShutdownServerSnafu) .map(|_| ()) } + + #[cfg(test)] + pub(crate) fn statement_executor(&self) -> Arc { + self.statement_executor.clone() + } } #[async_trait] @@ -437,104 +442,11 @@ fn parse_stmt(sql: &str) -> Result> { } impl Instance { - pub(crate) async fn plan_exec( - &self, - stmt: QueryStatement, - query_ctx: QueryContextRef, - ) -> Result { - let planner = self.query_engine.planner(); - let plan = planner - .plan(stmt, query_ctx.clone()) - .await - .context(PlanStatementSnafu)?; - self.query_engine - .execute(plan, query_ctx) - .await - .context(ExecLogicalPlanSnafu) - } - - async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { - let plan = match tql { - Tql::Eval(eval) => { - let promql = PromQuery { - start: eval.start, - end: eval.end, - step: eval.step, - query: eval.query, - }; - let stmt = QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?; - self.query_engine - .planner() - .plan(stmt, query_ctx.clone()) - .await - .context(PlanStatementSnafu)? - } - Tql::Explain(_) => unimplemented!(), - }; - self.query_engine - .execute(plan, query_ctx) - .await - .context(ExecLogicalPlanSnafu) - } - - async fn describe_table( - &self, - stmt: DescribeTable, - query_ctx: QueryContextRef, - ) -> Result { - let (catalog, schema, table) = table_idents_to_full_name(stmt.name(), query_ctx) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - - let table = self - .catalog_manager - .table(&catalog, &schema, &table) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: stmt.name().to_string(), - })?; - - query::sql::describe_table(table).context(DescribeStatementSnafu) - } - async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { check_permission(self.plugins.clone(), &stmt, &query_ctx)?; - match stmt { - Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { - self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await - } - - // For performance consideration, only "insert with select" is executed by query engine. - // Plain insert ("insert with values") is still executed directly in statement. 
- Statement::Insert(ref insert) if insert.is_insert_select() => { - self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await - } - - Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await, - - Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await, - - Statement::CreateDatabase(_) - | Statement::CreateExternalTable(_) - | Statement::ShowDatabases(_) - | Statement::CreateTable(_) - | Statement::ShowTables(_) - | Statement::Insert(_) - | Statement::Alter(_) - | Statement::DropTable(_) - | Statement::Copy(_) => self - .statement_handler - .handle_statement(QueryStatement::Sql(stmt), query_ctx) - .await - .context(ExecuteStatementSnafu), - Statement::Use(db) => self.handle_use(db, query_ctx), - Statement::ShowCreateTable(_) => NotSupportedSnafu { - feat: format!("{stmt:?}"), - } - .fail(), - } + let stmt = QueryStatement::Sql(stmt); + self.statement_executor.execute_stmt(stmt, query_ctx).await } } @@ -630,7 +542,8 @@ impl PromHandler for Instance { let stmt = QueryLanguageParser::parse_promql(query).with_context(|_| ParsePromQLSnafu { query: query.clone(), })?; - self.plan_exec(stmt, QueryContext::arc()) + self.statement_executor + .execute_stmt(stmt, QueryContext::arc()) .await .map_err(BoxedError::new) .with_context(|_| ExecuteQuerySnafu { @@ -732,6 +645,7 @@ mod tests { use api::v1::column::Values; use catalog::helper::{TableGlobalKey, TableGlobalValue}; + use common_recordbatch::RecordBatches; use datatypes::prelude::{ConcreteDataType, Value}; use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema}; use query::query_engine::options::QueryOptions; diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 5aae790900..4b92459338 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -44,9 +44,7 @@ use meta_client::rpc::{ }; use partition::partition::{PartitionBound, PartitionDef}; use query::error::QueryExecutionSnafu; -use query::parser::QueryStatement; -use query::query_engine::StatementHandler; -use query::sql::{show_databases, show_tables}; +use query::query_engine::SqlStatementExecutor; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::Value as SqlValue; @@ -62,10 +60,10 @@ use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogSnafu, ColumnDataTypeSnafu, - DeserializePartitionSnafu, InvokeDatanodeSnafu, NotSupportedSnafu, ParseSqlSnafu, - PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, - StartMetaClientSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, - ToTableInsertRequestSnafu, UnrecognizedTableOptionSnafu, + DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, + RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu, + TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu, + UnrecognizedTableOptionSnafu, }; use crate::expr_factory; use crate::table::DistTable; @@ -342,10 +340,6 @@ impl DistInstance { let table_name = TableName::new(catalog, schema, table); return self.drop_table(table_name).await; } - Statement::ShowDatabases(stmt) => show_databases(stmt, self.catalog_manager.clone()), - Statement::ShowTables(stmt) => { - show_tables(stmt, self.catalog_manager.clone(), query_ctx) - } Statement::Insert(insert) => { let (catalog, schema, table) = 
table_idents_to_full_name(insert.table_name(), query_ctx.clone()) @@ -521,21 +515,16 @@ impl DistInstance { } #[async_trait] -impl StatementHandler for DistInstance { - async fn handle_statement( +impl SqlStatementExecutor for DistInstance { + async fn execute_sql( &self, - stmt: QueryStatement, + stmt: Statement, query_ctx: QueryContextRef, ) -> query::error::Result { - match stmt { - QueryStatement::Sql(stmt) => self.handle_statement(stmt, query_ctx).await, - QueryStatement::Promql(_) => NotSupportedSnafu { - feat: "distributed execute promql".to_string(), - } - .fail(), - } - .map_err(BoxedError::new) - .context(QueryExecutionSnafu) + self.handle_statement(stmt, query_ctx) + .await + .map_err(BoxedError::new) + .context(QueryExecutionSnafu) } } @@ -693,16 +682,12 @@ fn find_partition_columns( #[cfg(test)] mod test { - use itertools::Itertools; - use query::parser::QueryLanguageParser; - use query::query_engine::StatementHandlerRef; use session::context::QueryContext; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; use super::*; - use crate::instance::parse_stmt; #[tokio::test] async fn test_parse_partitions() { @@ -744,107 +729,4 @@ ENGINE=mito", } } } - - async fn handle_sql(instance: &Arc, sql: &str) -> Output { - let stmt = parse_stmt(sql).unwrap().remove(0); - instance - .handle_statement(stmt, QueryContext::arc()) - .await - .unwrap() - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_show_databases() { - let instance = crate::tests::create_distributed_instance("test_show_databases").await; - let dist_instance = &instance.dist_instance; - - let sql = "create database test_show_databases"; - let output = handle_sql(dist_instance, sql).await; - match output { - Output::AffectedRows(rows) => assert_eq!(rows, 1), - _ => unreachable!(), - } - - let sql = "show databases"; - let output = handle_sql(dist_instance, sql).await; - match output { - Output::RecordBatches(r) => { - let expected1 = vec![ - "+---------------------+", - "| Schemas |", - "+---------------------+", - "| public |", - "| test_show_databases |", - "+---------------------+", - ] - .into_iter() - .join("\n"); - let expected2 = vec![ - "+---------------------+", - "| Schemas |", - "+---------------------+", - "| test_show_databases |", - "| public |", - "+---------------------+", - ] - .into_iter() - .join("\n"); - let lines = r.pretty_print().unwrap(); - assert!(lines == expected1 || lines == expected2) - } - _ => unreachable!(), - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_show_tables() { - let instance = crate::tests::create_distributed_instance("test_show_tables").await; - let dist_instance = &instance.dist_instance; - let datanode_instances = instance.datanodes; - - let sql = "create database test_show_tables"; - handle_sql(dist_instance, sql).await; - - let sql = " - CREATE TABLE greptime.test_show_tables.dist_numbers ( - ts BIGINT, - n INT, - TIME INDEX (ts), - ) - PARTITION BY RANGE COLUMNS (n) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), - ) - ENGINE=mito"; - handle_sql(dist_instance, sql).await; - - async fn assert_show_tables(handler: StatementHandlerRef) { - let sql = "show tables in test_show_tables"; - let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); - let output = handler - .handle_statement(stmt, QueryContext::arc()) - .await - .unwrap(); - match output { - Output::RecordBatches(r) => { - 
let expected = r#"+--------------+ -| Tables | -+--------------+ -| dist_numbers | -+--------------+"#; - assert_eq!(r.pretty_print().unwrap(), expected); - } - _ => unreachable!(), - } - } - - assert_show_tables(dist_instance.clone()).await; - - // Asserts that new table is created in Datanode as well. - for x in datanode_instances.values() { - assert_show_tables(x.clone()).await - } - } } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index db936c069e..e30f7ddc97 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -31,6 +31,7 @@ pub mod prom; pub mod prometheus; mod script; mod server; +pub(crate) mod statement; mod table; #[cfg(test)] mod tests; diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs new file mode 100644 index 0000000000..bc4306dd94 --- /dev/null +++ b/src/frontend/src/statement.rs @@ -0,0 +1,132 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod describe; +mod show; +mod tql; + +use catalog::CatalogManagerRef; +use common_query::Output; +use common_recordbatch::RecordBatches; +use query::parser::QueryStatement; +use query::query_engine::SqlStatementExecutorRef; +use query::QueryEngineRef; +use session::context::QueryContextRef; +use snafu::{ensure, ResultExt}; +use sql::statements::statement::Statement; + +use crate::error::{ + CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, PlanStatementSnafu, Result, + SchemaNotFoundSnafu, +}; + +#[derive(Clone)] +pub(crate) struct StatementExecutor { + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + sql_stmt_executor: SqlStatementExecutorRef, +} + +impl StatementExecutor { + pub(crate) fn new( + catalog_manager: CatalogManagerRef, + query_engine: QueryEngineRef, + sql_stmt_executor: SqlStatementExecutorRef, + ) -> Self { + Self { + catalog_manager, + query_engine, + sql_stmt_executor, + } + } + + pub(crate) async fn execute_stmt( + &self, + stmt: QueryStatement, + query_ctx: QueryContextRef, + ) -> Result { + match stmt { + QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await, + QueryStatement::Promql(_) => self.plan_exec(stmt, query_ctx).await, + } + } + + async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { + match stmt { + Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => { + self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await + } + + // For performance consideration, only "insert with select" is executed by query engine. + // Plain insert ("insert with values") is still executed directly in statement. 
+ Statement::Insert(ref insert) if insert.is_insert_select() => { + self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await + } + + Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await, + + Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await, + + Statement::Use(db) => self.handle_use(db, query_ctx), + + Statement::ShowDatabases(stmt) => self.show_databases(stmt), + + Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx), + + Statement::CreateDatabase(_) + | Statement::CreateTable(_) + | Statement::CreateExternalTable(_) + | Statement::Insert(_) + | Statement::Alter(_) + | Statement::DropTable(_) + | Statement::Copy(_) + | Statement::ShowCreateTable(_) => self + .sql_stmt_executor + .execute_sql(stmt, query_ctx) + .await + .context(ExecuteStatementSnafu), + } + } + + pub(crate) async fn plan_exec( + &self, + stmt: QueryStatement, + query_ctx: QueryContextRef, + ) -> Result { + let planner = self.query_engine.planner(); + let plan = planner + .plan(stmt, query_ctx.clone()) + .await + .context(PlanStatementSnafu)?; + self.query_engine + .execute(plan, query_ctx) + .await + .context(ExecLogicalPlanSnafu) + } + + fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { + let catalog = &query_ctx.current_catalog(); + ensure!( + self.catalog_manager + .schema(catalog, &db) + .context(CatalogSnafu)? + .is_some(), + SchemaNotFoundSnafu { schema_info: &db } + ); + + query_ctx.set_current_schema(&db); + + Ok(Output::RecordBatches(RecordBatches::empty())) + } +} diff --git a/src/frontend/src/statement/describe.rs b/src/frontend/src/statement/describe.rs new file mode 100644 index 0000000000..c98fa01ef5 --- /dev/null +++ b/src/frontend/src/statement/describe.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_error::prelude::BoxedError; +use common_query::Output; +use datanode::instance::sql::table_idents_to_full_name; +use session::context::QueryContextRef; +use snafu::{OptionExt, ResultExt}; +use sql::statements::describe::DescribeTable; + +use crate::error::{ + CatalogSnafu, DescribeStatementSnafu, ExternalSnafu, Result, TableNotFoundSnafu, +}; +use crate::statement::StatementExecutor; + +impl StatementExecutor { + pub(super) async fn describe_table( + &self, + stmt: DescribeTable, + query_ctx: QueryContextRef, + ) -> Result { + let (catalog, schema, table) = table_idents_to_full_name(stmt.name(), query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let table = self + .catalog_manager + .table(&catalog, &schema, &table) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table_name: stmt.name().to_string(), + })?; + + query::sql::describe_table(table).context(DescribeStatementSnafu) + } +} diff --git a/src/frontend/src/statement/show.rs b/src/frontend/src/statement/show.rs new file mode 100644 index 0000000000..bfe4c9db0f --- /dev/null +++ b/src/frontend/src/statement/show.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_query::Output; +use session::context::QueryContextRef; +use snafu::ResultExt; +use sql::statements::show::{ShowDatabases, ShowTables}; + +use crate::error::{ExecuteStatementSnafu, Result}; +use crate::statement::StatementExecutor; + +impl StatementExecutor { + pub(super) fn show_databases(&self, stmt: ShowDatabases) -> Result { + query::sql::show_databases(stmt, self.catalog_manager.clone()) + .context(ExecuteStatementSnafu) + } + + pub(super) fn show_tables( + &self, + stmt: ShowTables, + query_ctx: QueryContextRef, + ) -> Result { + query::sql::show_tables(stmt, self.catalog_manager.clone(), query_ctx) + .context(ExecuteStatementSnafu) + } +} diff --git a/src/frontend/src/statement/tql.rs b/src/frontend/src/statement/tql.rs new file mode 100644 index 0000000000..0f4a4f50a9 --- /dev/null +++ b/src/frontend/src/statement/tql.rs @@ -0,0 +1,55 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_query::Output; +use query::parser::{PromQuery, QueryLanguageParser}; +use session::context::QueryContextRef; +use snafu::ResultExt; +use sql::statements::tql::Tql; + +use crate::error::{ + ExecLogicalPlanSnafu, NotSupportedSnafu, ParseQuerySnafu, PlanStatementSnafu, Result, +}; +use crate::statement::StatementExecutor; + +impl StatementExecutor { + pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { + let plan = match tql { + Tql::Eval(eval) => { + let promql = PromQuery { + start: eval.start, + end: eval.end, + step: eval.step, + query: eval.query, + }; + let stmt = QueryLanguageParser::parse_promql(&promql).context(ParseQuerySnafu)?; + self.query_engine + .planner() + .plan(stmt, query_ctx.clone()) + .await + .context(PlanStatementSnafu)? 
+ } + Tql::Explain(_) => { + return NotSupportedSnafu { + feat: "TQL EXPLAIN", + } + .fail() + } + }; + self.query_engine + .execute(plan, query_ctx) + .await + .context(ExecLogicalPlanSnafu) + } +} diff --git a/src/frontend/src/tests/promql_test.rs b/src/frontend/src/tests/promql_test.rs index b26a24b870..935f8a893c 100644 --- a/src/frontend/src/tests/promql_test.rs +++ b/src/frontend/src/tests/promql_test.rs @@ -53,6 +53,7 @@ async fn create_insert_query_assert( eval_stmt.lookback_delta = lookback; let query_output = instance + .statement_executor() .plan_exec(QueryStatement::Promql(eval_stmt), QueryContext::arc()) .await .unwrap(); diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 3e212a816f..90c711f48c 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -27,25 +27,20 @@ use common_query::prelude::ScalarUdf; use common_query::Output; use datatypes::schema::Schema; use session::context::QueryContextRef; +use sql::statements::statement::Statement; use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; -use crate::parser::QueryStatement; use crate::plan::LogicalPlan; use crate::planner::LogicalPlanner; pub use crate::query_engine::context::QueryEngineContext; pub use crate::query_engine::state::QueryEngineState; -pub type StatementHandlerRef = Arc; +pub type SqlStatementExecutorRef = Arc; -// TODO(LFC): Gradually make more statements executed in the form of logical plan, and remove this trait. Tracked in #1010. #[async_trait] -pub trait StatementHandler: Send + Sync { - async fn handle_statement( - &self, - stmt: QueryStatement, - query_ctx: QueryContextRef, - ) -> Result; +pub trait SqlStatementExecutor: Send + Sync { + async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result; } #[async_trait]
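As a closing illustration of the renamed trait, here is a self-contained sketch of how a mode-specific executor plugs into `SqlStatementExecutor`; in the PR the real implementors are the datanode `Instance` (Standalone) and the frontend `DistInstance` (Cluster). The types below are simplified stand-ins (the real trait is async and uses `sql`'s `Statement`, `QueryContextRef`, and `common_query::Output`), so treat this as a model of the shape, not the actual API:

```rust
// A simplified, synchronous model of the SqlStatementExecutor trait introduced in
// src/query/src/query_engine.rs. All types here are hypothetical stand-ins.
use std::sync::Arc;

#[derive(Debug)]
struct Statement(String);

#[derive(Debug)]
struct QueryContext {
    schema: String,
}
type QueryContextRef = Arc<QueryContext>;

#[derive(Debug)]
enum Output {
    AffectedRows(usize),
}

type Result<T> = std::result::Result<T, String>;

// Mirrors `pub trait SqlStatementExecutor: Send + Sync { async fn execute_sql(..) }`,
// minus the async plumbing.
trait SqlStatementExecutor: Send + Sync {
    fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output>;
}

type SqlStatementExecutorRef = Arc<dyn SqlStatementExecutor>;

// Each deployment mode supplies its own implementation of the trait.
struct StandaloneExecutor;

impl SqlStatementExecutor for StandaloneExecutor {
    fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        // A real implementation would translate the statement into engine requests.
        println!("executing {:?} in schema {}", stmt, query_ctx.schema);
        Ok(Output::AffectedRows(0))
    }
}

fn main() -> Result<()> {
    let executor: SqlStatementExecutorRef = Arc::new(StandaloneExecutor);
    let ctx = Arc::new(QueryContext { schema: "public".to_string() });
    let output = executor.execute_sql(Statement("CREATE TABLE t (ts BIGINT)".to_string()), ctx)?;
    println!("{output:?}");
    Ok(())
}
```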