fix: ease the restriction of the original "SelectExpr" (#794)

fix: ease the restriction of the original "SelectExpr" since we used to pass SQLs other than selection in the related GRPC interface
This commit is contained in:
LFC
2022-12-27 16:50:12 +08:00
committed by GitHub
parent 26a3e93ca7
commit a14ec94653
5 changed files with 31 additions and 49 deletions

View File

@@ -18,15 +18,12 @@ message ObjectExpr {
ExprHeader header = 1;
oneof expr {
InsertExpr insert = 2;
SelectExpr select = 3;
UpdateExpr update = 4;
DeleteExpr delete = 5;
QueryRequest query = 3;
}
}
// TODO(fys): Only support sql now, and will support promql etc in the future
message SelectExpr {
oneof expr {
message QueryRequest {
oneof query {
string sql = 1;
bytes logical_plan = 2;
}
@@ -48,11 +45,6 @@ message InsertExpr {
uint32 region_number = 5;
}
// TODO(jiachun)
message UpdateExpr {}
// TODO(jiachun)
message DeleteExpr {}
message ObjectResult {
ResultHeader header = 1;
oneof result {

View File

@@ -17,8 +17,8 @@ use std::sync::Arc;
use api::v1::codec::SelectResult as GrpcSelectResult;
use api::v1::column::SemanticType;
use api::v1::{
object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr,
object_expr, object_result, query_request, DatabaseRequest, ExprHeader, InsertExpr,
MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest,
};
use common_error::status_code::StatusCode;
use common_grpc::flight::{raw_flight_data_to_message, FlightMessage};
@@ -83,28 +83,28 @@ impl Database {
pub async fn select(&self, expr: Select) -> Result<ObjectResult> {
let select_expr = match expr {
Select::Sql(sql) => SelectExpr {
expr: Some(select_expr::Expr::Sql(sql)),
Select::Sql(sql) => QueryRequest {
query: Some(query_request::Query::Sql(sql)),
},
};
self.do_select(select_expr).await
}
pub async fn logical_plan(&self, logical_plan: Vec<u8>) -> Result<ObjectResult> {
let select_expr = SelectExpr {
expr: Some(select_expr::Expr::LogicalPlan(logical_plan)),
let select_expr = QueryRequest {
query: Some(query_request::Query::LogicalPlan(logical_plan)),
};
self.do_select(select_expr).await
}
async fn do_select(&self, select_expr: SelectExpr) -> Result<ObjectResult> {
async fn do_select(&self, select_expr: QueryRequest) -> Result<ObjectResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = ObjectExpr {
header: Some(header),
expr: Some(object_expr::Expr::Select(select_expr)),
expr: Some(object_expr::Expr::Query(select_expr)),
};
let obj_result = self.object(expr).await?;

View File

@@ -17,7 +17,7 @@ mod stream;
use std::pin::Pin;
use api::v1::object_expr::Expr;
use api::v1::select_expr::Expr as SelectExpr;
use api::v1::query_request::Query;
use api::v1::ObjectExpr;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::{
@@ -29,8 +29,7 @@ use common_query::Output;
use futures::Stream;
use prost::Message;
use session::context::QueryContext;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::statement::Statement;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response, Streaming};
use crate::error::{self, Result};
@@ -81,18 +80,15 @@ impl FlightService for Instance {
.expr
.context(error::MissingRequiredFieldSnafu { name: "expr" })?;
match expr {
Expr::Select(select_expr) => {
let select_expr = select_expr
.expr
Expr::Query(query_request) => {
let query = query_request
.query
.context(error::MissingRequiredFieldSnafu { name: "expr" })?;
let stream = self.handle_select_expr(select_expr).await?;
let stream = self.handle_query(query).await?;
Ok(Response::new(Box::pin(stream) as TonicStream<FlightData>))
}
// TODO(LFC): Implement Insertion Flight interface.
Expr::Insert(_) => Err(tonic::Status::unimplemented("Not yet implemented")),
Expr::Update(_) | Expr::Delete(_) => {
Err(tonic::Status::unimplemented("Not yet implemented"))
}
}
}
@@ -134,22 +130,16 @@ impl FlightService for Instance {
}
impl Instance {
async fn handle_select_expr(&self, select_expr: SelectExpr) -> Result<GetStream> {
let output = match select_expr {
SelectExpr::Sql(sql) => {
async fn handle_query(&self, query: Query) -> Result<GetStream> {
let output = match query {
Query::Sql(sql) => {
let stmt = self
.query_engine
.sql_to_statement(&sql)
.context(error::ExecuteSqlSnafu)?;
ensure!(
matches!(stmt, Statement::Query(_)),
error::InvalidSqlSnafu {
msg: format!("expect SQL to be selection, actual: {sql}")
}
);
self.execute_stmt(stmt, QueryContext::arc()).await?
}
SelectExpr::LogicalPlan(plan) => self.execute_logical(plan).await?,
Query::LogicalPlan(plan) => self.execute_logical(plan).await?,
};
let recordbatch_stream = match output {

View File

@@ -15,7 +15,7 @@
use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder};
use api::v1::{
admin_expr, object_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr, ObjectExpr,
ObjectResult, SelectExpr,
ObjectResult, QueryRequest,
};
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::Ticket;
@@ -105,11 +105,11 @@ impl Instance {
}
}
async fn handle_select(&self, select_expr: SelectExpr) -> Result<ObjectResult> {
async fn handle_query_request(&self, query_request: QueryRequest) -> Result<ObjectResult> {
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
header: None,
expr: Some(object_expr::Expr::Select(select_expr)),
expr: Some(object_expr::Expr::Query(query_request)),
}
.encode_to_vec(),
});
@@ -169,12 +169,12 @@ impl GrpcQueryHandler for Instance {
self.handle_insert(catalog_name, schema_name, table_name, insert_batches)
.await
}
Some(object_expr::Expr::Select(select_expr)) => self
.handle_select(select_expr.clone())
Some(object_expr::Expr::Query(query_request)) => self
.handle_query_request(query_request.clone())
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteQuerySnafu {
query: format!("{select_expr:?}"),
query: format!("{query_request:?}"),
})?,
other => {
return servers::error::NotSupportedSnafu {

View File

@@ -721,9 +721,9 @@ mod tests {
use api::v1::column::SemanticType;
use api::v1::{
admin_expr, admin_result, column, object_expr, object_result, select_expr, Column,
admin_expr, admin_result, column, object_expr, object_result, query_request, Column,
ColumnDataType, ColumnDef as GrpcColumnDef, ExprHeader, FlightDataRaw, MutateResult,
SelectExpr,
QueryRequest,
};
use common_grpc::flight::{raw_flight_data_to_message, FlightMessage};
use common_recordbatch::RecordBatch;
@@ -930,8 +930,8 @@ mod tests {
// select
let object_expr = ObjectExpr {
header: Some(ExprHeader::default()),
expr: Some(object_expr::Expr::Select(SelectExpr {
expr: Some(select_expr::Expr::Sql("select * from demo".to_string())),
expr: Some(Expr::Query(QueryRequest {
query: Some(query_request::Query::Sql("select * from demo".to_string())),
})),
};
let result = GrpcQueryHandler::do_query(&*instance, object_expr)