From 38b5df574c040a1b124362a2f5f3c802097b49c1 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 30 Jan 2026 15:19:45 +0800 Subject: [PATCH] refactor: SQL and gRPC server handlers (#7637) Signed-off-by: Ruihang Xia --- docs/how-to/how-to-implement-sql-statement.md | 8 +- src/flow/src/batching_mode/frontend_client.rs | 13 +- src/frontend/src/instance.rs | 76 ++- src/frontend/src/instance/grpc.rs | 576 +++++++++--------- src/frontend/src/server.rs | 13 +- src/servers/src/http.rs | 16 +- src/servers/src/query_handler/grpc.rs | 73 +-- src/servers/src/query_handler/sql.rs | 100 +-- src/servers/tests/http/influxdb_test.rs | 12 +- src/servers/tests/http/opentsdb_test.rs | 10 +- src/servers/tests/http/prom_store_test.rs | 12 +- src/servers/tests/mod.rs | 24 +- tests-integration/src/grpc/flight.rs | 3 +- tests-integration/src/instance.rs | 8 +- tests-integration/src/test_util.rs | 22 +- tests-integration/src/tests/instance_test.rs | 66 +- tests-integration/src/tests/test_util.rs | 8 +- tests-integration/tests/region_migration.rs | 6 +- tests-integration/tests/repartition.rs | 4 +- 19 files changed, 487 insertions(+), 563 deletions(-) diff --git a/docs/how-to/how-to-implement-sql-statement.md b/docs/how-to/how-to-implement-sql-statement.md index dc2728e8bc..f0978812d0 100644 --- a/docs/how-to/how-to-implement-sql-statement.md +++ b/docs/how-to/how-to-implement-sql-statement.md @@ -5,9 +5,11 @@ implemented `SqlQueryHandler`: ```rust impl SqlQueryHandler for Instance { - type Error = Error; - - async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { + async fn do_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec> { // ... } } diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 3bf449b1b2..8ec6de3a08 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -22,7 +22,7 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{CreateTableExpr, QueryRequest}; use client::{Client, Database}; -use common_error::ext::{BoxedError, ErrorExt}; +use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config}; use common_meta::cluster::{NodeInfo, NodeInfoKey, Role}; use common_meta::peer::Peer; @@ -47,12 +47,9 @@ use crate::error::{ }; use crate::{Error, FlowAuthHeader}; -/// Just like [`GrpcQueryHandler`] but use BoxedError +/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`]. /// -/// basically just a specialized `GrpcQueryHandler` -/// -/// this is only useful for flownode to -/// invoke frontend Instance in standalone mode +/// This is mainly used by flownode to invoke a frontend instance in standalone mode. #[async_trait::async_trait] pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static { async fn do_query( @@ -64,9 +61,7 @@ pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static { /// auto impl #[async_trait::async_trait] -impl + Send + Sync + 'static> - GrpcQueryHandlerWithBoxedError for T -{ +impl GrpcQueryHandlerWithBoxedError for T { async fn do_query( &self, query: Request, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 1a5f2fad66..f7f0479ce9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -518,12 +518,9 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result { Ok(output) } -#[async_trait] -impl SqlQueryHandler for Instance { - type Error = Error; - - #[tracing::instrument(skip_all)] - async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { +impl Instance { + #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")] + async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { if self.is_suspended() { return vec![error::SuspendedSnafu {}.fail()]; } @@ -589,7 +586,7 @@ impl SqlQueryHandler for Instance { } } - async fn do_exec_plan( + async fn do_exec_plan_inner( &self, stmt: Option, plan: LogicalPlan, @@ -652,8 +649,8 @@ impl SqlQueryHandler for Instance { } } - #[tracing::instrument(skip_all)] - async fn do_promql_query( + #[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")] + async fn do_promql_query_inner( &self, query: &PromQuery, query_ctx: QueryContextRef, @@ -671,7 +668,7 @@ impl SqlQueryHandler for Instance { vec![result] } - async fn do_describe( + async fn do_describe_inner( &self, stmt: Statement, query_ctx: QueryContextRef, @@ -704,7 +701,7 @@ impl SqlQueryHandler for Instance { } } - async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { + async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result { self.catalog_manager .schema_exists(catalog, schema, None) .await @@ -712,6 +709,63 @@ impl SqlQueryHandler for Instance { } } +#[async_trait] +impl SqlQueryHandler for Instance { + async fn do_query( + &self, + query: &str, + query_ctx: QueryContextRef, + ) -> Vec> { + self.do_query_inner(query, query_ctx) + .await + .into_iter() + .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu)) + .collect() + } + + async fn do_exec_plan( + &self, + stmt: Option, + plan: LogicalPlan, + query_ctx: QueryContextRef, + ) -> server_error::Result { + self.do_exec_plan_inner(stmt, plan, query_ctx) + .await + .map_err(BoxedError::new) + .context(server_error::ExecutePlanSnafu) + } + + async fn do_promql_query( + &self, + query: &PromQuery, + query_ctx: QueryContextRef, + ) -> Vec> { + self.do_promql_query_inner(query, query_ctx) + .await + .into_iter() + .map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu)) + .collect() + } + + async fn do_describe( + &self, + stmt: Statement, + query_ctx: QueryContextRef, + ) -> server_error::Result> { + self.do_describe_inner(stmt, query_ctx) + .await + .map_err(BoxedError::new) + .context(server_error::DescribeStatementSnafu) + } + + async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result { + self.is_valid_schema_inner(catalog, schema) + .await + .map_err(BoxedError::new) + .context(server_error::CheckDatabaseValiditySnafu) + } +} + /// Attaches a timer to the output and observes it once the output is exhausted. pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output { match output.data { diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index ee519e8077..c4191145f8 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -37,10 +37,10 @@ use datafusion::datasource::DefaultTableSource; use futures::Stream; use futures::stream::StreamExt; use query::parser::PromQuery; +use servers::error as server_error; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; -use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; use table::TableRef; @@ -59,197 +59,205 @@ use crate::metrics::{ #[async_trait] impl GrpcQueryHandler for Instance { - type Error = Error; + async fn do_query( + &self, + request: Request, + ctx: QueryContextRef, + ) -> server_error::Result { + let result: Result = async { + let interceptor_ref = self.plugins.get::>(); + let interceptor = interceptor_ref.as_ref(); + interceptor.pre_execute(&request, ctx.clone())?; - async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { - let interceptor_ref = self.plugins.get::>(); - let interceptor = interceptor_ref.as_ref(); - interceptor.pre_execute(&request, ctx.clone())?; + self.plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request)) + .context(PermissionSnafu)?; - self.plugins - .get::() - .as_ref() - .check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request)) - .context(PermissionSnafu)?; - - let output = match request { - Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, - Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) { - Some(physical_table) => { - self.handle_metric_row_inserts( - requests, - ctx.clone(), - physical_table.to_string(), - ) - .await? - } - None => { - self.handle_row_inserts(requests, ctx.clone(), false, false) + let output = match request { + Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, + Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) { + Some(physical_table) => { + self.handle_metric_row_inserts( + requests, + ctx.clone(), + physical_table.to_string(), + ) .await? - } - }, - Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?, - Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?, - Request::Query(query_request) => { - let query = query_request.query.context(IncompleteGrpcRequestSnafu { - err_msg: "Missing field 'QueryRequest.query'", - })?; - match query { - Query::Sql(sql) => { - let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer(); - let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await; - ensure!( - result.len() == 1, - NotSupportedSnafu { - feat: "execute multiple statements in SQL query string through GRPC interface" - } - ); - let output = result.remove(0)?; - attach_timer(output, timer) } - Query::LogicalPlan(plan) => { - // this path is useful internally when flownode needs to execute a logical plan through gRPC interface - let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer(); + None => { + self.handle_row_inserts(requests, ctx.clone(), false, false) + .await? + } + }, + Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?, + Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?, + Request::Query(query_request) => { + let query = query_request.query.context(IncompleteGrpcRequestSnafu { + err_msg: "Missing field 'QueryRequest.query'", + })?; + match query { + Query::Sql(sql) => { + let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer(); + let mut result = self.do_query_inner(&sql, ctx.clone()).await; + ensure!( + result.len() == 1, + NotSupportedSnafu { + feat: "execute multiple statements in SQL query string through GRPC interface" + } + ); + let output = result.remove(0)?; + attach_timer(output, timer) + } + Query::LogicalPlan(plan) => { + // this path is useful internally when flownode needs to execute a logical plan through gRPC interface + let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer(); - // use dummy catalog to provide table - let plan_decoder = self - .query_engine() - .engine_context(ctx.clone()) - .new_plan_decoder() - .context(PlanStatementSnafu)?; + // use dummy catalog to provide table + let plan_decoder = self + .query_engine() + .engine_context(ctx.clone()) + .new_plan_decoder() + .context(PlanStatementSnafu)?; - let dummy_catalog_list = - Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new( - self.catalog_manager().clone(), - )); + let dummy_catalog_list = + Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new( + self.catalog_manager().clone(), + )); - let logical_plan = plan_decoder - .decode(bytes::Bytes::from(plan), dummy_catalog_list, true) - .await - .context(SubstraitDecodeLogicalPlanSnafu)?; - let output = - SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone()) + let logical_plan = plan_decoder + .decode(bytes::Bytes::from(plan), dummy_catalog_list, true) + .await + .context(SubstraitDecodeLogicalPlanSnafu)?; + let output = + self.do_exec_plan_inner(None, logical_plan, ctx.clone()).await?; + + attach_timer(output, timer) + } + Query::InsertIntoPlan(insert) => { + self.handle_insert_plan(insert, ctx.clone()).await? + } + Query::PromRangeQuery(promql) => { + let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer(); + let prom_query = PromQuery { + query: promql.query, + start: promql.start, + end: promql.end, + step: promql.step, + lookback: promql.lookback, + alias: None, + }; + let mut result = + self.do_promql_query_inner(&prom_query, ctx.clone()).await; + ensure!( + result.len() == 1, + NotSupportedSnafu { + feat: "execute multiple statements in PromQL query string through GRPC interface" + } + ); + let output = result.remove(0)?; + attach_timer(output, timer) + } + } + } + Request::Ddl(request) => { + let mut expr = request.expr.context(IncompleteGrpcRequestSnafu { + err_msg: "'expr' is absent in DDL request", + })?; + + fill_catalog_and_schema_from_context(&mut expr, &ctx); + + match expr { + DdlExpr::CreateTable(mut expr) => { + let _ = self + .statement_executor + .create_table_inner(&mut expr, None, ctx.clone()) + .await?; + Output::new_with_affected_rows(0) + } + DdlExpr::AlterDatabase(expr) => { + let _ = self + .statement_executor + .alter_database_inner(expr, ctx.clone()) + .await?; + Output::new_with_affected_rows(0) + } + DdlExpr::AlterTable(expr) => { + self.statement_executor + .alter_table_inner(expr, ctx.clone()) + .await? + } + DdlExpr::CreateDatabase(expr) => { + self.statement_executor + .create_database( + &expr.schema_name, + expr.create_if_not_exists, + expr.options, + ctx.clone(), + ) + .await? + } + DdlExpr::DropTable(expr) => { + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + self.statement_executor + .drop_table(table_name, expr.drop_if_exists, ctx.clone()) + .await? + } + DdlExpr::TruncateTable(expr) => { + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); + let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default()) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + self.statement_executor + .truncate_table(table_name, time_ranges, ctx.clone()) + .await? + } + DdlExpr::CreateFlow(expr) => { + self.statement_executor + .create_flow_inner(expr, ctx.clone()) + .await? + } + DdlExpr::DropFlow(DropFlowExpr { + catalog_name, + flow_name, + drop_if_exists, + .. + }) => { + self.statement_executor + .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone()) + .await? + } + DdlExpr::CreateView(expr) => { + let _ = self + .statement_executor + .create_view_by_expr(expr, ctx.clone()) .await?; - attach_timer(output, timer) - } - Query::InsertIntoPlan(insert) => { - self.handle_insert_plan(insert, ctx.clone()).await? - } - Query::PromRangeQuery(promql) => { - let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer(); - let prom_query = PromQuery { - query: promql.query, - start: promql.start, - end: promql.end, - step: promql.step, - lookback: promql.lookback, - alias: None, - }; - let mut result = - SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await; - ensure!( - result.len() == 1, - NotSupportedSnafu { - feat: "execute multiple statements in PromQL query string through GRPC interface" - } - ); - let output = result.remove(0)?; - attach_timer(output, timer) + Output::new_with_affected_rows(0) + } + DdlExpr::DropView(_) => { + todo!("implemented in the following PR") + } + DdlExpr::CommentOn(expr) => { + self.statement_executor + .comment_by_expr(expr, ctx.clone()) + .await? + } } } - } - Request::Ddl(request) => { - let mut expr = request.expr.context(IncompleteGrpcRequestSnafu { - err_msg: "'expr' is absent in DDL request", - })?; + }; - fill_catalog_and_schema_from_context(&mut expr, &ctx); + let output = interceptor.post_execute(output, ctx)?; + Ok(output) + } + .await; - match expr { - DdlExpr::CreateTable(mut expr) => { - let _ = self - .statement_executor - .create_table_inner(&mut expr, None, ctx.clone()) - .await?; - Output::new_with_affected_rows(0) - } - DdlExpr::AlterDatabase(expr) => { - let _ = self - .statement_executor - .alter_database_inner(expr, ctx.clone()) - .await?; - Output::new_with_affected_rows(0) - } - DdlExpr::AlterTable(expr) => { - self.statement_executor - .alter_table_inner(expr, ctx.clone()) - .await? - } - DdlExpr::CreateDatabase(expr) => { - self.statement_executor - .create_database( - &expr.schema_name, - expr.create_if_not_exists, - expr.options, - ctx.clone(), - ) - .await? - } - DdlExpr::DropTable(expr) => { - let table_name = - TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); - self.statement_executor - .drop_table(table_name, expr.drop_if_exists, ctx.clone()) - .await? - } - DdlExpr::TruncateTable(expr) => { - let table_name = - TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); - let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default()) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - self.statement_executor - .truncate_table(table_name, time_ranges, ctx.clone()) - .await? - } - DdlExpr::CreateFlow(expr) => { - self.statement_executor - .create_flow_inner(expr, ctx.clone()) - .await? - } - DdlExpr::DropFlow(DropFlowExpr { - catalog_name, - flow_name, - drop_if_exists, - .. - }) => { - self.statement_executor - .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone()) - .await? - } - DdlExpr::CreateView(expr) => { - let _ = self - .statement_executor - .create_view_by_expr(expr, ctx.clone()) - .await?; - - Output::new_with_affected_rows(0) - } - DdlExpr::DropView(_) => { - todo!("implemented in the following PR") - } - DdlExpr::CommentOn(expr) => { - self.statement_executor - .comment_by_expr(expr, ctx.clone()) - .await? - } - } - } - }; - - let output = interceptor.post_execute(output, ctx)?; - Ok(output) + result + .map_err(BoxedError::new) + .context(server_error::ExecuteGrpcQuerySnafu) } async fn put_record_batch( @@ -257,51 +265,126 @@ impl GrpcQueryHandler for Instance { request: servers::grpc::flight::PutRecordBatchRequest, table_ref: &mut Option, ctx: QueryContextRef, - ) -> Result { - let table = if let Some(table) = table_ref { - table.clone() - } else { - let table = self - .catalog_manager() - .table( - &request.table_name.catalog_name, - &request.table_name.schema_name, - &request.table_name.table_name, - None, + ) -> server_error::Result { + let result: Result = async { + let table = if let Some(table) = table_ref { + table.clone() + } else { + let table = self + .catalog_manager() + .table( + &request.table_name.catalog_name, + &request.table_name.schema_name, + &request.table_name.table_name, + None, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: request.table_name.to_string(), + })?; + *table_ref = Some(table.clone()); + table + }; + + let interceptor_ref = self.plugins.get::>(); + let interceptor = interceptor_ref.as_ref(); + interceptor.pre_bulk_insert(table.clone(), ctx.clone())?; + + self.plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::BulkInsert) + .context(PermissionSnafu)?; + + // do we check limit for bulk insert? + + self.inserter + .handle_bulk_insert( + table, + request.flight_data, + request.record_batch, + request.schema_bytes, ) .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: request.table_name.to_string(), - })?; - *table_ref = Some(table.clone()); - table - }; + .context(TableOperationSnafu) + } + .await; - let interceptor_ref = self.plugins.get::>(); - let interceptor = interceptor_ref.as_ref(); - interceptor.pre_bulk_insert(table.clone(), ctx.clone())?; - - self.plugins - .get::() - .as_ref() - .check_permission(ctx.current_user(), PermissionReq::BulkInsert) - .context(PermissionSnafu)?; - - // do we check limit for bulk insert? - - self.inserter - .handle_bulk_insert( - table, - request.flight_data, - request.record_batch, - request.schema_bytes, - ) - .await - .context(TableOperationSnafu) + result + .map_err(BoxedError::new) + .context(server_error::ExecuteGrpcRequestSnafu) } fn handle_put_record_batch_stream( + &self, + stream: servers::grpc::flight::PutRecordBatchRequestStream, + ctx: QueryContextRef, + ) -> Pin> + Send>> { + Box::pin( + self.handle_put_record_batch_stream_inner(stream, ctx) + .map(|result| { + result + .map_err(BoxedError::new) + .context(server_error::ExecuteGrpcRequestSnafu) + }), + ) + } +} + +fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) { + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + + macro_rules! check_and_fill { + ($expr:ident) => { + if $expr.catalog_name.is_empty() { + $expr.catalog_name = catalog.to_string(); + } + if $expr.schema_name.is_empty() { + $expr.schema_name = schema.to_string(); + } + }; + } + + match ddl_expr { + Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ } + Expr::CreateTable(expr) => { + check_and_fill!(expr); + } + Expr::AlterTable(expr) => { + check_and_fill!(expr); + } + Expr::DropTable(expr) => { + check_and_fill!(expr); + } + Expr::TruncateTable(expr) => { + check_and_fill!(expr); + } + Expr::CreateFlow(expr) => { + if expr.catalog_name.is_empty() { + expr.catalog_name = catalog.to_string(); + } + } + Expr::DropFlow(expr) => { + if expr.catalog_name.is_empty() { + expr.catalog_name = catalog.to_string(); + } + } + Expr::CreateView(expr) => { + check_and_fill!(expr); + } + Expr::DropView(expr) => { + check_and_fill!(expr); + } + Expr::CommentOn(expr) => { + check_and_fill!(expr); + } + } +} + +impl Instance { + fn handle_put_record_batch_stream_inner( &self, mut stream: servers::grpc::flight::PutRecordBatchRequestStream, ctx: QueryContextRef, @@ -372,60 +455,7 @@ impl GrpcQueryHandler for Instance { } }) } -} -fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) { - let catalog = ctx.current_catalog(); - let schema = ctx.current_schema(); - - macro_rules! check_and_fill { - ($expr:ident) => { - if $expr.catalog_name.is_empty() { - $expr.catalog_name = catalog.to_string(); - } - if $expr.schema_name.is_empty() { - $expr.schema_name = schema.to_string(); - } - }; - } - - match ddl_expr { - Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ } - Expr::CreateTable(expr) => { - check_and_fill!(expr); - } - Expr::AlterTable(expr) => { - check_and_fill!(expr); - } - Expr::DropTable(expr) => { - check_and_fill!(expr); - } - Expr::TruncateTable(expr) => { - check_and_fill!(expr); - } - Expr::CreateFlow(expr) => { - if expr.catalog_name.is_empty() { - expr.catalog_name = catalog.to_string(); - } - } - Expr::DropFlow(expr) => { - if expr.catalog_name.is_empty() { - expr.catalog_name = catalog.to_string(); - } - } - Expr::CreateView(expr) => { - check_and_fill!(expr); - } - Expr::DropView(expr) => { - check_and_fill!(expr); - } - Expr::CommentOn(expr) => { - check_and_fill!(expr); - } - } -} - -impl Instance { async fn handle_insert_plan( &self, insert: InsertIntoPlan, @@ -493,7 +523,9 @@ impl Instance { // Optimize the plan let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?; - let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?; + let output = self + .do_exec_plan_inner(None, optimized_plan, ctx.clone()) + .await?; Ok(attach_timer(output, timer)) } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 207c5416db..45c3ec3649 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -38,8 +38,6 @@ use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::otel_arrow::OtelArrowServiceHandler; use servers::postgres::PostgresServer; -use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; -use servers::query_handler::sql::ServerSqlQueryHandlerAdapter; use servers::request_memory_limiter::ServerMemoryLimiter; use servers::server::{Server, ServerHandlers}; use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config}; @@ -105,7 +103,7 @@ where ) -> HttpServerBuilder { let mut builder = HttpServerBuilder::new(opts.http.clone()) .with_memory_limiter(request_memory_limiter) - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone())); + .with_sql_handler(self.instance.clone()); let validator = self.plugins.get::(); let ingest_interceptor = self.plugins.get::>(); @@ -212,7 +210,7 @@ where }; let greptime_request_handler = GreptimeRequestHandler::new( - ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), + self.instance.clone(), user_provider.clone(), runtime, grpc.flight_compression, @@ -336,10 +334,7 @@ where let mysql_server = MysqlServer::create_server( common_runtime::global_runtime(), - Arc::new(MysqlSpawnRef::new( - ServerSqlQueryHandlerAdapter::arc(instance.clone()), - user_provider.clone(), - )), + Arc::new(MysqlSpawnRef::new(instance.clone(), user_provider.clone())), Arc::new(MysqlSpawnConfig::new( opts.tls.should_force_tls(), tls_server_config, @@ -364,7 +359,7 @@ where maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?; let pg_server = Box::new(PostgresServer::new( - ServerSqlQueryHandlerAdapter::arc(instance.clone()), + instance.clone(), opts.tls.should_force_tls(), tls_server_config, opts.keep_alive.as_secs(), diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 1745284692..5211360985 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1318,9 +1318,8 @@ mod test { use tokio::time::Instant; use super::*; - use crate::error::Error; use crate::http::test_helpers::TestClient; - use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler}; + use crate::query_handler::sql::SqlQueryHandler; struct DummyInstance { _tx: mpsc::Sender<(String, Vec)>, @@ -1328,17 +1327,11 @@ mod test { #[async_trait] impl SqlQueryHandler for DummyInstance { - type Error = Error; - async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } - async fn do_promql_query( - &self, - _: &PromQuery, - _: QueryContextRef, - ) -> Vec> { + async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec> { unimplemented!() } @@ -1347,7 +1340,7 @@ mod test { _stmt: Option, _plan: LogicalPlan, _query_ctx: QueryContextRef, - ) -> std::result::Result { + ) -> Result { unimplemented!() } @@ -1378,9 +1371,8 @@ mod test { fn make_test_app_custom(tx: mpsc::Sender<(String, Vec)>, options: HttpOptions) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); - let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone()); let server = HttpServerBuilder::new(options) - .with_sql_handler(sql_instance) + .with_sql_handler(instance.clone()) .build(); server.build(server.make_app()).unwrap().route( "/test/timeout", diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs index 2403c82905..67d8b3890e 100644 --- a/src/servers/src/query_handler/grpc.rs +++ b/src/servers/src/query_handler/grpc.rs @@ -18,96 +18,33 @@ use std::sync::Arc; use api::v1::greptime_request::Request; use async_trait::async_trait; use common_base::AffectedRows; -use common_error::ext::{BoxedError, ErrorExt}; use common_grpc::flight::do_put::DoPutResponse; use common_query::Output; use futures::Stream; use session::context::QueryContextRef; -use snafu::ResultExt; use table::TableRef; -use crate::error::{self, Result}; +use crate::error::Result; use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream}; -pub type GrpcQueryHandlerRef = Arc + Send + Sync>; -pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef; +pub type ServerGrpcQueryHandlerRef = Arc; pub type RawRecordBatch = bytes::Bytes; #[async_trait] pub trait GrpcQueryHandler { - type Error: ErrorExt; - - async fn do_query( - &self, - query: Request, - ctx: QueryContextRef, - ) -> std::result::Result; + async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result; async fn put_record_batch( &self, request: PutRecordBatchRequest, table_ref: &mut Option, ctx: QueryContextRef, - ) -> std::result::Result; + ) -> Result; fn handle_put_record_batch_stream( &self, stream: PutRecordBatchRequestStream, ctx: QueryContextRef, - ) -> Pin> + Send>>; -} - -pub struct ServerGrpcQueryHandlerAdapter(GrpcQueryHandlerRef); - -impl ServerGrpcQueryHandlerAdapter { - pub fn arc(handler: GrpcQueryHandlerRef) -> Arc { - Arc::new(Self(handler)) - } -} - -#[async_trait] -impl GrpcQueryHandler for ServerGrpcQueryHandlerAdapter -where - E: ErrorExt + Send + Sync + 'static, -{ - type Error = error::Error; - - async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result { - self.0 - .do_query(query, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu) - } - - async fn put_record_batch( - &self, - request: PutRecordBatchRequest, - table_ref: &mut Option, - ctx: QueryContextRef, - ) -> Result { - self.0 - .put_record_batch(request, table_ref, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcRequestSnafu) - } - - fn handle_put_record_batch_stream( - &self, - stream: PutRecordBatchRequestStream, - ctx: QueryContextRef, - ) -> Pin> + Send>> { - use futures_util::StreamExt; - Box::pin( - self.0 - .handle_put_record_batch_stream(stream, ctx) - .map(|result| { - result - .map_err(|e| BoxedError::new(e)) - .context(error::ExecuteGrpcRequestSnafu) - }), - ) - } + ) -> Pin> + Send>>; } diff --git a/src/servers/src/query_handler/sql.rs b/src/servers/src/query_handler/sql.rs index aebb566f80..544f5bceb6 100644 --- a/src/servers/src/query_handler/sql.rs +++ b/src/servers/src/query_handler/sql.rs @@ -15,123 +15,39 @@ use std::sync::Arc; use async_trait::async_trait; -use common_error::ext::{BoxedError, ErrorExt}; use common_query::Output; use datafusion_expr::LogicalPlan; use query::parser::PromQuery; +use query::query_engine::DescribeResult; use session::context::QueryContextRef; -use snafu::ResultExt; use sql::statements::statement::Statement; -use crate::error::{self, Result}; +use crate::error::Result; -pub type SqlQueryHandlerRef = Arc + Send + Sync>; -pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef; -use query::query_engine::DescribeResult; +pub type ServerSqlQueryHandlerRef = Arc; #[async_trait] pub trait SqlQueryHandler { - type Error: ErrorExt; - - async fn do_query( - &self, - query: &str, - query_ctx: QueryContextRef, - ) -> Vec>; + async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec>; async fn do_exec_plan( &self, stmt: Option, plan: LogicalPlan, query_ctx: QueryContextRef, - ) -> std::result::Result; + ) -> Result; async fn do_promql_query( &self, query: &PromQuery, query_ctx: QueryContextRef, - ) -> Vec>; + ) -> Vec>; async fn do_describe( &self, stmt: Statement, query_ctx: QueryContextRef, - ) -> std::result::Result, Self::Error>; + ) -> Result>; - async fn is_valid_schema( - &self, - catalog: &str, - schema: &str, - ) -> std::result::Result; -} - -pub struct ServerSqlQueryHandlerAdapter(SqlQueryHandlerRef); - -impl ServerSqlQueryHandlerAdapter { - pub fn arc(handler: SqlQueryHandlerRef) -> Arc { - Arc::new(Self(handler)) - } -} - -#[async_trait] -impl SqlQueryHandler for ServerSqlQueryHandlerAdapter -where - E: ErrorExt + Send + Sync + 'static, -{ - type Error = error::Error; - - async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - self.0 - .do_query(query, query_ctx) - .await - .into_iter() - .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu)) - .collect() - } - - async fn do_exec_plan( - &self, - stmt: Option, - plan: LogicalPlan, - query_ctx: QueryContextRef, - ) -> Result { - self.0 - .do_exec_plan(stmt, plan, query_ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecutePlanSnafu) - } - - async fn do_promql_query( - &self, - query: &PromQuery, - query_ctx: QueryContextRef, - ) -> Vec> { - self.0 - .do_promql_query(query, query_ctx) - .await - .into_iter() - .map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu)) - .collect() - } - - async fn do_describe( - &self, - stmt: Statement, - query_ctx: QueryContextRef, - ) -> Result> { - self.0 - .do_describe(stmt, query_ctx) - .await - .map_err(BoxedError::new) - .context(error::DescribeStatementSnafu) - } - - async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result { - self.0 - .is_valid_schema(catalog, schema) - .await - .map_err(BoxedError::new) - .context(error::CheckDatabaseValiditySnafu) - } + async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result; } diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index a88b0750ea..2939d2a11c 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -23,7 +23,7 @@ use common_test_util::ports; use datafusion_expr::LogicalPlan; use query::parser::PromQuery; use query::query_engine::DescribeResult; -use servers::error::{Error, Result}; +use servers::error::Result; use servers::http::header::constants::GREPTIME_DB_HEADER_NAME; use servers::http::test_helpers::TestClient; use servers::http::{HttpOptions, HttpServerBuilder}; @@ -52,8 +52,6 @@ impl InfluxdbLineProtocolHandler for DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { - type Error = Error; - async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } @@ -63,15 +61,11 @@ impl SqlQueryHandler for DummyInstance { _stmt: Option, _plan: LogicalPlan, _query_ctx: QueryContextRef, - ) -> std::result::Result { + ) -> Result { unimplemented!() } - async fn do_promql_query( - &self, - _: &PromQuery, - _: QueryContextRef, - ) -> Vec> { + async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec> { unimplemented!() } diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 0fe6ff8df3..a7a044d4db 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -52,8 +52,6 @@ impl OpentsdbProtocolHandler for DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { - type Error = error::Error; - async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } @@ -63,15 +61,11 @@ impl SqlQueryHandler for DummyInstance { _stmt: Option, _plan: LogicalPlan, _query_ctx: QueryContextRef, - ) -> std::result::Result { + ) -> Result { unimplemented!() } - async fn do_promql_query( - &self, - _: &PromQuery, - _: QueryContextRef, - ) -> Vec> { + async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec> { unimplemented!() } diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index 55ce9089bf..40c6b03670 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -26,7 +26,7 @@ use datafusion_expr::LogicalPlan; use prost::Message; use query::parser::PromQuery; use query::query_engine::DescribeResult; -use servers::error::{Error, Result}; +use servers::error::Result; use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; use servers::http::test_helpers::TestClient; use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode}; @@ -80,8 +80,6 @@ impl PromStoreProtocolHandler for DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { - type Error = Error; - async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec> { unimplemented!() } @@ -91,15 +89,11 @@ impl SqlQueryHandler for DummyInstance { _stmt: Option, _plan: LogicalPlan, _query_ctx: QueryContextRef, - ) -> std::result::Result { + ) -> Result { unimplemented!() } - async fn do_promql_query( - &self, - _: &PromQuery, - _: QueryContextRef, - ) -> Vec> { + async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec> { unimplemented!() } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 3f85b6d3ad..e3f8f8fc79 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -27,7 +27,7 @@ use query::options::QueryOptions; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::DescribeResult; use query::{QueryEngineFactory, QueryEngineRef}; -use servers::error::{Error, NotSupportedSnafu, Result}; +use servers::error::{NotSupportedSnafu, Result}; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler}; use session::context::QueryContextRef; @@ -52,8 +52,6 @@ impl DummyInstance { #[async_trait] impl SqlQueryHandler for DummyInstance { - type Error = Error; - async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { let stmt = QueryLanguageParser::parse_sql(query, &query_ctx).unwrap(); let plan = self @@ -75,11 +73,7 @@ impl SqlQueryHandler for DummyInstance { Ok(self.query_engine.execute(plan, query_ctx).await.unwrap()) } - async fn do_promql_query( - &self, - _: &PromQuery, - _: QueryContextRef, - ) -> Vec> { + async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec> { unimplemented!() } @@ -109,13 +103,7 @@ impl SqlQueryHandler for DummyInstance { #[async_trait] impl GrpcQueryHandler for DummyInstance { - type Error = Error; - - async fn do_query( - &self, - request: Request, - ctx: QueryContextRef, - ) -> std::result::Result { + async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { let output = match request { Request::Inserts(_) | Request::Deletes(_) @@ -166,7 +154,7 @@ impl GrpcQueryHandler for DummyInstance { _request: servers::grpc::flight::PutRecordBatchRequest, _table_ref: &mut Option, _ctx: QueryContextRef, - ) -> std::result::Result { + ) -> Result { unimplemented!() } @@ -174,9 +162,7 @@ impl GrpcQueryHandler for DummyInstance { &self, _stream: servers::grpc::flight::PutRecordBatchRequestStream, _ctx: QueryContextRef, - ) -> std::pin::Pin< - Box> + Send>, - > { + ) -> std::pin::Pin> + Send>> { unimplemented!() } } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index aa0d2356ff..9ed6b8176f 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -35,7 +35,6 @@ mod test { use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{FlightCompression, GrpcServerConfig}; - use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; use servers::server::Server; use crate::cluster::GreptimeDbClusterBuilder; @@ -88,7 +87,7 @@ mod test { let runtime = common_runtime::global_runtime().clone(); let greptime_request_handler = GreptimeRequestHandler::new( - ServerGrpcQueryHandlerAdapter::arc(db.frontend.instance.clone()), + db.frontend.instance.clone(), user_provider_from_option("static_user_provider:cmd:greptime_user=greptime_pwd").ok(), Some(runtime.clone()), FlightCompression::default(), diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index f58f2ece3c..6d39473e97 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -23,13 +23,15 @@ mod tests { use client::OutputData; use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; use common_meta::key::table_name::TableNameKey; use common_meta::rpc::router::region_distribution; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::debug; use datafusion_expr::LogicalPlan; - use frontend::error::{self, Error, Result}; + use frontend::error::{Error, Result}; use frontend::instance::Instance; use query::parser::QueryLanguageParser; use query::query_engine::DefaultSerializer; @@ -443,7 +445,7 @@ mod tests { .await .remove(0) { - assert!(matches!(e, error::Error::NotSupported { .. })); + assert_eq!(e.status_code(), StatusCode::Unsupported); } else { unreachable!(); } @@ -453,7 +455,7 @@ mod tests { .await .remove(0) { - assert!(matches!(e, error::Error::NotSupported { .. })); + assert_eq!(e.status_code(), StatusCode::Unsupported); } else { unreachable!(); } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 89e98c9bc6..9d7ee592a4 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -48,8 +48,7 @@ use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::otel_arrow::OtelArrowServiceHandler; use servers::postgres::PostgresServer; -use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter; -use servers::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler}; +use servers::query_handler::sql::SqlQueryHandler; use servers::request_memory_limiter::ServerMemoryLimiter; use servers::server::Server; use servers::tls::ReloadableTlsServerConfig; @@ -416,9 +415,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router ..Default::default() }; let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc( - instance.fe_instance().clone(), - )) + .with_sql_handler(instance.fe_instance().clone()) .with_logs_handler(instance.fe_instance().clone()) .with_metrics_handler(MetricsHandler) .with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap()) @@ -474,9 +471,7 @@ pub async fn setup_test_http_app_with_frontend_and_custom_options( }); let mut http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc( - instance.fe_instance().clone(), - )) + .with_sql_handler(instance.fe_instance().clone()) .with_log_ingest_handler(instance.fe_instance().clone(), None, None) .with_logs_handler(instance.fe_instance().clone()) .with_influxdb_handler(instance.fe_instance().clone()) @@ -563,7 +558,7 @@ pub async fn setup_test_prom_app_with_frontend( }; let frontend_ref = instance.fe_instance().clone(); let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone())) + .with_sql_handler(frontend_ref.clone()) .with_logs_handler(instance.fe_instance().clone()) .with_prom_handler( frontend_ref.clone(), @@ -611,7 +606,7 @@ pub async fn setup_grpc_server_with( let fe_instance_ref = instance.fe_instance().clone(); let greptime_request_handler = GreptimeRequestHandler::new( - ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()), + fe_instance_ref.clone(), user_provider.clone(), Some(runtime.clone()), FlightCompression::default(), @@ -677,10 +672,7 @@ pub async fn setup_mysql_server_with_user_provider( }; let mut mysql_server = MysqlServer::create_server( runtime, - Arc::new(MysqlSpawnRef::new( - ServerSqlQueryHandlerAdapter::arc(fe_instance_ref), - user_provider, - )), + Arc::new(MysqlSpawnRef::new(fe_instance_ref, user_provider)), Arc::new(MysqlSpawnConfig::new( false, Arc::new( @@ -735,7 +727,7 @@ pub async fn setup_pg_server_with_user_provider( ); let mut pg_server = Box::new(PostgresServer::new( - ServerSqlQueryHandlerAdapter::arc(fe_instance_ref), + fe_instance_ref, opts.tls.should_force_tls(), tls_server_config, 0, diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index db72f75d35..8c2967bc2a 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::assert_matches::assert_matches; use std::env; use std::sync::Arc; use client::{DEFAULT_SCHEMA_NAME, OutputData}; use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_error::ext::ErrorExt; use common_query::Output; use common_recordbatch::util; use common_test_util::recordbatch::check_output_stream; @@ -25,11 +25,12 @@ use common_test_util::temp_dir; use datatypes::arrow::array::{ ArrayRef, AsArray, StringArray, TimestampMillisecondArray, UInt64Array, }; -use frontend::error::{Error, Result}; +use frontend::error::Error; use frontend::instance::Instance; use operator::error::Error as OperatorError; use rstest::rstest; use rstest_reuse::apply; +use servers::error as server_error; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; @@ -176,7 +177,11 @@ async fn test_extra_external_table_options(instance: Arc) { ); let result = try_execute_sql(&frontend, sql).await; - assert!(matches!(result, Err(Error::ParseSql { .. }))); + let err = result.unwrap_err(); + assert!(matches!( + unwrap_frontend_error(&err), + Error::ParseSql { .. } + )); } #[apply(both_instances_cases)] @@ -385,19 +390,22 @@ async fn test_execute_insert_by_select(instance: Arc) { .data; assert!(matches!(output, OutputData::AffectedRows(2))); + let err = try_execute_sql(&instance, "insert into demo2(host) select * from demo1") + .await + .unwrap_err(); assert!(matches!( - try_execute_sql(&instance, "insert into demo2(host) select * from demo1") - .await - .unwrap_err(), + unwrap_frontend_error(&err), Error::TableOperation { source: OperatorError::PlanStatement { .. }, .. } )); + + let err = try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1") + .await + .unwrap_err(); assert!(matches!( - try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1") - .await - .unwrap_err(), + unwrap_frontend_error(&err), Error::TableOperation { source: OperatorError::PlanStatement { .. }, .. @@ -633,7 +641,11 @@ async fn test_execute_external_create_without_ts(instance: Arc ), ) .await; - assert_matches!(result, Err(Error::ParseSql { .. })); + let err = result.unwrap_err(); + assert!(matches!( + unwrap_frontend_error(&err), + Error::ParseSql { .. } + )); } #[apply(both_instances_cases)] @@ -655,7 +667,11 @@ async fn test_execute_external_create_with_invalid_ts(instance: Arc) { check_output_stream(output, expected).await; } +fn unwrap_frontend_error(err: &server_error::Error) -> &Error { + match err { + server_error::Error::ExecuteQuery { source, .. } + | server_error::Error::ExecutePlan { source, .. } + | server_error::Error::ExecuteGrpcQuery { source, .. } + | server_error::Error::ExecuteGrpcRequest { source, .. } + | server_error::Error::CheckDatabaseValidity { source, .. } => source + .as_any() + .downcast_ref::() + .expect("Expected frontend::error::Error inside BoxedError"), + server_error::Error::DescribeStatement { source } => source + .as_any() + .downcast_ref::() + .expect("Expected frontend::error::Error inside BoxedError"), + other => panic!("Unexpected error type: {other}"), + } +} + async fn execute_sql(instance: &Arc, sql: &str) -> Output { execute_sql_with(instance, sql, QueryContext::arc()).await } -async fn try_execute_sql(instance: &Arc, sql: &str) -> Result { +async fn try_execute_sql(instance: &Arc, sql: &str) -> server_error::Result { try_execute_sql_with(instance, sql, QueryContext::arc()).await } @@ -2093,7 +2131,7 @@ async fn try_execute_sql_with( instance: &Arc, sql: &str, query_ctx: QueryContextRef, -) -> Result { +) -> server_error::Result { instance.do_query(sql, query_ctx).await.remove(0) } diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 7ceec77c55..9cb0ed5315 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -33,7 +33,6 @@ use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig} use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datanode::datanode::Datanode; -use frontend::error::Result; use frontend::instance::Instance; use futures::TryStreamExt; use meta_srv::metasrv::Metasrv; @@ -439,7 +438,10 @@ pub fn find_testing_resource(path: &str) -> String { prepare_path(&p) } -pub async fn try_execute_sql(instance: &Arc, sql: &str) -> Result { +pub async fn try_execute_sql( + instance: &Arc, + sql: &str, +) -> servers::error::Result { try_execute_sql_with(instance, sql, QueryContext::arc()).await } @@ -447,7 +449,7 @@ pub async fn try_execute_sql_with( instance: &Arc, sql: &str, query_ctx: QueryContextRef, -) -> Result { +) -> servers::error::Result { instance.do_query(sql, query_ctx).await.remove(0) } diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index b9f106e183..4e26fab3bb 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -36,7 +36,6 @@ use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; use datatypes::arrow::array::AsArray; use datatypes::arrow::datatypes::UInt64Type; -use frontend::error::Result as FrontendResult; use frontend::instance::Instance; use futures::future::BoxFuture; use meta_srv::error; @@ -52,6 +51,7 @@ use meta_srv::procedure::region_migration::{ }; use meta_srv::selector::{Selector, SelectorOptions}; use sea_query::{Expr, Iden, Order, PostgresQueryBuilder, Query}; +use servers::error::Result as ServerResult; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; use store_api::storage::RegionId; @@ -1258,7 +1258,7 @@ async fn query_procedure_by_sql(instance: &Arc, pid: &str) -> String { column.value(0).to_string() } -async fn insert_values(instance: &Arc, ts: u64) -> Vec> { +async fn insert_values(instance: &Arc, ts: u64) -> Vec> { let query_ctx = QueryContext::arc(); let mut results = Vec::new(); @@ -1279,7 +1279,7 @@ async fn run_sql( instance: &Arc, sql: &str, query_ctx: QueryContextRef, -) -> FrontendResult { +) -> ServerResult { info!("Run SQL: {sql}"); instance.do_query(sql, query_ctx).await.remove(0) } diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs index b1dbee37ef..5421a67759 100644 --- a/tests-integration/tests/repartition.rs +++ b/tests-integration/tests/repartition.rs @@ -23,11 +23,11 @@ use common_telemetry::info; use common_test_util::recordbatch::check_output_stream; use common_test_util::temp_dir::create_temp_dir; use common_wal::config::DatanodeWalConfig; -use frontend::error::Result as FrontendResult; use frontend::instance::Instance; use meta_srv::gc::{self, BatchGcProcedure, GcSchedulerOptions, GcTickerRef}; use meta_srv::metasrv::Metasrv; use mito2::gc::GcConfig; +use servers::error::Result as ServerResult; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; use store_api::codec::PrimaryKeyEncoding; @@ -867,7 +867,7 @@ async fn run_sql( instance: &Arc, sql: &str, query_ctx: QueryContextRef, -) -> FrontendResult { +) -> ServerResult { info!("Run SQL: {sql}"); instance.do_query(sql, query_ctx).await.remove(0) }