refactor: SQL and gRPC server handlers (#7637)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-01-30 15:19:45 +08:00
committed by GitHub
parent 235eb39e5b
commit 38b5df574c
19 changed files with 487 additions and 563 deletions

View File

@@ -5,9 +5,11 @@ implemented `SqlQueryHandler`:
```rust
impl SqlQueryHandler for Instance {
type Error = Error;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<servers::error::Result<Output>> {
// ...
}
}

View File

@@ -22,7 +22,7 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database};
use common_error::ext::{BoxedError, ErrorExt};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
@@ -47,12 +47,9 @@ use crate::error::{
};
use crate::{Error, FlowAuthHeader};
/// Just like [`GrpcQueryHandler`] but use BoxedError
/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
///
/// basically just a specialized `GrpcQueryHandler<Error=BoxedError>`
///
/// this is only useful for flownode to
/// invoke frontend Instance in standalone mode
/// This is mainly used by flownode to invoke a frontend instance in standalone mode.
#[async_trait::async_trait]
pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
async fn do_query(
@@ -64,9 +61,7 @@ pub trait GrpcQueryHandlerWithBoxedError: Send + Sync + 'static {
/// auto impl
#[async_trait::async_trait]
impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send + Sync + 'static>
GrpcQueryHandlerWithBoxedError for T
{
impl<T: GrpcQueryHandler + Send + Sync + 'static> GrpcQueryHandlerWithBoxedError for T {
async fn do_query(
&self,
query: Request,

View File

@@ -518,12 +518,9 @@ fn attach_timeout(output: Output, mut timeout: Duration) -> Result<Output> {
Ok(output)
}
#[async_trait]
impl SqlQueryHandler for Instance {
type Error = Error;
#[tracing::instrument(skip_all)]
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
impl Instance {
#[tracing::instrument(skip_all, name = "SqlQueryHandler::do_query")]
async fn do_query_inner(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
if self.is_suspended() {
return vec![error::SuspendedSnafu {}.fail()];
}
@@ -589,7 +586,7 @@ impl SqlQueryHandler for Instance {
}
}
async fn do_exec_plan(
async fn do_exec_plan_inner(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
@@ -652,8 +649,8 @@ impl SqlQueryHandler for Instance {
}
}
#[tracing::instrument(skip_all)]
async fn do_promql_query(
#[tracing::instrument(skip_all, name = "SqlQueryHandler::do_promql_query")]
async fn do_promql_query_inner(
&self,
query: &PromQuery,
query_ctx: QueryContextRef,
@@ -671,7 +668,7 @@ impl SqlQueryHandler for Instance {
vec![result]
}
async fn do_describe(
async fn do_describe_inner(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
@@ -704,7 +701,7 @@ impl SqlQueryHandler for Instance {
}
}
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn is_valid_schema_inner(&self, catalog: &str, schema: &str) -> Result<bool> {
self.catalog_manager
.schema_exists(catalog, schema, None)
.await
@@ -712,6 +709,63 @@ impl SqlQueryHandler for Instance {
}
}
#[async_trait]
impl SqlQueryHandler for Instance {
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<server_error::Result<Output>> {
self.do_query_inner(query, query_ctx)
.await
.into_iter()
.map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
.collect()
}
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
self.do_exec_plan_inner(stmt, plan, query_ctx)
.await
.map_err(BoxedError::new)
.context(server_error::ExecutePlanSnafu)
}
async fn do_promql_query(
&self,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<server_error::Result<Output>> {
self.do_promql_query_inner(query, query_ctx)
.await
.into_iter()
.map(|result| result.map_err(BoxedError::new).context(ExecuteQuerySnafu))
.collect()
}
async fn do_describe(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> server_error::Result<Option<DescribeResult>> {
self.do_describe_inner(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(server_error::DescribeStatementSnafu)
}
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> server_error::Result<bool> {
self.is_valid_schema_inner(catalog, schema)
.await
.map_err(BoxedError::new)
.context(server_error::CheckDatabaseValiditySnafu)
}
}
/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
match output.data {

View File

@@ -37,10 +37,10 @@ use datafusion::datasource::DefaultTableSource;
use futures::Stream;
use futures::stream::StreamExt;
use query::parser::PromQuery;
use servers::error as server_error;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use table::TableRef;
@@ -59,197 +59,205 @@ use crate::metrics::{
#[async_trait]
impl GrpcQueryHandler for Instance {
type Error = Error;
async fn do_query(
&self,
request: Request,
ctx: QueryContextRef,
) -> server_error::Result<Output> {
let result: Result<Output> = async {
let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_execute(&request, ctx.clone())?;
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_execute(&request, ctx.clone())?;
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
Some(physical_table) => {
self.handle_metric_row_inserts(
requests,
ctx.clone(),
physical_table.to_string(),
)
.await?
}
None => {
self.handle_row_inserts(requests, ctx.clone(), false, false)
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => match ctx.extension(PHYSICAL_TABLE_PARAM) {
Some(physical_table) => {
self.handle_metric_row_inserts(
requests,
ctx.clone(),
physical_table.to_string(),
)
.await?
}
},
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
let query = query_request.query.context(IncompleteGrpcRequestSnafu {
err_msg: "Missing field 'QueryRequest.query'",
})?;
match query {
Query::Sql(sql) => {
let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
ensure!(
result.len() == 1,
NotSupportedSnafu {
feat: "execute multiple statements in SQL query string through GRPC interface"
}
);
let output = result.remove(0)?;
attach_timer(output, timer)
}
Query::LogicalPlan(plan) => {
// this path is useful internally when flownode needs to execute a logical plan through gRPC interface
let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
None => {
self.handle_row_inserts(requests, ctx.clone(), false, false)
.await?
}
},
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
let query = query_request.query.context(IncompleteGrpcRequestSnafu {
err_msg: "Missing field 'QueryRequest.query'",
})?;
match query {
Query::Sql(sql) => {
let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
let mut result = self.do_query_inner(&sql, ctx.clone()).await;
ensure!(
result.len() == 1,
NotSupportedSnafu {
feat: "execute multiple statements in SQL query string through GRPC interface"
}
);
let output = result.remove(0)?;
attach_timer(output, timer)
}
Query::LogicalPlan(plan) => {
// this path is useful internally when flownode needs to execute a logical plan through gRPC interface
let timer = GRPC_HANDLE_PLAN_ELAPSED.start_timer();
// use dummy catalog to provide table
let plan_decoder = self
.query_engine()
.engine_context(ctx.clone())
.new_plan_decoder()
.context(PlanStatementSnafu)?;
// use dummy catalog to provide table
let plan_decoder = self
.query_engine()
.engine_context(ctx.clone())
.new_plan_decoder()
.context(PlanStatementSnafu)?;
let dummy_catalog_list =
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
self.catalog_manager().clone(),
));
let dummy_catalog_list =
Arc::new(catalog::table_source::dummy_catalog::DummyCatalogList::new(
self.catalog_manager().clone(),
));
let logical_plan = plan_decoder
.decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output =
SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
let logical_plan = plan_decoder
.decode(bytes::Bytes::from(plan), dummy_catalog_list, true)
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output =
self.do_exec_plan_inner(None, logical_plan, ctx.clone()).await?;
attach_timer(output, timer)
}
Query::InsertIntoPlan(insert) => {
self.handle_insert_plan(insert, ctx.clone()).await?
}
Query::PromRangeQuery(promql) => {
let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
let prom_query = PromQuery {
query: promql.query,
start: promql.start,
end: promql.end,
step: promql.step,
lookback: promql.lookback,
alias: None,
};
let mut result =
self.do_promql_query_inner(&prom_query, ctx.clone()).await;
ensure!(
result.len() == 1,
NotSupportedSnafu {
feat: "execute multiple statements in PromQL query string through GRPC interface"
}
);
let output = result.remove(0)?;
attach_timer(output, timer)
}
}
}
Request::Ddl(request) => {
let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
err_msg: "'expr' is absent in DDL request",
})?;
fill_catalog_and_schema_from_context(&mut expr, &ctx);
match expr {
DdlExpr::CreateTable(mut expr) => {
let _ = self
.statement_executor
.create_table_inner(&mut expr, None, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::AlterDatabase(expr) => {
let _ = self
.statement_executor
.alter_database_inner(expr, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::AlterTable(expr) => {
self.statement_executor
.alter_table_inner(expr, ctx.clone())
.await?
}
DdlExpr::CreateDatabase(expr) => {
self.statement_executor
.create_database(
&expr.schema_name,
expr.create_if_not_exists,
expr.options,
ctx.clone(),
)
.await?
}
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor
.drop_table(table_name, expr.drop_if_exists, ctx.clone())
.await?
}
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
self.statement_executor
.truncate_table(table_name, time_ranges, ctx.clone())
.await?
}
DdlExpr::CreateFlow(expr) => {
self.statement_executor
.create_flow_inner(expr, ctx.clone())
.await?
}
DdlExpr::DropFlow(DropFlowExpr {
catalog_name,
flow_name,
drop_if_exists,
..
}) => {
self.statement_executor
.drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
.await?
}
DdlExpr::CreateView(expr) => {
let _ = self
.statement_executor
.create_view_by_expr(expr, ctx.clone())
.await?;
attach_timer(output, timer)
}
Query::InsertIntoPlan(insert) => {
self.handle_insert_plan(insert, ctx.clone()).await?
}
Query::PromRangeQuery(promql) => {
let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
let prom_query = PromQuery {
query: promql.query,
start: promql.start,
end: promql.end,
step: promql.step,
lookback: promql.lookback,
alias: None,
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
ensure!(
result.len() == 1,
NotSupportedSnafu {
feat: "execute multiple statements in PromQL query string through GRPC interface"
}
);
let output = result.remove(0)?;
attach_timer(output, timer)
Output::new_with_affected_rows(0)
}
DdlExpr::DropView(_) => {
todo!("implemented in the following PR")
}
DdlExpr::CommentOn(expr) => {
self.statement_executor
.comment_by_expr(expr, ctx.clone())
.await?
}
}
}
}
Request::Ddl(request) => {
let mut expr = request.expr.context(IncompleteGrpcRequestSnafu {
err_msg: "'expr' is absent in DDL request",
})?;
};
fill_catalog_and_schema_from_context(&mut expr, &ctx);
let output = interceptor.post_execute(output, ctx)?;
Ok(output)
}
.await;
match expr {
DdlExpr::CreateTable(mut expr) => {
let _ = self
.statement_executor
.create_table_inner(&mut expr, None, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::AlterDatabase(expr) => {
let _ = self
.statement_executor
.alter_database_inner(expr, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::AlterTable(expr) => {
self.statement_executor
.alter_table_inner(expr, ctx.clone())
.await?
}
DdlExpr::CreateDatabase(expr) => {
self.statement_executor
.create_database(
&expr.schema_name,
expr.create_if_not_exists,
expr.options,
ctx.clone(),
)
.await?
}
DdlExpr::DropTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor
.drop_table(table_name, expr.drop_if_exists, ctx.clone())
.await?
}
DdlExpr::TruncateTable(expr) => {
let table_name =
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
let time_ranges = from_pb_time_ranges(expr.time_ranges.unwrap_or_default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
self.statement_executor
.truncate_table(table_name, time_ranges, ctx.clone())
.await?
}
DdlExpr::CreateFlow(expr) => {
self.statement_executor
.create_flow_inner(expr, ctx.clone())
.await?
}
DdlExpr::DropFlow(DropFlowExpr {
catalog_name,
flow_name,
drop_if_exists,
..
}) => {
self.statement_executor
.drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone())
.await?
}
DdlExpr::CreateView(expr) => {
let _ = self
.statement_executor
.create_view_by_expr(expr, ctx.clone())
.await?;
Output::new_with_affected_rows(0)
}
DdlExpr::DropView(_) => {
todo!("implemented in the following PR")
}
DdlExpr::CommentOn(expr) => {
self.statement_executor
.comment_by_expr(expr, ctx.clone())
.await?
}
}
}
};
let output = interceptor.post_execute(output, ctx)?;
Ok(output)
result
.map_err(BoxedError::new)
.context(server_error::ExecuteGrpcQuerySnafu)
}
async fn put_record_batch(
@@ -257,51 +265,126 @@ impl GrpcQueryHandler for Instance {
request: servers::grpc::flight::PutRecordBatchRequest,
table_ref: &mut Option<TableRef>,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
let table = if let Some(table) = table_ref {
table.clone()
} else {
let table = self
.catalog_manager()
.table(
&request.table_name.catalog_name,
&request.table_name.schema_name,
&request.table_name.table_name,
None,
) -> server_error::Result<AffectedRows> {
let result: Result<AffectedRows> = async {
let table = if let Some(table) = table_ref {
table.clone()
} else {
let table = self
.catalog_manager()
.table(
&request.table_name.catalog_name,
&request.table_name.schema_name,
&request.table_name.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: request.table_name.to_string(),
})?;
*table_ref = Some(table.clone());
table
};
let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// do we check limit for bulk insert?
self.inserter
.handle_bulk_insert(
table,
request.flight_data,
request.record_batch,
request.schema_bytes,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: request.table_name.to_string(),
})?;
*table_ref = Some(table.clone());
table
};
.context(TableOperationSnafu)
}
.await;
let interceptor_ref = self.plugins.get::<GrpcQueryInterceptorRef<Error>>();
let interceptor = interceptor_ref.as_ref();
interceptor.pre_bulk_insert(table.clone(), ctx.clone())?;
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::BulkInsert)
.context(PermissionSnafu)?;
// do we check limit for bulk insert?
self.inserter
.handle_bulk_insert(
table,
request.flight_data,
request.record_batch,
request.schema_bytes,
)
.await
.context(TableOperationSnafu)
result
.map_err(BoxedError::new)
.context(server_error::ExecuteGrpcRequestSnafu)
}
fn handle_put_record_batch_stream(
&self,
stream: servers::grpc::flight::PutRecordBatchRequestStream,
ctx: QueryContextRef,
) -> Pin<Box<dyn Stream<Item = server_error::Result<DoPutResponse>> + Send>> {
Box::pin(
self.handle_put_record_batch_stream_inner(stream, ctx)
.map(|result| {
result
.map_err(BoxedError::new)
.context(server_error::ExecuteGrpcRequestSnafu)
}),
)
}
}
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
macro_rules! check_and_fill {
($expr:ident) => {
if $expr.catalog_name.is_empty() {
$expr.catalog_name = catalog.to_string();
}
if $expr.schema_name.is_empty() {
$expr.schema_name = schema.to_string();
}
};
}
match ddl_expr {
Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
Expr::CreateTable(expr) => {
check_and_fill!(expr);
}
Expr::AlterTable(expr) => {
check_and_fill!(expr);
}
Expr::DropTable(expr) => {
check_and_fill!(expr);
}
Expr::TruncateTable(expr) => {
check_and_fill!(expr);
}
Expr::CreateFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::DropFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::CreateView(expr) => {
check_and_fill!(expr);
}
Expr::DropView(expr) => {
check_and_fill!(expr);
}
Expr::CommentOn(expr) => {
check_and_fill!(expr);
}
}
}
impl Instance {
fn handle_put_record_batch_stream_inner(
&self,
mut stream: servers::grpc::flight::PutRecordBatchRequestStream,
ctx: QueryContextRef,
@@ -372,60 +455,7 @@ impl GrpcQueryHandler for Instance {
}
})
}
}
fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
macro_rules! check_and_fill {
($expr:ident) => {
if $expr.catalog_name.is_empty() {
$expr.catalog_name = catalog.to_string();
}
if $expr.schema_name.is_empty() {
$expr.schema_name = schema.to_string();
}
};
}
match ddl_expr {
Expr::CreateDatabase(_) | Expr::AlterDatabase(_) => { /* do nothing*/ }
Expr::CreateTable(expr) => {
check_and_fill!(expr);
}
Expr::AlterTable(expr) => {
check_and_fill!(expr);
}
Expr::DropTable(expr) => {
check_and_fill!(expr);
}
Expr::TruncateTable(expr) => {
check_and_fill!(expr);
}
Expr::CreateFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::DropFlow(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::CreateView(expr) => {
check_and_fill!(expr);
}
Expr::DropView(expr) => {
check_and_fill!(expr);
}
Expr::CommentOn(expr) => {
check_and_fill!(expr);
}
}
}
impl Instance {
async fn handle_insert_plan(
&self,
insert: InsertIntoPlan,
@@ -493,7 +523,9 @@ impl Instance {
// Optimize the plan
let optimized_plan = state.optimize(&analyzed_plan).context(DataFusionSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;
let output = self
.do_exec_plan_inner(None, optimized_plan, ctx.clone())
.await?;
Ok(attach_timer(output, timer))
}

View File

@@ -38,8 +38,6 @@ use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::{Server, ServerHandlers};
use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
@@ -105,7 +103,7 @@ where
) -> HttpServerBuilder {
let mut builder = HttpServerBuilder::new(opts.http.clone())
.with_memory_limiter(request_memory_limiter)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
.with_sql_handler(self.instance.clone());
let validator = self.plugins.get::<LogValidatorRef>();
let ingest_interceptor = self.plugins.get::<LogIngestInterceptorRef<ServerError>>();
@@ -212,7 +210,7 @@ where
};
let greptime_request_handler = GreptimeRequestHandler::new(
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
self.instance.clone(),
user_provider.clone(),
runtime,
grpc.flight_compression,
@@ -336,10 +334,7 @@ where
let mysql_server = MysqlServer::create_server(
common_runtime::global_runtime(),
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdapter::arc(instance.clone()),
user_provider.clone(),
)),
Arc::new(MysqlSpawnRef::new(instance.clone(), user_provider.clone())),
Arc::new(MysqlSpawnConfig::new(
opts.tls.should_force_tls(),
tls_server_config,
@@ -364,7 +359,7 @@ where
maybe_watch_server_tls_config(tls_server_config.clone()).context(StartServerSnafu)?;
let pg_server = Box::new(PostgresServer::new(
ServerSqlQueryHandlerAdapter::arc(instance.clone()),
instance.clone(),
opts.tls.should_force_tls(),
tls_server_config,
opts.keep_alive.as_secs(),

View File

@@ -1318,9 +1318,8 @@ mod test {
use tokio::time::Instant;
use super::*;
use crate::error::Error;
use crate::http::test_helpers::TestClient;
use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
use crate::query_handler::sql::SqlQueryHandler;
struct DummyInstance {
_tx: mpsc::Sender<(String, Vec<u8>)>,
@@ -1328,17 +1327,11 @@ mod test {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
type Error = Error;
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
@@ -1347,7 +1340,7 @@ mod test {
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
) -> Result<Output> {
unimplemented!()
}
@@ -1378,9 +1371,8 @@ mod test {
fn make_test_app_custom(tx: mpsc::Sender<(String, Vec<u8>)>, options: HttpOptions) -> Router {
let instance = Arc::new(DummyInstance { _tx: tx });
let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
let server = HttpServerBuilder::new(options)
.with_sql_handler(sql_instance)
.with_sql_handler(instance.clone())
.build();
server.build(server.make_app()).unwrap().route(
"/test/timeout",

View File

@@ -18,96 +18,33 @@ use std::sync::Arc;
use api::v1::greptime_request::Request;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::flight::do_put::DoPutResponse;
use common_query::Output;
use futures::Stream;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::TableRef;
use crate::error::{self, Result};
use crate::error::Result;
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
pub type GrpcQueryHandlerRef<E> = Arc<dyn GrpcQueryHandler<Error = E> + Send + Sync>;
pub type ServerGrpcQueryHandlerRef = GrpcQueryHandlerRef<error::Error>;
pub type ServerGrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler + Send + Sync>;
pub type RawRecordBatch = bytes::Bytes;
#[async_trait]
pub trait GrpcQueryHandler {
type Error: ErrorExt;
async fn do_query(
&self,
query: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error>;
async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output>;
async fn put_record_batch(
&self,
request: PutRecordBatchRequest,
table_ref: &mut Option<TableRef>,
ctx: QueryContextRef,
) -> std::result::Result<AffectedRows, Self::Error>;
) -> Result<AffectedRows>;
fn handle_put_record_batch_stream(
&self,
stream: PutRecordBatchRequestStream,
ctx: QueryContextRef,
) -> Pin<Box<dyn Stream<Item = std::result::Result<DoPutResponse, Self::Error>> + Send>>;
}
pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);
impl<E> ServerGrpcQueryHandlerAdapter<E> {
pub fn arc(handler: GrpcQueryHandlerRef<E>) -> Arc<Self> {
Arc::new(Self(handler))
}
}
#[async_trait]
impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdapter<E>
where
E: ErrorExt + Send + Sync + 'static,
{
type Error = error::Error;
async fn do_query(&self, query: Request, ctx: QueryContextRef) -> Result<Output> {
self.0
.do_query(query, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
async fn put_record_batch(
&self,
request: PutRecordBatchRequest,
table_ref: &mut Option<TableRef>,
ctx: QueryContextRef,
) -> Result<AffectedRows> {
self.0
.put_record_batch(request, table_ref, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcRequestSnafu)
}
fn handle_put_record_batch_stream(
&self,
stream: PutRecordBatchRequestStream,
ctx: QueryContextRef,
) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>> {
use futures_util::StreamExt;
Box::pin(
self.0
.handle_put_record_batch_stream(stream, ctx)
.map(|result| {
result
.map_err(|e| BoxedError::new(e))
.context(error::ExecuteGrpcRequestSnafu)
}),
)
}
) -> Pin<Box<dyn Stream<Item = Result<DoPutResponse>> + Send>>;
}

View File

@@ -15,123 +15,39 @@
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use session::context::QueryContextRef;
use snafu::ResultExt;
use sql::statements::statement::Statement;
use crate::error::{self, Result};
use crate::error::Result;
pub type SqlQueryHandlerRef<E> = Arc<dyn SqlQueryHandler<Error = E> + Send + Sync>;
pub type ServerSqlQueryHandlerRef = SqlQueryHandlerRef<error::Error>;
use query::query_engine::DescribeResult;
pub type ServerSqlQueryHandlerRef = Arc<dyn SqlQueryHandler + Send + Sync>;
#[async_trait]
pub trait SqlQueryHandler {
type Error: ErrorExt;
async fn do_query(
&self,
query: &str,
query_ctx: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>>;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>>;
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error>;
) -> Result<Output>;
async fn do_promql_query(
&self,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>>;
) -> Vec<Result<Output>>;
async fn do_describe(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> std::result::Result<Option<DescribeResult>, Self::Error>;
) -> Result<Option<DescribeResult>>;
async fn is_valid_schema(
&self,
catalog: &str,
schema: &str,
) -> std::result::Result<bool, Self::Error>;
}
pub struct ServerSqlQueryHandlerAdapter<E>(SqlQueryHandlerRef<E>);
impl<E> ServerSqlQueryHandlerAdapter<E> {
pub fn arc(handler: SqlQueryHandlerRef<E>) -> Arc<Self> {
Arc::new(Self(handler))
}
}
#[async_trait]
impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdapter<E>
where
E: ErrorExt + Send + Sync + 'static,
{
type Error = error::Error;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
self.0
.do_query(query, query_ctx)
.await
.into_iter()
.map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
.collect()
}
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
self.0
.do_exec_plan(stmt, plan, query_ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecutePlanSnafu)
}
async fn do_promql_query(
&self,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> Vec<Result<Output>> {
self.0
.do_promql_query(query, query_ctx)
.await
.into_iter()
.map(|x| x.map_err(BoxedError::new).context(error::ExecuteQuerySnafu))
.collect()
}
async fn do_describe(
&self,
stmt: Statement,
query_ctx: QueryContextRef,
) -> Result<Option<DescribeResult>> {
self.0
.do_describe(stmt, query_ctx)
.await
.map_err(BoxedError::new)
.context(error::DescribeStatementSnafu)
}
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool> {
self.0
.is_valid_schema(catalog, schema)
.await
.map_err(BoxedError::new)
.context(error::CheckDatabaseValiditySnafu)
}
async fn is_valid_schema(&self, catalog: &str, schema: &str) -> Result<bool>;
}

View File

@@ -23,7 +23,7 @@ use common_test_util::ports;
use datafusion_expr::LogicalPlan;
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::error::Result;
use servers::http::header::constants::GREPTIME_DB_HEADER_NAME;
use servers::http::test_helpers::TestClient;
use servers::http::{HttpOptions, HttpServerBuilder};
@@ -52,8 +52,6 @@ impl InfluxdbLineProtocolHandler for DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
type Error = Error;
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
@@ -63,15 +61,11 @@ impl SqlQueryHandler for DummyInstance {
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
) -> Result<Output> {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

View File

@@ -52,8 +52,6 @@ impl OpentsdbProtocolHandler for DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
type Error = error::Error;
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
@@ -63,15 +61,11 @@ impl SqlQueryHandler for DummyInstance {
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
) -> Result<Output> {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

View File

@@ -26,7 +26,7 @@ use datafusion_expr::LogicalPlan;
use prost::Message;
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use servers::error::{Error, Result};
use servers::error::Result;
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::test_helpers::TestClient;
use servers::http::{HttpOptions, HttpServerBuilder, PromValidationMode};
@@ -80,8 +80,6 @@ impl PromStoreProtocolHandler for DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
type Error = Error;
async fn do_query(&self, _: &str, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
@@ -91,15 +89,11 @@ impl SqlQueryHandler for DummyInstance {
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
) -> Result<Output> {
unimplemented!()
}
async fn do_promql_query(
&self,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}

View File

@@ -27,7 +27,7 @@ use query::options::QueryOptions;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error::{Error, NotSupportedSnafu, Result};
use servers::error::{NotSupportedSnafu, Result};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
use session::context::QueryContextRef;
@@ -52,8 +52,6 @@ impl DummyInstance {
#[async_trait]
impl SqlQueryHandler for DummyInstance {
type Error = Error;
async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let stmt = QueryLanguageParser::parse_sql(query, &query_ctx).unwrap();
let plan = self
@@ -75,11 +73,7 @@ impl SqlQueryHandler for DummyInstance {
Ok(self.query_engine.execute(plan, query_ctx).await.unwrap())
}
async fn do_promql_query(
&self,
_: &PromQuery,
_: QueryContextRef,
) -> Vec<std::result::Result<Output, Self::Error>> {
async fn do_promql_query(&self, _: &PromQuery, _: QueryContextRef) -> Vec<Result<Output>> {
unimplemented!()
}
@@ -109,13 +103,7 @@ impl SqlQueryHandler for DummyInstance {
#[async_trait]
impl GrpcQueryHandler for DummyInstance {
type Error = Error;
async fn do_query(
&self,
request: Request,
ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {
async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result<Output> {
let output = match request {
Request::Inserts(_)
| Request::Deletes(_)
@@ -166,7 +154,7 @@ impl GrpcQueryHandler for DummyInstance {
_request: servers::grpc::flight::PutRecordBatchRequest,
_table_ref: &mut Option<TableRef>,
_ctx: QueryContextRef,
) -> std::result::Result<AffectedRows, Self::Error> {
) -> Result<AffectedRows> {
unimplemented!()
}
@@ -174,9 +162,7 @@ impl GrpcQueryHandler for DummyInstance {
&self,
_stream: servers::grpc::flight::PutRecordBatchRequestStream,
_ctx: QueryContextRef,
) -> std::pin::Pin<
Box<dyn futures::Stream<Item = std::result::Result<DoPutResponse, Self::Error>> + Send>,
> {
) -> std::pin::Pin<Box<dyn futures::Stream<Item = Result<DoPutResponse>> + Send>> {
unimplemented!()
}
}

View File

@@ -35,7 +35,6 @@ mod test {
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{FlightCompression, GrpcServerConfig};
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::server::Server;
use crate::cluster::GreptimeDbClusterBuilder;
@@ -88,7 +87,7 @@ mod test {
let runtime = common_runtime::global_runtime().clone();
let greptime_request_handler = GreptimeRequestHandler::new(
ServerGrpcQueryHandlerAdapter::arc(db.frontend.instance.clone()),
db.frontend.instance.clone(),
user_provider_from_option("static_user_provider:cmd:greptime_user=greptime_pwd").ok(),
Some(runtime.clone()),
FlightCompression::default(),

View File

@@ -23,13 +23,15 @@ mod tests {
use client::OutputData;
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::key::table_name::TableNameKey;
use common_meta::rpc::router::region_distribution;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::debug;
use datafusion_expr::LogicalPlan;
use frontend::error::{self, Error, Result};
use frontend::error::{Error, Result};
use frontend::instance::Instance;
use query::parser::QueryLanguageParser;
use query::query_engine::DefaultSerializer;
@@ -443,7 +445,7 @@ mod tests {
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
assert_eq!(e.status_code(), StatusCode::Unsupported);
} else {
unreachable!();
}
@@ -453,7 +455,7 @@ mod tests {
.await
.remove(0)
{
assert!(matches!(e, error::Error::NotSupported { .. }));
assert_eq!(e.status_code(), StatusCode::Unsupported);
} else {
unreachable!();
}

View File

@@ -48,8 +48,7 @@ use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
use servers::query_handler::sql::SqlQueryHandler;
use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::Server;
use servers::tls::ReloadableTlsServerConfig;
@@ -416,9 +415,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
..Default::default()
};
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(
instance.fe_instance().clone(),
))
.with_sql_handler(instance.fe_instance().clone())
.with_logs_handler(instance.fe_instance().clone())
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.opts.datanode_options().to_toml().unwrap())
@@ -474,9 +471,7 @@ pub async fn setup_test_http_app_with_frontend_and_custom_options(
});
let mut http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(
instance.fe_instance().clone(),
))
.with_sql_handler(instance.fe_instance().clone())
.with_log_ingest_handler(instance.fe_instance().clone(), None, None)
.with_logs_handler(instance.fe_instance().clone())
.with_influxdb_handler(instance.fe_instance().clone())
@@ -563,7 +558,7 @@ pub async fn setup_test_prom_app_with_frontend(
};
let frontend_ref = instance.fe_instance().clone();
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_sql_handler(frontend_ref.clone())
.with_logs_handler(instance.fe_instance().clone())
.with_prom_handler(
frontend_ref.clone(),
@@ -611,7 +606,7 @@ pub async fn setup_grpc_server_with(
let fe_instance_ref = instance.fe_instance().clone();
let greptime_request_handler = GreptimeRequestHandler::new(
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
fe_instance_ref.clone(),
user_provider.clone(),
Some(runtime.clone()),
FlightCompression::default(),
@@ -677,10 +672,7 @@ pub async fn setup_mysql_server_with_user_provider(
};
let mut mysql_server = MysqlServer::create_server(
runtime,
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdapter::arc(fe_instance_ref),
user_provider,
)),
Arc::new(MysqlSpawnRef::new(fe_instance_ref, user_provider)),
Arc::new(MysqlSpawnConfig::new(
false,
Arc::new(
@@ -735,7 +727,7 @@ pub async fn setup_pg_server_with_user_provider(
);
let mut pg_server = Box::new(PostgresServer::new(
ServerSqlQueryHandlerAdapter::arc(fe_instance_ref),
fe_instance_ref,
opts.tls.should_force_tls(),
tls_server_config,
0,

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::env;
use std::sync::Arc;
use client::{DEFAULT_SCHEMA_NAME, OutputData};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::ErrorExt;
use common_query::Output;
use common_recordbatch::util;
use common_test_util::recordbatch::check_output_stream;
@@ -25,11 +25,12 @@ use common_test_util::temp_dir;
use datatypes::arrow::array::{
ArrayRef, AsArray, StringArray, TimestampMillisecondArray, UInt64Array,
};
use frontend::error::{Error, Result};
use frontend::error::Error;
use frontend::instance::Instance;
use operator::error::Error as OperatorError;
use rstest::rstest;
use rstest_reuse::apply;
use servers::error as server_error;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
@@ -176,7 +177,11 @@ async fn test_extra_external_table_options(instance: Arc<dyn MockInstance>) {
);
let result = try_execute_sql(&frontend, sql).await;
assert!(matches!(result, Err(Error::ParseSql { .. })));
let err = result.unwrap_err();
assert!(matches!(
unwrap_frontend_error(&err),
Error::ParseSql { .. }
));
}
#[apply(both_instances_cases)]
@@ -385,19 +390,22 @@ async fn test_execute_insert_by_select(instance: Arc<dyn MockInstance>) {
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let err = try_execute_sql(&instance, "insert into demo2(host) select * from demo1")
.await
.unwrap_err();
assert!(matches!(
try_execute_sql(&instance, "insert into demo2(host) select * from demo1")
.await
.unwrap_err(),
unwrap_frontend_error(&err),
Error::TableOperation {
source: OperatorError::PlanStatement { .. },
..
}
));
let err = try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1")
.await
.unwrap_err();
assert!(matches!(
try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1")
.await
.unwrap_err(),
unwrap_frontend_error(&err),
Error::TableOperation {
source: OperatorError::PlanStatement { .. },
..
@@ -633,7 +641,11 @@ async fn test_execute_external_create_without_ts(instance: Arc<dyn MockInstance>
),
)
.await;
assert_matches!(result, Err(Error::ParseSql { .. }));
let err = result.unwrap_err();
assert!(matches!(
unwrap_frontend_error(&err),
Error::ParseSql { .. }
));
}
#[apply(both_instances_cases)]
@@ -655,7 +667,11 @@ async fn test_execute_external_create_with_invalid_ts(instance: Arc<dyn MockInst
),
)
.await;
assert!(matches!(result, Err(Error::ParseSql { .. })));
let err = result.unwrap_err();
assert!(matches!(
unwrap_frontend_error(&err),
Error::ParseSql { .. }
));
let result = try_execute_sql(
&instance,
@@ -669,7 +685,11 @@ async fn test_execute_external_create_with_invalid_ts(instance: Arc<dyn MockInst
),
)
.await;
assert!(matches!(result, Err(Error::ParseSql { .. })));
let err = result.unwrap_err();
assert!(matches!(
unwrap_frontend_error(&err),
Error::ParseSql { .. }
));
}
#[apply(both_instances_cases)]
@@ -2081,11 +2101,29 @@ async fn test_information_schema_dot_columns(instance: Arc<dyn MockInstance>) {
check_output_stream(output, expected).await;
}
fn unwrap_frontend_error(err: &server_error::Error) -> &Error {
match err {
server_error::Error::ExecuteQuery { source, .. }
| server_error::Error::ExecutePlan { source, .. }
| server_error::Error::ExecuteGrpcQuery { source, .. }
| server_error::Error::ExecuteGrpcRequest { source, .. }
| server_error::Error::CheckDatabaseValidity { source, .. } => source
.as_any()
.downcast_ref::<Error>()
.expect("Expected frontend::error::Error inside BoxedError"),
server_error::Error::DescribeStatement { source } => source
.as_any()
.downcast_ref::<Error>()
.expect("Expected frontend::error::Error inside BoxedError"),
other => panic!("Unexpected error type: {other}"),
}
}
async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_with(instance, sql, QueryContext::arc()).await
}
async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> server_error::Result<Output> {
try_execute_sql_with(instance, sql, QueryContext::arc()).await
}
@@ -2093,7 +2131,7 @@ async fn try_execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Result<Output> {
) -> server_error::Result<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}

View File

@@ -33,7 +33,6 @@ use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::Datanode;
use frontend::error::Result;
use frontend::instance::Instance;
use futures::TryStreamExt;
use meta_srv::metasrv::Metasrv;
@@ -439,7 +438,10 @@ pub fn find_testing_resource(path: &str) -> String {
prepare_path(&p)
}
pub async fn try_execute_sql(instance: &Arc<Instance>, sql: &str) -> Result<Output> {
pub async fn try_execute_sql(
instance: &Arc<Instance>,
sql: &str,
) -> servers::error::Result<Output> {
try_execute_sql_with(instance, sql, QueryContext::arc()).await
}
@@ -447,7 +449,7 @@ pub async fn try_execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Result<Output> {
) -> servers::error::Result<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}

View File

@@ -36,7 +36,6 @@ use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::UInt64Type;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::future::BoxFuture;
use meta_srv::error;
@@ -52,6 +51,7 @@ use meta_srv::procedure::region_migration::{
};
use meta_srv::selector::{Selector, SelectorOptions};
use sea_query::{Expr, Iden, Order, PostgresQueryBuilder, Query};
use servers::error::Result as ServerResult;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use store_api::storage::RegionId;
@@ -1258,7 +1258,7 @@ async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
column.value(0).to_string()
}
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<ServerResult<Output>> {
let query_ctx = QueryContext::arc();
let mut results = Vec::new();
@@ -1279,7 +1279,7 @@ async fn run_sql(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
) -> ServerResult<Output> {
info!("Run SQL: {sql}");
instance.do_query(sql, query_ctx).await.remove(0)
}

View File

@@ -23,11 +23,11 @@ use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::DatanodeWalConfig;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use meta_srv::gc::{self, BatchGcProcedure, GcSchedulerOptions, GcTickerRef};
use meta_srv::metasrv::Metasrv;
use mito2::gc::GcConfig;
use servers::error::Result as ServerResult;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use store_api::codec::PrimaryKeyEncoding;
@@ -867,7 +867,7 @@ async fn run_sql(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
) -> ServerResult<Output> {
info!("Run SQL: {sql}");
instance.do_query(sql, query_ctx).await.remove(0)
}