diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 07114ea2a6..aacb255d8c 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -286,6 +286,9 @@ pub enum Error { #[snafu(display("Missing required field: {}", name))] MissingRequiredField { name: String, backtrace: Backtrace }, + + #[snafu(display("Cannot find requested database: {}-{}", catalog, schema))] + DatabaseNotFound { catalog: String, schema: String }, } pub type Result = std::result::Result; @@ -328,7 +331,8 @@ impl ErrorExt for Error { | Error::SchemaNotFound { .. } | Error::ConstraintNotSupported { .. } | Error::SchemaExists { .. } - | Error::ParseTimestamp { .. } => StatusCode::InvalidArguments, + | Error::ParseTimestamp { .. } + | Error::DatabaseNotFound { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. Error::StartServer { .. } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 3535f080c3..e55f3effc8 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -18,11 +18,10 @@ use api::v1::query_request::Query; use api::v1::{CreateDatabaseExpr, DdlRequest, GreptimeRequest, InsertRequest}; use async_trait::async_trait; use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_error::prelude::BoxedError; use common_query::Output; use query::parser::QueryLanguageParser; use query::plan::LogicalPlan; -use servers::query_handler::GrpcQueryHandler; +use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; use snafu::prelude::*; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; @@ -91,34 +90,28 @@ impl Instance { DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await, } } +} - async fn handle_grpc_query(&self, query: GreptimeRequest) -> Result { +#[async_trait] +impl GrpcQueryHandler for Instance { + type Error = error::Error; + + async fn do_query(&self, query: GreptimeRequest) -> Result { let request = query.request.context(error::MissingRequiredFieldSnafu { name: "GreptimeRequest.request", })?; - let output = match request { - GrpcRequest::Insert(request) => self.handle_insert(request).await?, + match request { + GrpcRequest::Insert(request) => self.handle_insert(request).await, GrpcRequest::Query(query_request) => { let query = query_request .query .context(error::MissingRequiredFieldSnafu { name: "QueryRequest.query", })?; - self.handle_query(query).await? + self.handle_query(query).await } - GrpcRequest::Ddl(request) => self.handle_ddl(request).await?, - }; - Ok(output) - } -} - -#[async_trait] -impl GrpcQueryHandler for Instance { - async fn do_query(&self, query: GreptimeRequest) -> servers::error::Result { - self.handle_grpc_query(query) - .await - .map_err(BoxedError::new) - .context(servers::error::ExecuteGrpcQuerySnafu) + GrpcRequest::Ddl(request) => self.handle_ddl(request).await, + } } } diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 57f58666fc..c9abf05ca3 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -14,13 +14,12 @@ use async_trait::async_trait; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; -use common_telemetry::logging::{error, info}; +use common_telemetry::logging::info; use common_telemetry::timer; use query::parser::{QueryLanguageParser, QueryStatement}; -use servers::query_handler::SqlQueryHandler; +use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::prelude::*; use sql::ast::ObjectName; @@ -139,16 +138,16 @@ impl Instance { QueryStatement::Sql(Statement::ShowCreateTable(_stmt)) => { unimplemented!("SHOW CREATE TABLE is unimplemented yet"); } - QueryStatement::Sql(Statement::Use(db)) => { + QueryStatement::Sql(Statement::Use(schema)) => { + let catalog = query_ctx.current_catalog(); + let catalog = catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME); + ensure!( - self.catalog_manager - .schema(DEFAULT_CATALOG_NAME, &db) - .context(error::CatalogSnafu)? - .is_some(), - error::SchemaNotFoundSnafu { name: &db } + self.is_valid_schema(catalog, &schema)?, + error::DatabaseNotFoundSnafu { catalog, schema } ); - query_ctx.set_current_schema(&db); + query_ctx.set_current_schema(&schema); Ok(Output::RecordBatches(RecordBatches::empty())) } @@ -199,21 +198,12 @@ fn table_idents_to_full_name( #[async_trait] impl SqlQueryHandler for Instance { - async fn do_query( - &self, - query: &str, - query_ctx: QueryContextRef, - ) -> Vec> { + type Error = error::Error; + + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); // we assume sql string has only 1 statement in datanode - let result = self - .execute_sql(query, query_ctx) - .await - .map_err(|e| { - error!(e; "Instance failed to execute sql"); - BoxedError::new(e) - }) - .context(servers::error::ExecuteQuerySnafu { query }); + let result = self.execute_sql(query, query_ctx).await; vec![result] } @@ -221,22 +211,17 @@ impl SqlQueryHandler for Instance { &self, stmt: Statement, query_ctx: QueryContextRef, - ) -> servers::error::Result { + ) -> Result { let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED); self.execute_stmt(QueryStatement::Sql(stmt), query_ctx) .await - .map_err(|e| { - error!(e; "Instance failed to execute sql"); - BoxedError::new(e) - }) - .context(servers::error::ExecuteStatementSnafu) } - fn is_valid_schema(&self, catalog: &str, schema: &str) -> servers::error::Result { + fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager .schema(catalog, schema) .map(|s| s.is_some()) - .context(servers::error::CatalogSnafu) + .context(error::CatalogSnafu) } } diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index cdfec1f4ac..49da3c52e1 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -20,6 +20,8 @@ use common_runtime::Builder as RuntimeBuilder; use common_telemetry::tracing::log::info; use servers::grpc::GrpcServer; use servers::mysql::server::MysqlServer; +use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; +use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; use servers::Mode; use snafu::ResultExt; @@ -60,7 +62,7 @@ impl Services { .context(RuntimeResourceSnafu)?, ); Some(MysqlServer::create_server( - instance.clone(), + ServerSqlQueryHandlerAdaptor::arc(instance.clone()), mysql_io_runtime, Default::default(), None, @@ -69,7 +71,10 @@ impl Services { }; Ok(Self { - grpc_server: GrpcServer::new(instance, grpc_runtime), + grpc_server: GrpcServer::new( + ServerGrpcQueryHandlerAdaptor::arc(instance), + grpc_runtime, + ), mysql_server, }) } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index ef66a60163..327ca922fe 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -213,10 +213,10 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to insert values to table, source: {}", source))] - Insert { + #[snafu(display("Failed to create AlterExpr from Alter statement, source: {}", source))] + AlterExprFromStmt { #[snafu(backtrace)] - source: client::Error, + source: sql::error::Error, }, #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] @@ -318,10 +318,10 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Failed to convert Arrow schema, source: {}", source))] - ConvertArrowSchema { + #[snafu(display("{source}"))] + InvokeDatanode { #[snafu(backtrace)] - source: datatypes::error::Error, + source: datanode::error::Error, }, #[snafu(display("Missing meta_client_opts section in config"))] @@ -369,6 +369,15 @@ pub enum Error { source: table::metadata::TableMetaBuilderError, backtrace: Backtrace, }, + + #[snafu(display("Not supported: {}", feat))] + NotSupported { feat: String }, + + #[snafu(display("Failed to find new columns on insertion: {}", source))] + FindNewColumnsOnInsertion { + #[snafu(backtrace)] + source: common_grpc_expr::error::Error, + }, } pub type Result = std::result::Result; @@ -385,17 +394,20 @@ impl ErrorExt for Error { | Error::ColumnValuesNumberMismatch { .. } | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments, + Error::NotSupported { .. } => StatusCode::Unsupported, + Error::RuntimeResource { source, .. } => source.status_code(), Error::StartServer { source, .. } => source.status_code(), - Error::ParseSql { source } => source.status_code(), + Error::ParseSql { source } | Error::AlterExprFromStmt { source } => { + source.status_code() + } Error::Table { source } => source.status_code(), Error::ConvertColumnDefaultConstraint { source, .. } - | Error::ConvertScalarValue { source, .. } - | Error::ConvertArrowSchema { source } => source.status_code(), + | Error::ConvertScalarValue { source, .. } => source.status_code(), Error::RequestDatanode { source } => source.status_code(), @@ -431,9 +443,11 @@ impl ErrorExt for Error { } Error::SchemaNotFound { .. } => StatusCode::InvalidArguments, Error::CatalogNotFound { .. } => StatusCode::InvalidArguments, - Error::Insert { source, .. } => source.status_code(), - Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(), - Error::ToTableInsertRequest { source, .. } => source.status_code(), + + Error::BuildCreateExprOnInsertion { source } + | Error::ToTableInsertRequest { source } + | Error::FindNewColumnsOnInsertion { source } => source.status_code(), + Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments, Error::ExecuteStatement { source, .. } => source.status_code(), Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, @@ -442,6 +456,7 @@ impl ErrorExt for Error { Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, Error::EncodeSubstraitLogicalPlan { source } => source.status_code(), Error::BuildVector { source, .. } => source.status_code(), + Error::InvokeDatanode { source } => source.status_code(), } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 3f1353d859..812573e23e 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -17,6 +17,7 @@ mod grpc; mod influxdb; mod opentsdb; mod prometheus; +mod standalone; use std::sync::Arc; use std::time::Duration; @@ -31,7 +32,6 @@ use async_trait::async_trait; use catalog::remote::MetaKvBackend; use catalog::CatalogManagerRef; use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_query::Output; use common_recordbatch::RecordBatches; @@ -42,10 +42,11 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use servers::error as server_error; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; +use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; +use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef}; use servers::query_handler::{ - GrpcQueryHandler, GrpcQueryHandlerRef, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, - PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler, - SqlQueryHandlerRef, + InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, + ScriptHandlerRef, }; use session::context::QueryContextRef; use snafu::prelude::*; @@ -55,16 +56,17 @@ use sql::statements::statement::Statement; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; -use crate::error::{self, MissingMetasrvOptsSnafu, Result}; +use crate::error::{self, Error, MissingMetasrvOptsSnafu, Result}; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; +use crate::instance::standalone::{StandaloneGrpcQueryHandler, StandaloneSqlQueryHandler}; use crate::table::route::TableRoutes; use crate::Plugins; #[async_trait] pub trait FrontendInstance: - GrpcQueryHandler - + SqlQueryHandler + GrpcQueryHandler + + SqlQueryHandler + OpentsdbProtocolHandler + InfluxdbLineProtocolHandler + PrometheusProtocolHandler @@ -84,8 +86,8 @@ pub struct Instance { /// Script handler is None in distributed mode, only works on standalone mode. script_handler: Option, - sql_handler: SqlQueryHandlerRef, - grpc_query_handler: GrpcQueryHandlerRef, + sql_handler: SqlQueryHandlerRef, + grpc_query_handler: GrpcQueryHandlerRef, create_expr_factory: CreateExprFactoryRef, @@ -158,8 +160,8 @@ impl Instance { catalog_manager: dn_instance.catalog_manager().clone(), script_handler: None, create_expr_factory: Arc::new(DefaultCreateExprFactory), - sql_handler: dn_instance.clone(), - grpc_query_handler: dn_instance.clone(), + sql_handler: StandaloneSqlQueryHandler::arc(dn_instance.clone()), + grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), plugins: Default::default(), } } @@ -189,10 +191,7 @@ impl Instance { } /// Handle batch inserts - pub async fn handle_inserts( - &self, - requests: Vec, - ) -> server_error::Result { + pub async fn handle_inserts(&self, requests: Vec) -> Result { let mut success = 0; for request in requests { match self.handle_insert(request).await? { @@ -203,7 +202,7 @@ impl Instance { Ok(Output::AffectedRows(success)) } - async fn handle_insert(&self, request: InsertRequest) -> server_error::Result { + async fn handle_insert(&self, request: InsertRequest) -> Result { let schema_name = &request.schema_name; let table_name = &request.table_name; let catalog_name = DEFAULT_CATALOG_NAME; @@ -228,11 +227,11 @@ impl Instance { schema_name: &str, table_name: &str, columns: &[Column], - ) -> server_error::Result<()> { + ) -> Result<()> { let table = self .catalog_manager .table(catalog_name, schema_name, table_name) - .context(server_error::CatalogSnafu)?; + .context(error::CatalogSnafu)?; match table { None => { info!( @@ -250,7 +249,7 @@ impl Instance { let schema = table.schema(); if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns) - .context(server_error::FindNewColumnsOnInsertionSnafu)? + .context(error::FindNewColumnsOnInsertionSnafu)? { info!( "Find new columns {:?} on insertion, try to alter table: {}.{}.{}", @@ -280,14 +279,12 @@ impl Instance { schema_name: &str, table_name: &str, columns: &[Column], - ) -> server_error::Result { + ) -> Result { // Create table automatically, build schema from data. let create_expr = self .create_expr_factory .create_expr_by_columns(catalog_name, schema_name, table_name, columns) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteGrpcQuerySnafu)?; + .await?; info!( "Try to create table: {} automatically with request: {:?}", @@ -309,7 +306,7 @@ impl Instance { schema_name: &str, table_name: &str, add_columns: AddColumns, - ) -> server_error::Result { + ) -> Result { debug!( "Adding new columns: {:?} to table: {}", add_columns, table_name @@ -369,11 +366,7 @@ fn parse_stmt(sql: &str) -> Result> { } impl Instance { - async fn query_statement( - &self, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> server_error::Result { + async fn query_statement(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result { // TODO(sunng87): provide a better form to log or track statement let query = &format!("{:?}", &stmt); match stmt.clone() { @@ -388,9 +381,8 @@ impl Instance { return self.sql_handler.do_statement_query(stmt, query_ctx).await; } Statement::Alter(alter_stmt) => { - let expr = AlterExpr::try_from(alter_stmt) - .map_err(BoxedError::new) - .context(server_error::ExecuteAlterSnafu { query })?; + let expr = + AlterExpr::try_from(alter_stmt).context(error::AlterExprFromStmtSnafu)?; return self .grpc_query_handler .do_query(GreptimeRequest { @@ -415,32 +407,24 @@ impl Instance { }) .await; } - Statement::ShowCreateTable(_) => { - return server_error::NotSupportedSnafu { feat: query }.fail(); - } + Statement::ShowCreateTable(_) => error::NotSupportedSnafu { feat: query }.fail(), Statement::Use(db) => self.handle_use(db, query_ctx), } - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query }) } } #[async_trait] impl SqlQueryHandler for Instance { - async fn do_query( - &self, - query: &str, - query_ctx: QueryContextRef, - ) -> Vec> { - let query_interceptor = self.plugins.get::(); + type Error = Error; + + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { + let query_interceptor = self.plugins.get::>(); let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) { Ok(q) => q, Err(e) => return vec![Err(e)], }; match parse_stmt(query.as_ref()) - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query }) .and_then(|stmts| query_interceptor.post_parsing(stmts, query_ctx.clone())) { Ok(stmts) => { @@ -477,8 +461,8 @@ impl SqlQueryHandler for Instance { &self, stmt: Statement, query_ctx: QueryContextRef, - ) -> server_error::Result { - let query_interceptor = self.plugins.get::(); + ) -> Result { + let query_interceptor = self.plugins.get::>(); // TODO(sunng87): figure out at which stage we can call // this hook after ArrowFlight adoption. We need to provide @@ -489,11 +473,11 @@ impl SqlQueryHandler for Instance { .and_then(|output| query_interceptor.post_execute(output, query_ctx.clone())) } - fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result { + fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager .schema(catalog, schema) .map(|s| s.is_some()) - .context(server_error::CatalogSnafu) + .context(error::CatalogSnafu) } } @@ -636,11 +620,13 @@ mod tests { } impl SqlQueryInterceptor for AssertionHook { + type Error = Error; + fn pre_parsing<'a>( &self, query: &'a str, _query_ctx: QueryContextRef, - ) -> server_error::Result> { + ) -> Result> { self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); assert!(query.starts_with("CREATE TABLE demo")); Ok(Cow::Borrowed(query)) @@ -650,7 +636,7 @@ mod tests { &self, statements: Vec, _query_ctx: QueryContextRef, - ) -> server_error::Result> { + ) -> Result> { self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); assert!(matches!(statements[0], Statement::CreateTable(_))); Ok(statements) @@ -661,7 +647,7 @@ mod tests { _statement: &Statement, _plan: Option<&query::plan::LogicalPlan>, _query_ctx: QueryContextRef, - ) -> server_error::Result<()> { + ) -> Result<()> { self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); Ok(()) } @@ -670,7 +656,7 @@ mod tests { &self, mut output: Output, _query_ctx: QueryContextRef, - ) -> server_error::Result { + ) -> Result { self.c.fetch_add(1, std::sync::atomic::Ordering::Relaxed); match &mut output { Output::AffectedRows(rows) => { @@ -689,7 +675,7 @@ mod tests { let mut plugins = Plugins::new(); let counter_hook = Arc::new(AssertionHook::default()); - plugins.insert::(counter_hook.clone()); + plugins.insert::>(counter_hook.clone()); Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); let sql = r#"CREATE TABLE demo( @@ -720,15 +706,17 @@ mod tests { struct DisableDBOpHook; impl SqlQueryInterceptor for DisableDBOpHook { + type Error = Error; + fn post_parsing( &self, statements: Vec, _query_ctx: QueryContextRef, - ) -> server_error::Result> { + ) -> Result> { for s in &statements { match s { Statement::CreateDatabase(_) | Statement::ShowDatabases(_) => { - return Err(server_error::Error::NotSupported { + return Err(Error::NotSupported { feat: "Database operations".to_owned(), }) } @@ -747,7 +735,7 @@ mod tests { let mut plugins = Plugins::new(); let hook = Arc::new(DisableDBOpHook::default()); - plugins.insert::(hook.clone()); + plugins.insert::>(hook.clone()); Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); let sql = r#"CREATE TABLE demo( @@ -774,7 +762,7 @@ mod tests { .await .remove(0) { - assert!(matches!(e, server_error::Error::NotSupported { .. })); + assert!(matches!(e, error::Error::NotSupported { .. })); } else { unreachable!(); } @@ -784,7 +772,7 @@ mod tests { .await .remove(0) { - assert!(matches!(e, server_error::Error::NotSupported { .. })); + assert!(matches!(e, error::Error::NotSupported { .. })); } else { unreachable!(); } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4e8c333a78..ecd6849692 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -25,7 +25,6 @@ use catalog::{CatalogList, CatalogManager}; use chrono::DateTime; use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; -use common_error::prelude::BoxedError; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; @@ -38,8 +37,7 @@ use meta_client::rpc::{ use query::parser::QueryStatement; use query::sql::{describe_table, explain, show_databases, show_tables}; use query::{QueryEngineFactory, QueryEngineRef}; -use servers::error as server_error; -use servers::query_handler::SqlQueryHandler; +use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::Value as SqlValue; @@ -372,37 +370,25 @@ impl DistInstance { #[async_trait] impl SqlQueryHandler for DistInstance { - async fn do_query( - &self, - query: &str, - query_ctx: QueryContextRef, - ) -> Vec> { - self.handle_sql(query, query_ctx) - .await - .into_iter() - .map(|r| { - r.map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query }) - }) - .collect() + type Error = error::Error; + + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { + self.handle_sql(query, query_ctx).await } async fn do_statement_query( &self, stmt: Statement, query_ctx: QueryContextRef, - ) -> server_error::Result { - self.handle_statement(stmt, query_ctx) - .await - .map_err(BoxedError::new) - .context(server_error::ExecuteStatementSnafu) + ) -> Result { + self.handle_statement(stmt, query_ctx).await } - fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result { + fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager .schema(catalog, schema) .map(|s| s.is_some()) - .context(server_error::CatalogSnafu) + .context(CatalogSnafu) } } @@ -596,7 +582,7 @@ fn find_partition_columns( #[cfg(test)] mod test { use itertools::Itertools; - use servers::query_handler::SqlQueryHandlerRef; + use servers::query_handler::sql::SqlQueryHandlerRef; use session::context::QueryContext; use sql::dialect::GenericDialect; use sql::parser::ParserContext; @@ -604,6 +590,7 @@ mod test { use super::*; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; + use crate::instance::standalone::StandaloneSqlQueryHandler; #[tokio::test] async fn test_parse_partitions() { @@ -732,7 +719,7 @@ ENGINE=mito", .remove(0) .unwrap(); - async fn assert_show_tables(instance: SqlQueryHandlerRef) { + async fn assert_show_tables(instance: SqlQueryHandlerRef) { let sql = "show tables in test_show_tables"; let output = instance .do_query(sql, QueryContext::arc()) @@ -756,7 +743,7 @@ ENGINE=mito", // Asserts that new table is created in Datanode as well. for x in datanode_instances.values() { - assert_show_tables(x.clone()).await + assert_show_tables(StandaloneSqlQueryHandler::arc(x.clone())).await } } } diff --git a/src/frontend/src/instance/distributed/grpc.rs b/src/frontend/src/instance/distributed/grpc.rs index 61e9b5eaca..a3e3eed9b2 100644 --- a/src/frontend/src/instance/distributed/grpc.rs +++ b/src/frontend/src/instance/distributed/grpc.rs @@ -16,21 +16,23 @@ use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::GreptimeRequest; use async_trait::async_trait; -use common_error::prelude::BoxedError; use common_query::Output; -use servers::query_handler::GrpcQueryHandler; -use snafu::{OptionExt, ResultExt}; +use servers::query_handler::grpc::GrpcQueryHandler; +use snafu::OptionExt; use crate::error::{self, Result}; use crate::instance::distributed::DistInstance; -impl DistInstance { - async fn handle_grpc_query(&self, query: GreptimeRequest) -> Result { +#[async_trait] +impl GrpcQueryHandler for DistInstance { + type Error = error::Error; + + async fn do_query(&self, query: GreptimeRequest) -> Result { let request = query.request.context(error::IncompleteGrpcResultSnafu { err_msg: "Missing 'request' in GreptimeRequest", })?; - let output = match request { - Request::Insert(request) => self.handle_dist_insert(request).await?, + match request { + Request::Insert(request) => self.handle_dist_insert(request).await, Request::Query(_) => { unreachable!("Query should have been handled directly in Frontend Instance!") } @@ -39,13 +41,13 @@ impl DistInstance { err_msg: "Missing 'expr' in DDL request", })?; match expr { - DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await?, + DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await, DdlExpr::CreateTable(mut expr) => { // TODO(LFC): Support creating distributed table through GRPC interface. // Currently only SQL supports it; how to design the fields in CreateTableExpr? - self.create_table(&mut expr, None).await? + self.create_table(&mut expr, None).await } - DdlExpr::Alter(expr) => self.handle_alter_table(expr).await?, + DdlExpr::Alter(expr) => self.handle_alter_table(expr).await, DdlExpr::DropTable(_) => { // TODO(LFC): Implement distributed drop table. // Seems the whole "drop table through GRPC interface" feature is not implemented? @@ -53,17 +55,6 @@ impl DistInstance { } } } - }; - Ok(output) - } -} - -#[async_trait] -impl GrpcQueryHandler for DistInstance { - async fn do_query(&self, query: GreptimeRequest) -> servers::error::Result { - self.handle_grpc_query(query) - .await - .map_err(BoxedError::new) - .context(servers::error::ExecuteGrpcQuerySnafu) + } } } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index f9d54d0ec1..4372820e62 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -17,26 +17,29 @@ use api::v1::query_request::Query; use api::v1::GreptimeRequest; use async_trait::async_trait; use common_query::Output; -use servers::error::{self, Result}; -use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler}; +use servers::query_handler::grpc::GrpcQueryHandler; +use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; use snafu::{ensure, OptionExt}; +use crate::error::{self, Result}; use crate::instance::Instance; #[async_trait] impl GrpcQueryHandler for Instance { + type Error = error::Error; + async fn do_query(&self, query: GreptimeRequest) -> Result { - let request = query.request.context(error::GrpcRequestMissingFieldSnafu { - name: "GreptimeRequest.request", + let request = query.request.context(error::IncompleteGrpcResultSnafu { + err_msg: "Missing field 'GreptimeRequest.request'", })?; let output = match request { Request::Insert(request) => self.handle_insert(request).await?, Request::Query(query_request) => { let query = query_request .query - .context(error::GrpcRequestMissingFieldSnafu { - name: "QueryRequest.query", + .context(error::IncompleteGrpcResultSnafu { + err_msg: "Missing field 'QueryRequest.query'", })?; match query { Query::Sql(sql) => { diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index a295598562..c64593cae6 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -13,8 +13,10 @@ // limitations under the License. use async_trait::async_trait; +use common_error::prelude::BoxedError; use servers::influxdb::InfluxdbRequest; use servers::query_handler::InfluxdbLineProtocolHandler; +use snafu::ResultExt; use crate::instance::Instance; @@ -22,7 +24,10 @@ use crate::instance::Instance; impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { let requests = request.try_into()?; - self.handle_inserts(requests).await?; + self.handle_inserts(requests) + .await + .map_err(BoxedError::new) + .context(servers::error::ExecuteGrpcQuerySnafu)?; Ok(()) } } @@ -33,7 +38,7 @@ mod test { use common_query::Output; use common_recordbatch::RecordBatches; - use servers::query_handler::SqlQueryHandler; + use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; use super::*; diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 32841365d6..a6519d2147 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -42,7 +42,7 @@ mod tests { use common_query::Output; use common_recordbatch::RecordBatches; use itertools::Itertools; - use servers::query_handler::SqlQueryHandler; + use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; use super::*; diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 239269e85c..62c8b04db3 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -17,13 +17,15 @@ use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, Wri use api::v1::greptime_request::Request; use api::v1::{query_request, GreptimeRequest, QueryRequest}; use async_trait::async_trait; +use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; use prost::Message; use servers::error::{self, Result as ServerResult}; use servers::prometheus::{self, Metrics}; -use servers::query_handler::{GrpcQueryHandler, PrometheusProtocolHandler, PrometheusResponse}; +use servers::query_handler::grpc::GrpcQueryHandler; +use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use snafu::{OptionExt, ResultExt}; use crate::instance::Instance; @@ -90,7 +92,11 @@ impl Instance { query: Some(query_request::Query::Sql(sql.to_string())), })), }; - let output = self.do_query(query).await?; + let output = self + .do_query(query) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; results.push((table_name, output)); } @@ -102,7 +108,10 @@ impl Instance { impl PrometheusProtocolHandler for Instance { async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> { let requests = prometheus::to_grpc_insert_requests(database, request.clone())?; - self.handle_inserts(requests).await?; + self.handle_inserts(requests) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; Ok(()) } @@ -150,7 +159,7 @@ mod tests { use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, LabelMatcher, Sample}; - use servers::query_handler::SqlQueryHandler; + use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContext; use super::*; diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs new file mode 100644 index 0000000000..f088595777 --- /dev/null +++ b/src/frontend/src/instance/standalone.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::GreptimeRequest; +use async_trait::async_trait; +use common_query::Output; +use datanode::error::Error as DatanodeError; +use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef}; +use servers::query_handler::sql::{SqlQueryHandler, SqlQueryHandlerRef}; +use session::context::QueryContextRef; +use snafu::ResultExt; +use sql::statements::statement::Statement; + +use crate::error::{self, Result}; + +pub(crate) struct StandaloneSqlQueryHandler(SqlQueryHandlerRef); + +impl StandaloneSqlQueryHandler { + pub(crate) fn arc(handler: SqlQueryHandlerRef) -> Arc { + Arc::new(Self(handler)) + } +} + +#[async_trait] +impl SqlQueryHandler for StandaloneSqlQueryHandler { + type Error = error::Error; + + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { + self.0 + .do_query(query, query_ctx) + .await + .into_iter() + .map(|x| x.context(error::InvokeDatanodeSnafu)) + .collect() + } + + async fn do_statement_query( + &self, + stmt: Statement, + query_ctx: QueryContextRef, + ) -> Result { + self.0 + .do_statement_query(stmt, query_ctx) + .await + .context(error::InvokeDatanodeSnafu) + } + + fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { + self.0 + .is_valid_schema(catalog, schema) + .context(error::InvokeDatanodeSnafu) + } +} + +pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef); + +impl StandaloneGrpcQueryHandler { + pub(crate) fn arc(handler: GrpcQueryHandlerRef) -> Arc { + Arc::new(Self(handler)) + } +} + +#[async_trait] +impl GrpcQueryHandler for StandaloneGrpcQueryHandler { + type Error = error::Error; + + async fn do_query(&self, query: GreptimeRequest) -> Result { + self.0 + .do_query(query) + .await + .context(error::InvokeDatanodeSnafu) + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index a8414594d0..d71c5adb44 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -23,6 +23,8 @@ use servers::http::HttpServer; use servers::mysql::server::MysqlServer; use servers::opentsdb::OpentsdbServer; use servers::postgres::PostgresServer; +use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; +use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; use snafu::ResultExt; use tokio::try_join; @@ -56,7 +58,10 @@ impl Services { .context(error::RuntimeResourceSnafu)?, ); - let grpc_server = GrpcServer::new(instance.clone(), grpc_runtime); + let grpc_server = GrpcServer::new( + ServerGrpcQueryHandlerAdaptor::arc(instance.clone()), + grpc_runtime, + ); Some((Box::new(grpc_server) as _, grpc_addr)) } else { @@ -75,7 +80,7 @@ impl Services { ); let mysql_server = MysqlServer::create_server( - instance.clone(), + ServerSqlQueryHandlerAdaptor::arc(instance.clone()), mysql_io_runtime, opts.tls.clone(), user_provider.clone(), @@ -98,7 +103,7 @@ impl Services { ); let pg_server = Box::new(PostgresServer::new( - instance.clone(), + ServerSqlQueryHandlerAdaptor::arc(instance.clone()), opts.tls.clone(), pg_io_runtime, user_provider.clone(), @@ -130,7 +135,10 @@ impl Services { let http_server_and_addr = if let Some(http_options) = &opts.http_options { let http_addr = parse_addr(&http_options.addr)?; - let mut http_server = HttpServer::new(instance.clone(), http_options.clone()); + let mut http_server = HttpServer::new( + ServerSqlQueryHandlerAdaptor::arc(instance.clone()), + http_options.clone(), + ); if let Some(user_provider) = user_provider { http_server.set_user_provider(user_provider); } diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 80650d25e2..799d60aa51 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -29,6 +29,7 @@ use meta_srv::mocks::MockInfo; use meta_srv::service::store::kv::KvStoreRef; use meta_srv::service::store::memory::MemStore; use servers::grpc::GrpcServer; +use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::Mode; use tempdir::TempDir; use tonic::transport::Server; @@ -110,7 +111,11 @@ pub(crate) async fn create_datanode_client( // create a mock datanode grpc service, see example here: // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs - let datanode_service = GrpcServer::new(datanode_instance, runtime).create_service(); + let datanode_service = GrpcServer::new( + ServerGrpcQueryHandlerAdaptor::arc(datanode_instance), + runtime, + ) + .create_service(); tokio::spawn(async move { Server::builder() .add_service(datanode_service) diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 95ea2e263c..e2a79b9763 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -82,7 +82,7 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to execute GRPC query, source: {}", source))] + #[snafu(display("{source}"))] ExecuteGrpcQuery { #[snafu(backtrace)] source: BoxedError, @@ -94,9 +94,8 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to execute insert: {}, source: {}", msg, source))] - ExecuteInsert { - msg: String, + #[snafu(display("Failed to check database validity, source: {}", source))] + CheckDatabaseValidity { #[snafu(backtrace)] source: BoxedError, }, @@ -258,15 +257,6 @@ pub enum Error { #[snafu(display("Cannot find requested database: {}-{}", catalog, schema))] DatabaseNotFound { catalog: String, schema: String }, - - #[snafu(display("Failed to find new columns on insertion: {}", source))] - FindNewColumnsOnInsertion { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - - #[snafu(display("GRPC request missing field: {}", name))] - GrpcRequestMissingField { name: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -293,7 +283,7 @@ impl ErrorExt for Error { | ExecuteQuery { source, .. } | ExecuteGrpcQuery { source, .. } | ExecuteStatement { source, .. } - | ExecuteInsert { source, .. } + | CheckDatabaseValidity { source, .. } | ExecuteAlter { source, .. } | PutOpentsdbDataPoint { source, .. } => source.status_code(), @@ -307,7 +297,6 @@ impl ErrorExt for Error { | DecompressPromRemoteRequest { .. } | InvalidPromRemoteRequest { .. } | InvalidFlightTicket { .. } - | GrpcRequestMissingField { .. } | TimePrecision { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } | ConvertFlightMessage { source } => { @@ -327,8 +316,6 @@ impl ErrorExt for Error { | InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader, DatabaseNotFound { .. } => StatusCode::DatabaseNotFound, - - FindNewColumnsOnInsertion { source } => source.status_code(), } } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index acbbf5fdcd..4eee7617ca 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -30,17 +30,17 @@ use tokio_stream::wrappers::TcpListenerStream; use crate::error::{AlreadyStartedSnafu, Result, StartGrpcSnafu, TcpBindSnafu}; use crate::grpc::flight::FlightHandler; -use crate::query_handler::GrpcQueryHandlerRef; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::server::Server; pub struct GrpcServer { - query_handler: GrpcQueryHandlerRef, + query_handler: ServerGrpcQueryHandlerRef, shutdown_tx: Mutex>>, runtime: Arc, } impl GrpcServer { - pub fn new(query_handler: GrpcQueryHandlerRef, runtime: Arc) -> Self { + pub fn new(query_handler: ServerGrpcQueryHandlerRef, runtime: Arc) -> Self { Self { query_handler, shutdown_tx: Mutex::new(None), diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index e4ab590bd4..e9f05c5179 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -35,18 +35,18 @@ use tonic::{Request, Response, Status, Streaming}; use crate::error; use crate::grpc::flight::stream::FlightRecordBatchStream; -use crate::query_handler::GrpcQueryHandlerRef; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; type TonicResult = Result; type TonicStream = Pin> + Send + Sync + 'static>>; pub(crate) struct FlightHandler { - handler: GrpcQueryHandlerRef, + handler: ServerGrpcQueryHandlerRef, runtime: Arc, } impl FlightHandler { - pub(crate) fn new(handler: GrpcQueryHandlerRef, runtime: Arc) -> Self { + pub(crate) fn new(handler: ServerGrpcQueryHandlerRef, runtime: Arc) -> Self { Self { handler, runtime } } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 42dc29eb1a..0ac7fb9c62 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -54,21 +54,21 @@ use self::authorize::HttpAuth; use self::influxdb::influxdb_write; use crate::auth::UserProviderRef; use crate::error::{AlreadyStartedSnafu, Result, StartHttpSnafu}; +use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, - ScriptHandlerRef, SqlQueryHandlerRef, + ScriptHandlerRef, }; use crate::server::Server; /// create query context from database name information, catalog and schema are /// resolved from the name pub(crate) fn query_context_from_db( - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, db: Option, ) -> std::result::Result, JsonResponse> { if let Some(db) = &db { let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); - let catalog = catalog.unwrap_or(DEFAULT_CATALOG_NAME); match query_handler.is_valid_schema(catalog, schema) { Ok(true) => Ok(Arc::new(QueryContext::with( @@ -96,7 +96,7 @@ const HTTP_API_VERSION: &str = "v1"; const HTTP_API_PREFIX: &str = "/v1/"; pub struct HttpServer { - sql_handler: SqlQueryHandlerRef, + sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions, influxdb_handler: Option, opentsdb_handler: Option, @@ -345,12 +345,12 @@ async fn serve_docs() -> Html { #[derive(Clone)] pub struct ApiState { - pub sql_handler: SqlQueryHandlerRef, + pub sql_handler: ServerSqlQueryHandlerRef, pub script_handler: Option, } impl HttpServer { - pub fn new(sql_handler: SqlQueryHandlerRef, options: HttpOptions) -> Self { + pub fn new(sql_handler: ServerSqlQueryHandlerRef, options: HttpOptions) -> Self { Self { sql_handler, options, @@ -569,7 +569,8 @@ mod test { use tokio::sync::mpsc; use super::*; - use crate::query_handler::SqlQueryHandler; + use crate::error::Error; + use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; struct DummyInstance { _tx: mpsc::Sender<(String, Vec)>, @@ -577,6 +578,8 @@ mod test { #[async_trait] impl SqlQueryHandler for DummyInstance { + type Error = Error; + async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } @@ -604,6 +607,7 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); + let instance = ServerSqlQueryHandlerAdaptor::arc(instance); let server = HttpServer::new(instance, HttpOptions::default()); server.make_app().route( "/test/timeout", diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index ccc4f468f7..fa1cf83862 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -15,19 +15,24 @@ use std::borrow::Cow; use std::sync::Arc; +use common_error::prelude::ErrorExt; use common_query::Output; use query::plan::LogicalPlan; use session::context::QueryContextRef; use sql::statements::statement::Statement; -use crate::error::Result; - /// SqlQueryInterceptor can track life cycle of a sql query and customize or /// abort its execution at given point. pub trait SqlQueryInterceptor { + type Error: ErrorExt; + /// Called before a query string is parsed into sql statements. /// The implementation is allowed to change the sql string if needed. - fn pre_parsing<'a>(&self, query: &'a str, _query_ctx: QueryContextRef) -> Result> { + fn pre_parsing<'a>( + &self, + query: &'a str, + _query_ctx: QueryContextRef, + ) -> Result, Self::Error> { Ok(Cow::Borrowed(query)) } @@ -38,7 +43,7 @@ pub trait SqlQueryInterceptor { &self, statements: Vec, _query_ctx: QueryContextRef, - ) -> Result> { + ) -> Result, Self::Error> { Ok(statements) } @@ -48,21 +53,35 @@ pub trait SqlQueryInterceptor { _statement: &Statement, _plan: Option<&LogicalPlan>, _query_ctx: QueryContextRef, - ) -> Result<()> { + ) -> Result<(), Self::Error> { Ok(()) } /// Called after execution finished. The implementation can modify the /// output if needed. - fn post_execute(&self, output: Output, _query_ctx: QueryContextRef) -> Result { + fn post_execute( + &self, + output: Output, + _query_ctx: QueryContextRef, + ) -> Result { Ok(output) } } -pub type SqlQueryInterceptorRef = Arc; +pub type SqlQueryInterceptorRef = + Arc + Send + Sync + 'static>; -impl SqlQueryInterceptor for Option<&SqlQueryInterceptorRef> { - fn pre_parsing<'a>(&self, query: &'a str, query_ctx: QueryContextRef) -> Result> { +impl SqlQueryInterceptor for Option<&SqlQueryInterceptorRef> +where + E: ErrorExt, +{ + type Error = E; + + fn pre_parsing<'a>( + &self, + query: &'a str, + query_ctx: QueryContextRef, + ) -> Result, Self::Error> { if let Some(this) = self { this.pre_parsing(query, query_ctx) } else { @@ -74,7 +93,7 @@ impl SqlQueryInterceptor for Option<&SqlQueryInterceptorRef> { &self, statements: Vec, query_ctx: QueryContextRef, - ) -> Result> { + ) -> Result, Self::Error> { if let Some(this) = self { this.post_parsing(statements, query_ctx) } else { @@ -87,7 +106,7 @@ impl SqlQueryInterceptor for Option<&SqlQueryInterceptorRef> { statement: &Statement, plan: Option<&LogicalPlan>, query_ctx: QueryContextRef, - ) -> Result<()> { + ) -> Result<(), Self::Error> { if let Some(this) = self { this.pre_execute(statement, plan, query_ctx) } else { @@ -95,7 +114,11 @@ impl SqlQueryInterceptor for Option<&SqlQueryInterceptorRef> { } } - fn post_execute(&self, output: Output, query_ctx: QueryContextRef) -> Result { + fn post_execute( + &self, + output: Output, + query_ctx: QueryContextRef, + ) -> Result { if let Some(this) = self { this.post_execute(output, query_ctx) } else { diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index d34cd73951..6233ef1e5c 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -14,6 +14,7 @@ #![feature(assert_matches)] +use common_catalog::consts::DEFAULT_CATALOG_NAME; use serde::{Deserialize, Serialize}; pub mod auth; @@ -56,33 +57,34 @@ pub enum Mode { /// schema name /// - if `[-]` is provided, we split database name with `-` and use /// `` and ``. -pub(crate) fn parse_catalog_and_schema_from_client_database_name(db: &str) -> (Option<&str>, &str) { +pub(crate) fn parse_catalog_and_schema_from_client_database_name(db: &str) -> (&str, &str) { let parts = db.splitn(2, '-').collect::>(); if parts.len() == 2 { - (Some(parts[0]), parts[1]) + (parts[0], parts[1]) } else { - (None, db) + (DEFAULT_CATALOG_NAME, db) } } #[cfg(test)] mod tests { + use super::*; #[test] - fn test_parse_catalog_and_schema_from_client_database_name() { + fn test_parse_catalog_and_schema() { assert_eq!( - (None, "fullschema"), - super::parse_catalog_and_schema_from_client_database_name("fullschema") + (DEFAULT_CATALOG_NAME, "fullschema"), + parse_catalog_and_schema_from_client_database_name("fullschema") ); assert_eq!( - (Some("catalog"), "schema"), - super::parse_catalog_and_schema_from_client_database_name("catalog-schema") + ("catalog", "schema"), + parse_catalog_and_schema_from_client_database_name("catalog-schema") ); assert_eq!( - (Some("catalog"), "schema1-schema2"), - super::parse_catalog_and_schema_from_client_database_name("catalog-schema1-schema2") + ("catalog", "schema1-schema2"), + parse_catalog_and_schema_from_client_database_name("catalog-schema1-schema2") ); } } diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 0e9b3a1b2b..eb367d7c1e 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::Output; use common_telemetry::{error, trace}; use opensrv_mysql::{ @@ -26,16 +25,17 @@ use opensrv_mysql::{ use rand::RngCore; use session::context::Channel; use session::Session; +use snafu::ensure; use tokio::io::AsyncWrite; use crate::auth::{Identity, Password, UserProviderRef}; use crate::error::{self, Result}; use crate::mysql::writer::MysqlResultWriter; -use crate::query_handler::SqlQueryHandlerRef; +use crate::query_handler::sql::ServerSqlQueryHandlerRef; // An intermediate shim for executing MySQL queries. pub struct MysqlInstanceShim { - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, salt: [u8; 20], session: Arc, user_provider: Option, @@ -43,7 +43,7 @@ pub struct MysqlInstanceShim { impl MysqlInstanceShim { pub fn create( - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, client_addr: SocketAddr, user_provider: Option, ) -> MysqlInstanceShim { @@ -184,21 +184,16 @@ impl AsyncMysqlShim for MysqlInstanceShi } async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> { - // TODO(sunng87): set catalog - if self - .query_handler - .is_valid_schema(DEFAULT_CATALOG_NAME, database)? - { - let context = self.session.context(); - // TODO(sunng87): set catalog - context.set_current_schema(database); - w.ok().await.map_err(|e| e.into()) - } else { - error::DatabaseNotFoundSnafu { - catalog: DEFAULT_CATALOG_NAME, - schema: database, - } - .fail() - } + let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(database); + ensure!( + self.query_handler.is_valid_schema(catalog, schema)?, + error::DatabaseNotFoundSnafu { catalog, schema } + ); + + let context = self.session.context(); + context.set_current_catalog(catalog); + context.set_current_schema(database); + + w.ok().await.map_err(|e| e.into()) } } diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index e8df8583ae..49c4d67c8b 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -31,7 +31,7 @@ use tokio_rustls::rustls::ServerConfig; use crate::auth::UserProviderRef; use crate::error::{Error, Result}; use crate::mysql::handler::MysqlInstanceShim; -use crate::query_handler::SqlQueryHandlerRef; +use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::server::{AbortableStream, BaseTcpServer, Server}; use crate::tls::TlsOption; @@ -40,14 +40,14 @@ const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024; pub struct MysqlServer { base_server: BaseTcpServer, - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, tls: TlsOption, user_provider: Option, } impl MysqlServer { pub fn create_server( - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, io_runtime: Arc, tls: TlsOption, user_provider: Option, @@ -102,7 +102,7 @@ impl MysqlServer { async fn handle( stream: TcpStream, io_runtime: Arc, - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, tls_conf: Option>, force_tls: bool, user_provider: Option, @@ -122,7 +122,7 @@ impl MysqlServer { async fn do_handle( stream: TcpStream, - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, tls_conf: Option>, force_tls: bool, user_provider: Option, diff --git a/src/servers/src/postgres/auth_handler.rs b/src/servers/src/postgres/auth_handler.rs index 8259ed662e..dd7315f9d2 100644 --- a/src/servers/src/postgres/auth_handler.rs +++ b/src/servers/src/postgres/auth_handler.rs @@ -16,7 +16,6 @@ use std::collections::HashMap; use std::fmt::Debug; use async_trait::async_trait; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use futures::{Sink, SinkExt}; use pgwire::api::auth::{ServerParameterProvider, StartupHandler}; use pgwire::api::{auth, ClientInfo, PgWireConnectionState}; @@ -29,7 +28,7 @@ use snafu::ResultExt; use crate::auth::{Identity, Password, UserProviderRef}; use crate::error; use crate::error::Result; -use crate::query_handler::SqlQueryHandlerRef; +use crate::query_handler::sql::ServerSqlQueryHandlerRef; struct PgPwdVerifier { user_provider: Option, @@ -110,14 +109,14 @@ pub struct PgAuthStartupHandler { verifier: PgPwdVerifier, param_provider: GreptimeDBStartupParameters, force_tls: bool, - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, } impl PgAuthStartupHandler { pub fn new( user_provider: Option, force_tls: bool, - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, ) -> Self { PgAuthStartupHandler { verifier: PgPwdVerifier { user_provider }, @@ -219,7 +218,7 @@ enum DbResolution { /// A function extracted to resolve lifetime and readability issues: fn resolve_db_info( client: &mut C, - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, ) -> PgWireResult where C: ClientInfo + Unpin + Send, @@ -227,7 +226,6 @@ where let db_ref = client.metadata().get(super::METADATA_DATABASE); if let Some(db) = db_ref { let (catalog, schema) = crate::parse_catalog_and_schema_from_client_database_name(db); - let catalog = catalog.unwrap_or(DEFAULT_CATALOG_NAME); if query_handler .is_valid_schema(catalog, schema) .map_err(|e| PgWireError::ApiError(Box::new(e)))? diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 5d65bac618..dcc18f8281 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -30,14 +30,14 @@ use pgwire::error::{ErrorInfo, PgWireError, PgWireResult}; use session::context::QueryContext; use crate::error::{self, Error, Result}; -use crate::query_handler::SqlQueryHandlerRef; +use crate::query_handler::sql::ServerSqlQueryHandlerRef; pub struct PostgresServerHandler { - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, } impl PostgresServerHandler { - pub fn new(query_handler: SqlQueryHandlerRef) -> Self { + pub fn new(query_handler: ServerSqlQueryHandlerRef) -> Self { PostgresServerHandler { query_handler } } } diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index 1859c5d896..800f32a474 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -29,7 +29,7 @@ use crate::auth::UserProviderRef; use crate::error::Result; use crate::postgres::auth_handler::PgAuthStartupHandler; use crate::postgres::handler::PostgresServerHandler; -use crate::query_handler::SqlQueryHandlerRef; +use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::server::{AbortableStream, BaseTcpServer, Server}; use crate::tls::TlsOption; @@ -43,7 +43,7 @@ pub struct PostgresServer { impl PostgresServer { /// Creates a new Postgres server with provided query_handler and async runtime pub fn new( - query_handler: SqlQueryHandlerRef, + query_handler: ServerSqlQueryHandlerRef, tls: TlsOption, io_runtime: Arc, user_provider: Option, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index aed712ab52..c678d7885c 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod grpc; +pub mod sql; + use std::sync::Arc; use api::prometheus::remote::{ReadRequest, WriteRequest}; -use api::v1::GreptimeRequest; use async_trait::async_trait; use common_query::Output; -use session::context::QueryContextRef; -use sql::statements::statement::Statement; use crate::error::Result; use crate::influxdb::InfluxdbRequest; @@ -36,38 +36,17 @@ use crate::prometheus::Metrics; /// used as some kind of "convention", it's the "Q" in "SQL". So we might better stick to the /// word "query". -pub type SqlQueryHandlerRef = Arc; -pub type GrpcQueryHandlerRef = Arc; pub type OpentsdbProtocolHandlerRef = Arc; pub type InfluxdbLineProtocolHandlerRef = Arc; pub type PrometheusProtocolHandlerRef = Arc; pub type ScriptHandlerRef = Arc; -#[async_trait] -pub trait SqlQueryHandler { - async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec>; - - async fn do_statement_query( - &self, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> Result; - - /// check if schema is valid - fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result; -} - #[async_trait] pub trait ScriptHandler { async fn insert_script(&self, name: &str, script: &str) -> Result<()>; async fn execute_script(&self, name: &str) -> Result; } -#[async_trait] -pub trait GrpcQueryHandler { - async fn do_query(&self, query: GreptimeRequest) -> Result; -} - #[async_trait] pub trait InfluxdbLineProtocolHandler { /// A successful request will not return a response. diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs new file mode 100644 index 0000000000..d40d277d82 --- /dev/null +++ b/src/servers/src/query_handler/grpc.rs @@ -0,0 +1,56 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::GreptimeRequest; +use async_trait::async_trait; +use common_error::prelude::*; +use common_query::Output; + +use crate::error::{self, Result}; + +pub type GrpcQueryHandlerRef = Arc + Send + Sync>; +pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef; + +#[async_trait] +pub trait GrpcQueryHandler { + type Error: ErrorExt; + + async fn do_query(&self, query: GreptimeRequest) -> std::result::Result; +} + +pub struct ServerGrpcQueryHandlerAdaptor(GrpcQueryHandlerRef); + +impl ServerGrpcQueryHandlerAdaptor { + pub fn arc(handler: GrpcQueryHandlerRef) -> Arc { + Arc::new(Self(handler)) + } +} + +#[async_trait] +impl GrpcQueryHandler for ServerGrpcQueryHandlerAdaptor +where + E: ErrorExt + Send + Sync + 'static, +{ + type Error = error::Error; + + async fn do_query(&self, query: GreptimeRequest) -> Result { + self.0 + .do_query(query) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) + } +} diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs new file mode 100644 index 0000000000..0b82ae134f --- /dev/null +++ b/src/servers/src/query_handler/sql.rs @@ -0,0 +1,96 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::prelude::*; +use common_query::Output; +use session::context::QueryContextRef; +use sql::statements::statement::Statement; + +use crate::error::{self, Result}; + +pub type SqlQueryHandlerRef = Arc + Send + Sync>; +pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef; + +#[async_trait] +pub trait SqlQueryHandler { + type Error: ErrorExt; + + async fn do_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec>; + + async fn do_statement_query( + &self, + stmt: Statement, + query_ctx: QueryContextRef, + ) -> std::result::Result; + + fn is_valid_schema( + &self, + catalog: &str, + schema: &str, + ) -> std::result::Result; +} + +pub struct ServerSqlQueryHandlerAdaptor(SqlQueryHandlerRef); + +impl ServerSqlQueryHandlerAdaptor { + pub fn arc(handler: SqlQueryHandlerRef) -> Arc { + Arc::new(Self(handler)) + } +} + +#[async_trait] +impl SqlQueryHandler for ServerSqlQueryHandlerAdaptor +where + E: ErrorExt + Send + Sync + 'static, +{ + type Error = error::Error; + + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { + self.0 + .do_query(query, query_ctx) + .await + .into_iter() + .map(|x| { + x.map_err(BoxedError::new) + .context(error::ExecuteQuerySnafu { query }) + }) + .collect() + } + + async fn do_statement_query( + &self, + stmt: Statement, + query_ctx: QueryContextRef, + ) -> Result { + self.0 + .do_statement_query(stmt, query_ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteStatementSnafu) + } + + fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { + self.0 + .is_valid_schema(catalog, schema) + .map_err(BoxedError::new) + .context(error::CheckDatabaseValiditySnafu) + } +} diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index e795eebcaf..06e73f68bb 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -20,10 +20,11 @@ use axum::{http, Router}; use axum_test_helper::TestClient; use common_query::Output; use servers::auth::user_provider::StaticUserProvider; -use servers::error::Result; +use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::influxdb::InfluxdbRequest; -use servers::query_handler::{InfluxdbLineProtocolHandler, SqlQueryHandler}; +use servers::query_handler::sql::SqlQueryHandler; +use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; @@ -46,6 +47,8 @@ impl InfluxdbLineProtocolHandler for DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { + type Error = Error; + async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 84e212341e..32954def9d 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -21,7 +21,8 @@ use common_query::Output; use servers::error::{self, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::opentsdb::codec::DataPoint; -use servers::query_handler::{OpentsdbProtocolHandler, SqlQueryHandler}; +use servers::query_handler::sql::SqlQueryHandler; +use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use tokio::sync::mpsc; @@ -45,6 +46,8 @@ impl OpentsdbProtocolHandler for DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { + type Error = error::Error; + async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 0a2fe1c873..a57b70136d 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -22,11 +22,12 @@ use axum::Router; use axum_test_helper::TestClient; use common_query::Output; use prost::Message; -use servers::error::Result; +use servers::error::{Error, Result}; use servers::http::{HttpOptions, HttpServer}; use servers::prometheus; use servers::prometheus::{snappy_compress, Metrics}; -use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQueryHandler}; +use servers::query_handler::sql::SqlQueryHandler; +use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use session::context::QueryContextRef; use tokio::sync::mpsc; @@ -70,6 +71,8 @@ impl PrometheusProtocolHandler for DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { + type Error = Error; + async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index 3a072568a4..593fa89207 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -15,13 +15,15 @@ use std::borrow::Cow; use std::sync::Arc; -use servers::error::Result; +use servers::error::{self, Result}; use servers::interceptor::SqlQueryInterceptor; use session::context::{QueryContext, QueryContextRef}; pub struct NoopInterceptor; impl SqlQueryInterceptor for NoopInterceptor { + type Error = error::Error; + fn pre_parsing<'a>(&self, query: &'a str, _query_ctx: QueryContextRef) -> Result> { let modified_query = format!("{query};"); Ok(Cow::Owned(modified_query)) diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 484ff95b5e..498940cbcb 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -22,16 +22,15 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use query::parser::QueryLanguageParser; use query::{QueryEngineFactory, QueryEngineRef}; -use servers::error::Result; -use servers::query_handler::{ - ScriptHandler, ScriptHandlerRef, SqlQueryHandler, SqlQueryHandlerRef, -}; +use servers::error::{Error, Result}; +use servers::query_handler::{ScriptHandler, ScriptHandlerRef}; use table::test_util::MemTable; mod http; mod mysql; use script::engine::{CompileContext, EvalContext, Script, ScriptEngine}; use script::python::{PyEngine, PyScript}; +use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler}; use session::context::QueryContextRef; mod interceptor; @@ -56,6 +55,8 @@ impl DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { + type Error = Error; + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { let stmt = QueryLanguageParser::parse_sql(query).unwrap(); let plan = self @@ -126,6 +127,6 @@ fn create_testing_script_handler(table: MemTable) -> ScriptHandlerRef { Arc::new(create_testing_instance(table)) as _ } -fn create_testing_sql_query_handler(table: MemTable) -> SqlQueryHandlerRef { +fn create_testing_sql_query_handler(table: MemTable) -> ServerSqlQueryHandlerRef { Arc::new(create_testing_instance(table)) as _ } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index c16988585f..a6353628a7 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -29,7 +29,7 @@ use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; -use frontend::instance::{FrontendInstance, Instance as FeInstance}; +use frontend::instance::Instance as FeInstance; use object_store::backend::s3; use object_store::test_util::TempFolder; use object_store::ObjectStore; @@ -37,12 +37,15 @@ use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; use servers::http::{HttpOptions, HttpServer}; +use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; +use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; use servers::server::Server; use servers::Mode; use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; use tempdir::TempDir; + static PORTS: OnceCell = OnceCell::new(); fn get_port() -> usize { @@ -234,7 +237,10 @@ pub async fn setup_test_app(store_type: StorageType, name: &str) -> (Router, Tes ) .await .unwrap(); - let http_server = HttpServer::new(instance, HttpOptions::default()); + let http_server = HttpServer::new( + ServerSqlQueryHandlerAdaptor::arc(instance), + HttpOptions::default(), + ); (http_server.make_app(), guard) } @@ -244,7 +250,7 @@ pub async fn setup_test_app_with_frontend( ) -> (Router, TestGuard) { let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); let instance = Arc::new(Instance::with_mock_meta_client(&opts).await.unwrap()); - let mut frontend = build_frontend_instance(instance.clone()).await; + let frontend = build_frontend_instance(instance.clone()).await; instance.start().await.unwrap(); create_test_table( frontend.catalog_manager(), @@ -253,8 +259,10 @@ pub async fn setup_test_app_with_frontend( ) .await .unwrap(); - frontend.start().await.unwrap(); - let mut http_server = HttpServer::new(Arc::new(frontend), HttpOptions::default()); + let mut http_server = HttpServer::new( + ServerSqlQueryHandlerAdaptor::arc(Arc::new(frontend)), + HttpOptions::default(), + ); http_server.set_script_handler(instance.clone()); let app = http_server.make_app(); (app, guard) @@ -282,7 +290,10 @@ pub async fn setup_grpc_server( let fe_instance = frontend::instance::Instance::new_standalone(instance.clone()); let fe_instance_ref = Arc::new(fe_instance); - let fe_grpc_server = Arc::new(GrpcServer::new(fe_instance_ref, runtime)); + let fe_grpc_server = Arc::new(GrpcServer::new( + ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref), + runtime, + )); let grpc_server_clone = fe_grpc_server.clone(); let fe_grpc_addr_clone = fe_grpc_addr.clone();