refactor: remove AdminExpr, make DDL expressions as normal GRPC requests (#808)

* refactor: remove AdminExpr, make DDL expressions as normal GRPC requests
This commit is contained in:
LFC
2022-12-30 16:47:45 +08:00
committed by GitHub
parent 11194f37d4
commit d13de0aeba
30 changed files with 464 additions and 730 deletions

View File

@@ -25,7 +25,6 @@ use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArr
use arrow::datatypes::{DataType, Float64Type, Int64Type};
use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::admin::Admin;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId};
use client::{Client, Database};
@@ -362,13 +361,11 @@ fn query_set() -> HashMap<String, String> {
ret
}
async fn do_write(args: &Args, client: &Client) {
let admin = Admin::new("admin", client.clone());
async fn do_write(args: &Args, db: &Database) {
let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument"));
let mut write_jobs = JoinSet::new();
let create_table_result = admin.create(create_table_expr()).await;
let create_table_result = db.create(create_table_expr()).await;
println!("Create table result: {create_table_result:?}");
let progress_bar_style = ProgressStyle::with_template(
@@ -383,7 +380,7 @@ async fn do_write(args: &Args, client: &Client) {
let batch_size = args.batch_size;
for _ in 0..args.thread_num {
if let Some(path) = file_list.pop() {
let db = Database::new(DATABASE_NAME, client.clone());
let db = db.clone();
let mpb = multi_progress_bar.clone();
let pb_style = progress_bar_style.clone();
write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
@@ -392,7 +389,7 @@ async fn do_write(args: &Args, client: &Client) {
while write_jobs.join_next().await.is_some() {
file_progress.inc(1);
if let Some(path) = file_list.pop() {
let db = Database::new(DATABASE_NAME, client.clone());
let db = db.clone();
let mpb = multi_progress_bar.clone();
let pb_style = progress_bar_style.clone();
write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
@@ -427,13 +424,13 @@ fn main() {
.unwrap()
.block_on(async {
let client = Client::with_urls(vec![&args.endpoint]);
let db = Database::new(DATABASE_NAME, client);
if !args.skip_write {
do_write(&args, &client).await;
do_write(&args, &db).await;
}
if !args.skip_read {
let db = Database::new(DATABASE_NAME, client.clone());
do_query(args.iter_num, &db).await;
}
})

View File

@@ -6,17 +6,8 @@ message RequestHeader {
string tenant = 1;
}
message ExprHeader {
uint32 version = 1;
}
message ResultHeader {
uint32 version = 1;
uint32 code = 2;
string err_msg = 3;
}
message MutateResult {
uint32 success = 1;
uint32 failure = 2;
}

View File

@@ -2,6 +2,7 @@ syntax = "proto3";
package greptime.v1;
import "greptime/v1/ddl.proto";
import "greptime/v1/column.proto";
import "greptime/v1/common.proto";
@@ -18,6 +19,7 @@ message ObjectExpr {
oneof request {
InsertRequest insert = 1;
QueryRequest query = 2;
DdlRequest ddl = 3;
}
}

View File

@@ -5,29 +5,15 @@ package greptime.v1;
import "greptime/v1/column.proto";
import "greptime/v1/common.proto";
message AdminRequest {
string name = 1;
repeated AdminExpr exprs = 2;
}
message AdminResponse {
repeated AdminResult results = 1;
}
message AdminExpr {
ExprHeader header = 1;
// "Data Definition Language" requests, that create, modify or delete the database structures but not the data.
// `DdlRequest` could carry more information than plain SQL, for example, the "table_id" in `CreateTableExpr`.
// So create a new DDL expr if you need it.
message DdlRequest {
oneof expr {
CreateDatabaseExpr create_database = 1;
CreateTableExpr create_table = 2;
AlterExpr alter = 3;
CreateDatabaseExpr create_database = 4;
DropTableExpr drop_table = 5;
}
}
message AdminResult {
ResultHeader header = 1;
oneof result {
MutateResult mutate = 2;
DropTableExpr drop_table = 4;
}
}

View File

@@ -2,7 +2,6 @@ syntax = "proto3";
package greptime.v1;
import "greptime/v1/admin.proto";
import "greptime/v1/common.proto";
import "greptime/v1/database.proto";
@@ -12,11 +11,9 @@ service Greptime {
message BatchRequest {
RequestHeader header = 1;
repeated AdminRequest admins = 2;
repeated DatabaseRequest databases = 3;
repeated DatabaseRequest databases = 2;
}
message BatchResponse {
repeated AdminResponse admins = 1;
repeated DatabaseResponse databases = 2;
repeated DatabaseResponse databases = 1;
}

View File

@@ -15,13 +15,10 @@
use arrow_flight::FlightData;
use prost::Message;
use crate::v1::{admin_result, AdminResult, MutateResult, ObjectResult, ResultHeader};
use crate::v1::{ObjectResult, ResultHeader};
pub const PROTOCOL_VERSION: u32 = 1;
pub type Success = u32;
pub type Failure = u32;
#[derive(Default)]
pub struct ObjectResultBuilder {
version: u32,
@@ -81,61 +78,6 @@ impl ObjectResultBuilder {
}
}
#[derive(Debug)]
pub struct AdminResultBuilder {
version: u32,
code: u32,
err_msg: Option<String>,
mutate: Option<(Success, Failure)>,
}
impl AdminResultBuilder {
pub fn status_code(mut self, code: u32) -> Self {
self.code = code;
self
}
pub fn err_msg(mut self, err_msg: String) -> Self {
self.err_msg = Some(err_msg);
self
}
pub fn mutate_result(mut self, success: u32, failure: u32) -> Self {
self.mutate = Some((success, failure));
self
}
pub fn build(self) -> AdminResult {
let header = Some(ResultHeader {
version: self.version,
code: self.code,
err_msg: self.err_msg.unwrap_or_default(),
});
let result = if let Some((success, failure)) = self.mutate {
Some(admin_result::Result::Mutate(MutateResult {
success,
failure,
}))
} else {
None
};
AdminResult { header, result }
}
}
impl Default for AdminResultBuilder {
fn default() -> Self {
Self {
version: PROTOCOL_VERSION,
code: 0,
err_msg: None,
mutate: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId};
use client::admin::Admin;
use client::{Client, Database};
use prost_09::Message;
use substrait_proto::protobuf::plan_rel::RelType as PlanRelType;
@@ -66,8 +65,8 @@ async fn run() {
region_ids: vec![0],
};
let admin = Admin::new("create table", client.clone());
let result = admin.create(create_table_expr).await.unwrap();
let db = Database::new("create table", client.clone());
let result = db.create(create_table_expr).await.unwrap();
event!(Level::INFO, "create table result: {:#?}", result);
let logical = mock_logical_plan();

View File

@@ -1,137 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::*;
use common_error::prelude::StatusCode;
use common_query::Output;
use snafu::prelude::*;
use crate::database::PROTOCOL_VERSION;
use crate::{error, Client, Result};
#[derive(Clone, Debug)]
pub struct Admin {
name: String,
client: Client,
}
impl Admin {
pub fn new(name: impl Into<String>, client: Client) -> Self {
Self {
name: name.into(),
client,
}
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::CreateTable(expr)),
};
self.do_request(expr).await
}
pub async fn do_request(&self, expr: AdminExpr) -> Result<AdminResult> {
// `remove(0)` is safe because of `do_requests`'s invariants.
Ok(self.do_requests(vec![expr]).await?.remove(0))
}
pub async fn alter(&self, expr: AlterExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::Alter(expr)),
};
self.do_request(expr).await
}
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::DropTable(expr)),
};
self.do_request(expr).await
}
/// Invariants: the lengths of input vec (`Vec<AdminExpr>`) and output vec (`Vec<AdminResult>`) are equal.
async fn do_requests(&self, exprs: Vec<AdminExpr>) -> Result<Vec<AdminResult>> {
let expr_count = exprs.len();
let req = AdminRequest {
name: self.name.clone(),
exprs,
};
let resp = self.client.admin(req).await?;
let results = resp.results;
ensure!(
results.len() == expr_count,
error::MissingResultSnafu {
name: "admin_results",
expected: expr_count,
actual: results.len(),
}
);
Ok(results)
}
pub async fn create_database(&self, expr: CreateDatabaseExpr) -> Result<AdminResult> {
let header = ExprHeader {
version: PROTOCOL_VERSION,
};
let expr = AdminExpr {
header: Some(header),
expr: Some(admin_expr::Expr::CreateDatabase(expr)),
};
Ok(self.do_requests(vec![expr]).await?.remove(0))
}
}
pub fn admin_result_to_output(admin_result: AdminResult) -> Result<Output> {
let header = admin_result.header.context(error::MissingHeaderSnafu)?;
if !StatusCode::is_success(header.code) {
return error::DatanodeSnafu {
code: header.code,
msg: header.err_msg,
}
.fail();
}
let result = admin_result.result.context(error::MissingResultSnafu {
name: "result".to_string(),
expected: 1_usize,
actual: 0_usize,
})?;
let output = match result {
admin_result::Result::Mutate(mutate) => {
if mutate.failure != 0 {
return error::MutateFailureSnafu {
failure: mutate.failure,
}
.fail();
}
Output::AffectedRows(mutate.success as usize)
}
};
Ok(output)
}

View File

@@ -104,20 +104,6 @@ impl Client {
self.inner.set_peers(urls);
}
pub async fn admin(&self, req: AdminRequest) -> Result<AdminResponse> {
let req = BatchRequest {
admins: vec![req],
..Default::default()
};
let mut res = self.batch(req).await?;
res.admins.pop().context(error::MissingResultSnafu {
name: "admins",
expected: 1_usize,
actual: 0_usize,
})
}
pub async fn database(&self, req: DatabaseRequest) -> Result<DatabaseResponse> {
let req = BatchRequest {
databases: vec![req],

View File

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::{
object_expr, query_request, DatabaseRequest, InsertRequest, ObjectExpr,
ObjectResult as GrpcObjectResult, QueryRequest,
object_expr, query_request, AlterExpr, CreateTableExpr, DatabaseRequest, DdlRequest,
DropTableExpr, InsertRequest, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest,
};
use common_error::status_code::StatusCode;
use common_grpc::flight::{
@@ -27,8 +28,6 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{ConvertFlightDataSnafu, DatanodeSnafu, IllegalFlightMessagesSnafu};
use crate::{error, Client, Result};
pub const PROTOCOL_VERSION: u32 = 1;
#[derive(Clone, Debug)]
pub struct Database {
name: String,
@@ -77,6 +76,33 @@ impl Database {
obj_result.try_into()
}
pub async fn create(&self, expr: CreateTableExpr) -> Result<RpcOutput> {
let expr = ObjectExpr {
request: Some(object_expr::Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
})),
};
self.object(expr).await?.try_into()
}
pub async fn alter(&self, expr: AlterExpr) -> Result<RpcOutput> {
let expr = ObjectExpr {
request: Some(object_expr::Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
})),
};
self.object(expr).await?.try_into()
}
pub async fn drop_table(&self, expr: DropTableExpr) -> Result<RpcOutput> {
let expr = ObjectExpr {
request: Some(object_expr::Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
})),
};
self.object(expr).await?.try_into()
}
pub async fn object(&self, expr: ObjectExpr) -> Result<GrpcObjectResult> {
let res = self.objects(vec![expr]).await?.pop().unwrap();
Ok(res)

View File

@@ -51,9 +51,6 @@ pub enum Error {
source: common_grpc::Error,
},
#[snafu(display("Mutate result has failure {}", failure))]
MutateFailure { failure: u32, backtrace: Backtrace },
#[snafu(display("Column datatype error, source: {}", source))]
ColumnDataType {
#[snafu(backtrace)]
@@ -91,7 +88,6 @@ impl ErrorExt for Error {
| Error::MissingHeader { .. }
| Error::TonicStatus { .. }
| Error::Datanode { .. }
| Error::MutateFailure { .. }
| Error::ColumnDataType { .. }
| Error::MissingField { .. } => StatusCode::Internal,
Error::CreateChannel { source, .. } | Error::ConvertFlightData { source } => {

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod admin;
mod client;
mod database;
mod error;

View File

@@ -36,11 +36,8 @@ pub enum Error {
source: substrait::error::Error,
},
#[snafu(display("Failed to execute physical plan, source: {}", source))]
ExecutePhysicalPlan {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Incorrect internal state: {}", state))]
IncorrectInternalState { state: String, backtrace: Backtrace },
#[snafu(display("Failed to create catalog list, source: {}", source))]
NewCatalog {
@@ -325,7 +322,6 @@ impl ErrorExt for Error {
match self {
Error::ExecuteSql { source } => source.status_code(),
Error::DecodeLogicalPlan { source } => source.status_code(),
Error::ExecutePhysicalPlan { source } => source.status_code(),
Error::NewCatalog { source } => source.status_code(),
Error::FindTable { source, .. } => source.status_code(),
Error::CreateTable { source, .. }
@@ -373,7 +369,8 @@ impl ErrorExt for Error {
| Error::Catalog { .. }
| Error::MissingRequiredField { .. }
| Error::FlightGet { .. }
| Error::InvalidFlightTicket { .. } => StatusCode::Internal,
| Error::InvalidFlightTicket { .. }
| Error::IncorrectInternalState { .. } => StatusCode::Internal,
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),

View File

@@ -16,9 +16,10 @@ mod stream;
use std::pin::Pin;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::object_expr::Request as GrpcRequest;
use api::v1::query_request::Query;
use api::v1::{InsertRequest, ObjectExpr};
use api::v1::{DdlRequest, InsertRequest, ObjectExpr};
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
@@ -85,13 +86,14 @@ impl FlightService for Instance {
.request
.context(MissingRequiredFieldSnafu { name: "request" })?;
let output = match request {
GrpcRequest::Insert(request) => self.handle_insert(request).await?,
GrpcRequest::Query(query_request) => {
let query = query_request
.query
.context(MissingRequiredFieldSnafu { name: "query" })?;
self.handle_query(query).await?
}
GrpcRequest::Insert(request) => self.handle_insert(request).await?,
GrpcRequest::Ddl(request) => self.handle_ddl(request).await?,
};
let stream = to_flight_data_stream(output);
Ok(Response::new(stream))
@@ -166,6 +168,18 @@ impl Instance {
.context(InsertSnafu { table_name })?;
Ok(Output::AffectedRows(affected_rows))
}
async fn handle_ddl(&self, request: DdlRequest) -> Result<Output> {
let expr = request
.expr
.context(MissingRequiredFieldSnafu { name: "expr" })?;
match expr {
DdlExpr::CreateTable(expr) => self.handle_create(expr).await,
DdlExpr::Alter(expr) => self.handle_alter(expr).await,
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await,
DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await,
}
}
}
fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
@@ -189,14 +203,215 @@ fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
#[cfg(test)]
mod test {
use api::v1::QueryRequest;
use api::v1::column::{SemanticType, Values};
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateDatabaseExpr, CreateTableExpr, QueryRequest,
};
use client::RpcOutput;
use common_grpc::flight;
use common_grpc::flight::FlightMessage;
use common_recordbatch::RecordBatches;
use datatypes::prelude::*;
use super::*;
use crate::tests::test_util::{self, MockInstance};
async fn boarding(instance: &MockInstance, ticket: Request<Ticket>) -> RpcOutput {
let response = instance.inner().do_get(ticket).await.unwrap();
let result = flight::flight_data_to_object_result(response)
.await
.unwrap();
result.try_into().unwrap()
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_ddl() {
let instance = MockInstance::new("test_handle_ddl").await;
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
request: Some(GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr {
database_name: "my_database".to_string(),
})),
})),
}
.encode_to_vec(),
});
let output = boarding(&instance, ticket).await;
assert!(matches!(output, RpcOutput::AffectedRows(1)));
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
request: Some(GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(CreateTableExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
desc: "blabla".to_string(),
column_defs: vec![
ColumnDef {
name: "a".to_string(),
datatype: ColumnDataType::String as i32,
is_nullable: true,
default_constraint: vec![],
},
ColumnDef {
name: "ts".to_string(),
datatype: ColumnDataType::TimestampMillisecond as i32,
is_nullable: false,
default_constraint: vec![],
},
],
time_index: "ts".to_string(),
..Default::default()
})),
})),
}
.encode_to_vec(),
});
let output = boarding(&instance, ticket).await;
assert!(matches!(output, RpcOutput::AffectedRows(1)));
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
request: Some(GrpcRequest::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(AlterExpr {
catalog_name: "greptime".to_string(),
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
}],
})),
})),
})),
}
.encode_to_vec(),
});
let output = boarding(&instance, ticket).await;
assert!(matches!(output, RpcOutput::AffectedRows(0)));
let output = instance
.inner()
.execute_sql(
"INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)",
QueryContext::arc(),
)
.await
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));
let output = instance
.inner()
.execute_sql(
"SELECT ts, a, b FROM my_database.my_table",
QueryContext::arc(),
)
.await
.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2022-12-30T07:09:00 | s | 1 |
+---------------------+---+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_insert() {
let instance = MockInstance::new("test_handle_insert").await;
test_util::create_test_table(
&instance,
ConcreteDataType::timestamp_millisecond_datatype(),
)
.await
.unwrap();
let insert = InsertRequest {
schema_name: "public".to_string(),
table_name: "demo".to_string(),
columns: vec![
Column {
column_name: "host".to_string(),
values: Some(Values {
string_values: vec![
"host1".to_string(),
"host2".to_string(),
"host3".to_string(),
],
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
},
Column {
column_name: "cpu".to_string(),
values: Some(Values {
f64_values: vec![1.0, 3.0],
..Default::default()
}),
null_mask: vec![2],
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
},
Column {
column_name: "ts".to_string(),
values: Some(Values {
ts_millisecond_values: vec![1672384140000, 1672384141000, 1672384142000],
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::TimestampMillisecond as i32,
..Default::default()
},
],
row_count: 3,
..Default::default()
};
let ticket = Request::new(Ticket {
ticket: ObjectExpr {
request: Some(GrpcRequest::Insert(insert)),
}
.encode_to_vec(),
});
let output = boarding(&instance, ticket).await;
assert!(matches!(output, RpcOutput::AffectedRows(3)));
let output = instance
.inner()
.execute_sql("SELECT ts, host, cpu FROM demo", QueryContext::arc())
.await
.unwrap();
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+-------+-----+
| ts | host | cpu |
+---------------------+-------+-----+
| 2022-12-30T07:09:00 | host1 | 1 |
| 2022-12-30T07:09:01 | host2 | |
| 2022-12-30T07:09:02 | host3 | 3 |
+---------------------+-------+-----+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_query() {
let instance = MockInstance::new("test_handle_query").await;
@@ -221,18 +436,8 @@ mod test {
.encode_to_vec(),
});
let response = instance.inner().do_get(ticket).await.unwrap();
let result = flight::flight_data_to_object_result(response)
.await
.unwrap();
let raw_data = result.flight_data;
let mut messages = flight::raw_flight_data_to_message(raw_data).unwrap();
assert_eq!(messages.len(), 1);
let message = messages.remove(0);
assert!(matches!(message, FlightMessage::AffectedRows(_)));
let FlightMessage::AffectedRows(affected_rows) = message else { unreachable!() };
assert_eq!(affected_rows, 2);
let output = boarding(&instance, ticket).await;
assert!(matches!(output, RpcOutput::AffectedRows(2)));
let ticket = Request::new(Ticket {
ticket: ObjectExpr {

View File

@@ -12,19 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::result::AdminResultBuilder;
use api::v1::{admin_expr, AdminExpr, AdminResult, CreateDatabaseExpr, ObjectExpr, ObjectResult};
use api::v1::{CreateDatabaseExpr, ObjectExpr, ObjectResult};
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::Ticket;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::prelude::BoxedError;
use common_error::status_code::StatusCode;
use common_grpc::flight;
use common_query::Output;
use prost::Message;
use query::plan::LogicalPlan;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler};
use servers::query_handler::GrpcQueryHandler;
use snafu::prelude::*;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::requests::CreateDatabaseRequest;
@@ -43,25 +40,11 @@ impl Instance {
.context(InvalidFlightDataSnafu)
}
async fn execute_create_database(
&self,
create_database_expr: CreateDatabaseExpr,
) -> AdminResult {
pub(crate) async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<Output> {
let req = CreateDatabaseRequest {
db_name: create_database_expr.database_name,
db_name: expr.database_name,
};
let result = self.sql_handler.create_database(req).await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
self.sql_handler().create_database(req).await
}
pub(crate) async fn execute_logical(&self, plan_bytes: Vec<u8>) -> Result<Output> {
@@ -91,28 +74,3 @@ impl GrpcQueryHandler for Instance {
})
}
}
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result<AdminResult> {
let admin_resp = match expr.expr {
Some(admin_expr::Expr::CreateTable(create_expr)) => {
self.handle_create(create_expr).await
}
Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await,
Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => {
self.execute_create_database(create_database_expr).await
}
Some(admin_expr::Expr::DropTable(drop_table_expr)) => {
self.handle_drop_table(drop_table_expr).await
}
other => {
return servers::error::NotSupportedSnafu {
feat: format!("{other:?}"),
}
.fail();
}
};
Ok(admin_resp)
}
}

View File

@@ -69,7 +69,7 @@ impl Services {
};
Ok(Self {
grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime),
grpc_server: GrpcServer::new(instance, grpc_runtime),
mysql_server,
})
}

View File

@@ -12,144 +12,77 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::result::AdminResultBuilder;
use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr};
use common_error::prelude::{ErrorExt, StatusCode};
use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr};
use common_grpc_expr::{alter_expr_to_request, create_expr_to_request};
use common_query::Output;
use common_telemetry::{error, info};
use futures::TryFutureExt;
use common_telemetry::info;
use session::context::QueryContext;
use snafu::prelude::*;
use table::requests::DropTableRequest;
use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu};
use crate::error::{
AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu,
IncorrectInternalStateSnafu, Result,
};
use crate::instance::Instance;
use crate::sql::SqlRequest;
impl Instance {
/// Handle gRPC create table requests.
pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult {
pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> Result<Output> {
let table_name = format!(
"{}.{}.{}",
expr.catalog_name, expr.schema_name, expr.table_name
);
// TODO(LFC): Revisit table id related feature, add more tests.
// Also merge this mod with mod instance::grpc.
// Respect CreateExpr's table id and region ids if present, or allocate table id
// from local table id provider and set region id to 0.
let table_id = if let Some(table_id) = &expr.table_id {
info!(
"Creating table {:?}.{:?}.{:?} with table id from frontend: {}",
expr.catalog_name, expr.schema_name, expr.table_name, table_id.id
"Creating table {table_name} with table id {} from Frontend",
table_id.id
);
table_id.id
} else {
match self.table_id_provider.as_ref() {
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Internal as u32)
.err_msg("Table id provider absent in standalone mode".to_string())
.build();
}
Some(table_id_provider) => {
match table_id_provider
.next_table_id()
.await
.context(BumpTableIdSnafu)
{
Ok(table_id) => {
info!(
"Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}",
&expr.catalog_name, &expr.schema_name, expr.table_name, table_id
);
table_id
}
Err(e) => {
error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name);
return AdminResultBuilder::default()
.status_code(e.status_code() as u32)
.err_msg(e.to_string())
.build();
}
}
}
}
let provider =
self.table_id_provider
.as_ref()
.context(IncorrectInternalStateSnafu {
state: "Table id provider absent in standalone mode",
})?;
let table_id = provider.next_table_id().await.context(BumpTableIdSnafu)?;
info!("Creating table {table_name} with table id {table_id} from TableIdProvider");
table_id
};
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu);
let result = futures::future::ready(request)
.and_then(|request| {
self.sql_handler().execute(
SqlRequest::CreateTable(request),
Arc::new(QueryContext::new()),
)
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
// Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug.
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu)?;
self.sql_handler()
.execute(SqlRequest::CreateTable(request), QueryContext::arc())
.await
}
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult {
let request = match alter_expr_to_request(expr)
.context(AlterExprToRequestSnafu)
.transpose()
{
None => {
return AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(0, 0)
.build()
}
Some(req) => req,
};
pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result<Output> {
let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?;
let Some(request) = request else { return Ok(Output::AffectedRows(0)) };
let result = futures::future::ready(request)
.and_then(|request| {
self.sql_handler()
.execute(SqlRequest::Alter(request), Arc::new(QueryContext::new()))
})
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
self.sql_handler()
.execute(SqlRequest::Alter(request), QueryContext::arc())
.await
}
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult {
pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> Result<Output> {
let req = DropTableRequest {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
table_name: expr.table_name,
};
let result = self
.sql_handler()
.execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new()))
.await;
match result {
Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as _, 0)
.build(),
Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(),
Err(err) => AdminResultBuilder::default()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
self.sql_handler()
.execute(SqlRequest::DropTable(req), QueryContext::arc())
.await
}
}

View File

@@ -232,12 +232,6 @@ pub enum Error {
source: table::error::Error,
},
#[snafu(display("Failed to create table, source: {}", source))]
CreateTable {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to create database: {}, source: {}", name, source))]
CreateDatabase {
name: String,
@@ -257,12 +251,6 @@ pub enum Error {
source: client::Error,
},
#[snafu(display("Failed to alter table on insertion, source: {}", source))]
AlterTableOnInsertion {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Failed to build CreateExpr on insertion: {}", source))]
BuildCreateExprOnInsertion {
#[snafu(backtrace)]
@@ -365,12 +353,6 @@ pub enum Error {
backtrace: Backtrace,
},
#[snafu(display("Invalid admin result, source: {}", source))]
InvalidAdminResult {
#[snafu(backtrace)]
source: client::Error,
},
#[snafu(display("Cannot find primary key column by name: {}", msg))]
PrimaryKeyNotFound { msg: String, backtrace: Backtrace },
@@ -476,9 +458,9 @@ impl ErrorExt for Error {
| Error::VectorComputation { source }
| Error::ConvertArrowSchema { source } => source.status_code(),
Error::ConnectDatanode { source, .. }
| Error::RequestDatanode { source }
| Error::InvalidAdminResult { source } => source.status_code(),
Error::ConnectDatanode { source, .. } | Error::RequestDatanode { source } => {
source.status_code()
}
Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
@@ -513,10 +495,8 @@ impl ErrorExt for Error {
Error::BumpTableId { source, .. } => source.status_code(),
Error::SchemaNotFound { .. } => StatusCode::InvalidArguments,
Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
Error::CreateTable { source, .. }
| Error::CreateDatabase { source, .. }
Error::CreateDatabase { source, .. }
| Error::CreateTableOnInsertion { source, .. }
| Error::AlterTableOnInsertion { source, .. }
| Error::Insert { source, .. } => source.status_code(),
Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(),
Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),

View File

@@ -20,18 +20,17 @@ mod prometheus;
use std::sync::Arc;
use std::time::Duration;
use api::result::{ObjectResultBuilder, PROTOCOL_VERSION};
use api::result::ObjectResultBuilder;
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::object_expr::Request;
use api::v1::{
admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr,
CreateTableExpr, DropTableExpr, ExprHeader, InsertRequest, ObjectExpr,
ObjectResult as GrpcObjectResult,
AddColumns, AlterExpr, Column, CreateTableExpr, DdlRequest, DropTableExpr, InsertRequest,
ObjectExpr, ObjectResult as GrpcObjectResult,
};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef};
use client::admin::admin_result_to_output;
use client::RpcOutput;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
@@ -46,9 +45,9 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef};
use servers::query_handler::{
GrpcAdminHandler, GrpcAdminHandlerRef, GrpcQueryHandler, GrpcQueryHandlerRef,
InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler,
ScriptHandlerRef, SqlQueryHandler, SqlQueryHandlerRef,
GrpcQueryHandler, GrpcQueryHandlerRef, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler,
SqlQueryHandlerRef,
};
use servers::{error as server_error, Mode};
use session::context::QueryContextRef;
@@ -63,20 +62,18 @@ use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, AlterTableOnInsertionSnafu, CatalogSnafu, CreateDatabaseSnafu, CreateTableSnafu,
FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result,
self, CatalogSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu,
RequestDatanodeSnafu, Result,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::sql::insert_to_request;
use crate::table::insert::insert_request_to_insert_batch;
use crate::table::route::TableRoutes;
use crate::Plugins;
#[async_trait]
pub trait FrontendInstance:
GrpcAdminHandler
+ GrpcQueryHandler
GrpcQueryHandler
+ SqlQueryHandler
+ OpentsdbProtocolHandler
+ InfluxdbLineProtocolHandler
@@ -107,7 +104,6 @@ pub struct Instance {
sql_handler: SqlQueryHandlerRef,
grpc_query_handler: GrpcQueryHandlerRef,
grpc_admin_handler: GrpcAdminHandlerRef,
/// plugins: this map holds extensions to customize query or auth
/// behaviours.
@@ -140,8 +136,7 @@ impl Instance {
mode: Mode::Distributed,
dist_instance: Some(dist_instance),
sql_handler: dist_instance_ref.clone(),
grpc_query_handler: dist_instance_ref.clone(),
grpc_admin_handler: dist_instance_ref,
grpc_query_handler: dist_instance_ref,
plugins: Default::default(),
})
}
@@ -185,7 +180,6 @@ impl Instance {
dist_instance: None,
sql_handler: dn_instance.clone(),
grpc_query_handler: dn_instance.clone(),
grpc_admin_handler: dn_instance,
plugins: Default::default(),
}
}
@@ -211,40 +205,20 @@ impl Instance {
if let Some(v) = &self.dist_instance {
v.create_table(&mut expr, partitions).await
} else {
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::CreateTable(expr)),
};
let result = self
.grpc_admin_handler
.exec_admin_request(expr)
.grpc_query_handler
.do_query(ObjectExpr {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(expr)),
})),
})
.await
.context(error::InvokeGrpcServerSnafu)?;
admin_result_to_output(result).context(CreateTableSnafu)
let output: RpcOutput = result.try_into().context(RequestDatanodeSnafu)?;
Ok(output.into())
}
}
/// Handle create database expr.
pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result<Output> {
let database_name = expr.database_name.clone();
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::CreateDatabase(expr)),
};
let result = self
.grpc_admin_handler
.exec_admin_request(expr)
.await
.context(error::InvokeGrpcServerSnafu)?;
admin_result_to_output(result).context(CreateDatabaseSnafu {
name: database_name,
})
}
/// Handle batch inserts
pub async fn handle_inserts(&self, requests: Vec<InsertRequest>) -> Result<Output> {
let mut success = 0;
@@ -369,18 +343,17 @@ impl Instance {
kind: Some(Kind::AddColumns(add_columns)),
};
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Alter(expr)),
};
let result = self
.grpc_admin_handler
.exec_admin_request(expr)
.grpc_query_handler
.do_query(ObjectExpr {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
})),
})
.await
.context(error::InvokeGrpcServerSnafu)?;
admin_result_to_output(result).context(AlterTableOnInsertionSnafu)
let output: RpcOutput = result.try_into().context(RequestDatanodeSnafu)?;
Ok(output.into())
}
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogProviderRef> {
@@ -430,19 +403,6 @@ impl Instance {
.context(error::TableSnafu)
}
fn stmt_to_insert_batch(
&self,
catalog: &str,
schema: &str,
insert: Box<Insert>,
) -> Result<(Vec<Column>, u32)> {
let catalog_provider = self.get_catalog(catalog)?;
let schema_provider = Self::get_schema(catalog_provider, schema)?;
let insert_request = insert_to_request(&schema_provider, *insert)?;
insert_request_to_insert_batch(&insert_request)
}
fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
ensure!(
self.catalog_manager
@@ -486,7 +446,7 @@ impl Instance {
) -> server_error::Result<Output> {
// TODO(sunng87): provide a better form to log or track statement
let query = &format!("{:?}", &stmt);
match stmt {
match stmt.clone() {
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::CreateTable(_)
@@ -498,27 +458,7 @@ impl Instance {
}
Statement::Insert(insert) => match self.mode {
Mode::Standalone => {
let (catalog_name, schema_name, table_name) = insert
.full_table_name()
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
.context(server_error::ExecuteInsertSnafu {
msg: "Failed to get table name",
})?;
let (columns, row_count) = self
.stmt_to_insert_batch(&catalog_name, &schema_name, insert)
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
let request = InsertRequest {
schema_name,
table_name,
region_number: 0,
columns,
row_count,
};
self.handle_insert(request).await
return self.sql_handler.do_statement_query(stmt, query_ctx).await
}
Mode::Distributed => {
let affected = self
@@ -535,14 +475,19 @@ impl Instance {
let expr = AlterExpr::try_from(alter_stmt)
.map_err(BoxedError::new)
.context(server_error::ExecuteAlterSnafu { query })?;
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::Alter(expr)),
};
let result = self.grpc_admin_handler.exec_admin_request(expr).await?;
admin_result_to_output(result).context(error::InvalidAdminResultSnafu)
let result = self
.grpc_query_handler
.do_query(ObjectExpr {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::Alter(expr)),
})),
})
.await?;
let output: RpcOutput = result
.try_into()
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
Ok(output.into())
}
Statement::DropTable(drop_stmt) => {
let expr = DropTableExpr {
@@ -550,14 +495,19 @@ impl Instance {
schema_name: drop_stmt.schema_name,
table_name: drop_stmt.table_name,
};
let expr = AdminExpr {
header: Some(ExprHeader {
version: PROTOCOL_VERSION,
}),
expr: Some(admin_expr::Expr::DropTable(expr)),
};
let result = self.grpc_admin_handler.exec_admin_request(expr).await?;
admin_result_to_output(result).context(error::InvalidAdminResultSnafu)
let result = self
.grpc_query_handler
.do_query(ObjectExpr {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::DropTable(expr)),
})),
})
.await?;
let output: RpcOutput = result
.try_into()
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu { query })?;
Ok(output.into())
}
Statement::ShowCreateTable(_) => {
return server_error::NotSupportedSnafu { feat: query }.fail();
@@ -699,29 +649,15 @@ impl GrpcQueryHandler for Instance {
}
}
#[async_trait]
impl GrpcAdminHandler for Instance {
async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result<AdminResult> {
// Force the default to be `None` rather than `Some(0)` comes from gRPC decode.
// Related issue: #480
if let Some(api::v1::admin_expr::Expr::CreateTable(create)) = &mut expr.expr {
create.table_id = None;
}
self.grpc_admin_handler.exec_admin_request(expr).await
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::borrow::Cow;
use std::iter;
use std::sync::atomic::AtomicU32;
use api::v1::column::SemanticType;
use api::v1::{
admin_expr, admin_result, column, query_request, Column, ColumnDataType,
ColumnDef as GrpcColumnDef, ExprHeader, MutateResult, QueryRequest,
column, query_request, Column, ColumnDataType, ColumnDef as GrpcColumnDef, QueryRequest,
};
use common_grpc::flight::{raw_flight_data_to_message, FlightMessage};
use common_recordbatch::RecordBatch;
@@ -879,21 +815,18 @@ mod tests {
};
// create
let create_expr = create_expr();
let admin_expr = AdminExpr {
header: Some(ExprHeader::default()),
expr: Some(admin_expr::Expr::CreateTable(create_expr)),
};
let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr)
.await
.unwrap();
assert_matches!(
result.result,
Some(admin_result::Result::Mutate(MutateResult {
success: 1,
failure: 0
}))
);
let result = GrpcQueryHandler::do_query(
&*instance,
ObjectExpr {
request: Some(Request::Ddl(DdlRequest {
expr: Some(DdlExpr::CreateTable(create_expr())),
})),
},
)
.await
.unwrap();
let output: RpcOutput = result.try_into().unwrap();
assert!(matches!(output, RpcOutput::AffectedRows(1)));
// insert
let columns = vec![

View File

@@ -16,18 +16,18 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::result::AdminResultBuilder;
use api::v1::{
admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr,
ObjectResult, TableId,
};
use api::result::ObjectResultBuilder;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::object_expr::Request as GrpcRequest;
use api::v1::{AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr, ObjectResult, TableId};
use async_trait::async_trait;
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use catalog::{CatalogList, CatalogManager};
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::prelude::ConcreteDataType;
@@ -40,7 +40,7 @@ use meta_client::rpc::{
use query::sql::{describe_table, explain, show_databases, show_tables};
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error as server_error;
use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler};
use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler};
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::ast::Value as SqlValue;
@@ -53,8 +53,8 @@ use crate::catalog::FrontendCatalogManager;
use crate::datanode::DatanodeClients;
use crate::error::{
self, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu, ColumnDataTypeSnafu,
PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu,
TableNotFoundSnafu,
PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu,
StartMetaClientSnafu, TableNotFoundSnafu,
};
use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory};
use crate::instance::parse_stmt;
@@ -120,7 +120,7 @@ impl DistInstance {
for datanode in table_route.find_leaders() {
let client = self.datanode_clients.get_client(&datanode).await;
let client = Admin::new("greptime", client);
let client = Database::new("greptime", client);
let regions = table_route.find_leader_regions(&datanode);
let mut create_expr_for_region = create_table.clone();
@@ -134,8 +134,7 @@ impl DistInstance {
client
.create(create_expr_for_region)
.await
.and_then(admin_result_to_output)
.context(error::InvalidAdminResultSnafu)?;
.context(RequestDatanodeSnafu)?;
}
// Checked in real MySQL, it truly returns "0 rows affected".
@@ -221,7 +220,7 @@ impl DistInstance {
Ok(())
}
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<AdminResult> {
async fn handle_alter_table(&self, expr: AlterExpr) -> Result<()> {
let catalog_name = if expr.catalog_name.is_empty() {
DEFAULT_CATALOG_NAME
} else {
@@ -253,8 +252,7 @@ impl DistInstance {
.as_any()
.downcast_ref::<DistTable>()
.expect("Table impl must be DistTable in distributed mode");
dist_table.alter_by_expr(expr).await?;
Ok(AdminResultBuilder::default().mutate_result(0, 0).build())
dist_table.alter_by_expr(expr).await
}
async fn create_table_in_meta(
@@ -368,32 +366,33 @@ impl SqlQueryHandler for DistInstance {
#[async_trait]
impl GrpcQueryHandler for DistInstance {
async fn do_query(&self, _: ObjectExpr) -> server_error::Result<ObjectResult> {
unimplemented!()
}
}
#[async_trait]
impl GrpcAdminHandler for DistInstance {
async fn exec_admin_request(&self, query: AdminExpr) -> server_error::Result<AdminResult> {
let expr = query
.clone()
.expr
.context(server_error::InvalidQuerySnafu {
reason: "empty expr",
})?;
match expr {
admin_expr::Expr::CreateDatabase(create_database) => self
.handle_create_database(create_database)
.await
.map(|_| AdminResultBuilder::default().mutate_result(1, 0).build()),
admin_expr::Expr::Alter(alter) => self.handle_alter_table(alter).await,
_ => unimplemented!(),
async fn do_query(&self, expr: ObjectExpr) -> server_error::Result<ObjectResult> {
let request = expr.request.context(server_error::InvalidQuerySnafu {
reason: "empty expr",
})?;
match request {
GrpcRequest::Ddl(request) => {
let expr = request.expr.context(server_error::InvalidQuerySnafu {
reason: "empty DDL expr",
})?;
match expr.clone() {
DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await,
DdlExpr::Alter(expr) => self.handle_alter_table(expr).await,
DdlExpr::CreateTable(_) | DdlExpr::DropTable(_) => unimplemented!(),
}
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{expr:?}"),
})?;
Ok(ObjectResultBuilder::new()
.flight_data(vec![
FlightEncoder::default().encode(FlightMessage::AffectedRows(1))
])
.build())
}
// TODO(LFC): Implement Flight for DistInstance.
GrpcRequest::Query(_) | GrpcRequest::Insert(_) => unimplemented!(),
}
.map_err(BoxedError::new)
.context(server_error::ExecuteQuerySnafu {
query: format!("{query:?}"),
})
}
}

View File

@@ -169,7 +169,8 @@ impl PrometheusProtocolHandler for Instance {
mod tests {
use api::prometheus::remote::label_matcher::Type as MatcherType;
use api::prometheus::remote::{Label, LabelMatcher, Sample};
use api::v1::CreateDatabaseExpr;
use servers::query_handler::SqlQueryHandler;
use session::context::QueryContext;
use super::*;
use crate::tests;
@@ -187,12 +188,12 @@ mod tests {
let db = "prometheus";
instance
.handle_create_database(CreateDatabaseExpr {
database_name: db.to_string(),
})
assert!(instance
.do_query("CREATE DATABASE prometheus", QueryContext::arc())
.await
.unwrap();
.get(0)
.unwrap()
.is_ok());
instance.write(db, write_request).await.unwrap();

View File

@@ -56,7 +56,7 @@ impl Services {
.context(error::RuntimeResourceSnafu)?,
);
let grpc_server = GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime);
let grpc_server = GrpcServer::new(instance.clone(), grpc_runtime);
Some((Box::new(grpc_server) as _, grpc_addr))
} else {

View File

@@ -20,7 +20,6 @@ use std::sync::Arc;
use api::v1::AlterExpr;
use async_trait::async_trait;
use client::admin::Admin;
use client::{Database, RpcOutput};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::error::Result as QueryResult;
@@ -367,15 +366,12 @@ impl DistTable {
}
);
for datanode in leaders {
let admin = Admin::new(
let db = Database::new(
DEFAULT_CATALOG_NAME,
self.datanode_clients.get_client(&datanode).await,
);
debug!("Sent alter table {:?} to {:?}", expr, admin);
let result = admin
.alter(expr.clone())
.await
.context(RequestDatanodeSnafu)?;
debug!("Sending {:?} to {:?}", expr, db);
let result = db.alter(expr.clone()).await.context(RequestDatanodeSnafu)?;
debug!("Alter table result: {:?}", result);
// TODO(hl): We should further check and track alter result in some global DDL task tracker
}

View File

@@ -93,8 +93,7 @@ pub(crate) async fn create_datanode_client(
// create a mock datanode grpc service, see example here:
// https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs
let datanode_service =
GrpcServer::new(datanode_instance.clone(), datanode_instance, runtime).create_service();
let datanode_service = GrpcServer::new(datanode_instance, runtime).create_service();
tokio::spawn(async move {
Server::builder()
.add_service(datanode_service)

View File

@@ -31,25 +31,19 @@ use tonic::{Request, Response, Status};
use crate::error::{self, AlreadyStartedSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
use crate::grpc::handler::BatchHandler;
use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef};
use crate::query_handler::GrpcQueryHandlerRef;
use crate::server::Server;
pub struct GrpcServer {
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
shutdown_tx: Mutex<Option<Sender<()>>>,
runtime: Arc<Runtime>,
}
impl GrpcServer {
pub fn new(
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
) -> Self {
pub fn new(query_handler: GrpcQueryHandlerRef, runtime: Arc<Runtime>) -> Self {
Self {
query_handler,
admin_handler,
shutdown_tx: Mutex::new(None),
runtime,
}
@@ -57,11 +51,7 @@ impl GrpcServer {
pub fn create_service(&self) -> greptime_server::GreptimeServer<GrpcService> {
let service = GrpcService {
handler: BatchHandler::new(
self.query_handler.clone(),
self.admin_handler.clone(),
self.runtime.clone(),
),
handler: BatchHandler::new(self.query_handler.clone(), self.runtime.clone()),
};
greptime_server::GreptimeServer::new(service)
}

View File

@@ -14,29 +14,23 @@
use std::sync::Arc;
use api::v1::{AdminResponse, BatchRequest, BatchResponse, DatabaseResponse};
use api::v1::{BatchRequest, BatchResponse, DatabaseResponse};
use common_runtime::Runtime;
use tokio::sync::oneshot;
use crate::error::Result;
use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef};
use crate::query_handler::GrpcQueryHandlerRef;
#[derive(Clone)]
pub struct BatchHandler {
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
}
impl BatchHandler {
pub fn new(
query_handler: GrpcQueryHandlerRef,
admin_handler: GrpcAdminHandlerRef,
runtime: Arc<Runtime>,
) -> Self {
pub fn new(query_handler: GrpcQueryHandlerRef, runtime: Arc<Runtime>) -> Self {
Self {
query_handler,
admin_handler,
runtime,
}
}
@@ -44,23 +38,11 @@ impl BatchHandler {
pub async fn batch(&self, batch_req: BatchRequest) -> Result<BatchResponse> {
let (tx, rx) = oneshot::channel();
let query_handler = self.query_handler.clone();
let admin_handler = self.admin_handler.clone();
let future = async move {
let mut batch_resp = BatchResponse::default();
let mut admin_resp = AdminResponse::default();
let mut db_resp = DatabaseResponse::default();
for admin_req in batch_req.admins {
admin_resp.results.reserve(admin_req.exprs.len());
for admin_expr in admin_req.exprs {
let admin_result = admin_handler.exec_admin_request(admin_expr).await?;
admin_resp.results.push(admin_result);
}
}
batch_resp.admins.push(admin_resp);
for db_req in batch_req.databases {
db_resp.results.reserve(db_req.exprs.len());
@@ -77,8 +59,7 @@ impl BatchHandler {
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpected by tonic runtime.
// 2. avoid the handler blocks the gRPC runtime because `exec_admin_request` may block
// the caller thread.
// 2. avoid the handler blocks the gRPC runtime
self.runtime.spawn(async move {
let result = future.await;

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use api::prometheus::remote::{ReadRequest, WriteRequest};
use api::v1::{AdminExpr, AdminResult, ObjectExpr, ObjectResult};
use api::v1::{ObjectExpr, ObjectResult};
use async_trait::async_trait;
use common_query::Output;
use session::context::QueryContextRef;
@@ -38,7 +38,6 @@ use crate::prometheus::Metrics;
pub type SqlQueryHandlerRef = Arc<dyn SqlQueryHandler + Send + Sync>;
pub type GrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler + Send + Sync>;
pub type GrpcAdminHandlerRef = Arc<dyn GrpcAdminHandler + Send + Sync>;
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
pub type PrometheusProtocolHandlerRef = Arc<dyn PrometheusProtocolHandler + Send + Sync>;
@@ -69,11 +68,6 @@ pub trait GrpcQueryHandler {
async fn do_query(&self, query: ObjectExpr) -> Result<ObjectResult>;
}
#[async_trait]
pub trait GrpcAdminHandler {
async fn exec_admin_request(&self, expr: AdminExpr) -> Result<AdminResult>;
}
#[async_trait]
pub trait InfluxdbLineProtocolHandler {
/// A successful request will not return a response.

View File

@@ -277,11 +277,7 @@ pub async fn setup_grpc_server(
let fe_instance = frontend::instance::Instance::new_standalone(instance.clone());
let fe_instance_ref = Arc::new(fe_instance);
let fe_grpc_server = Arc::new(GrpcServer::new(
fe_instance_ref.clone(),
fe_instance_ref,
runtime,
));
let fe_grpc_server = Arc::new(GrpcServer::new(fe_instance_ref, runtime));
let grpc_server_clone = fe_grpc_server.clone();
let fe_grpc_addr_clone = fe_grpc_addr.clone();

View File

@@ -14,10 +14,9 @@
use api::v1::alter_expr::Kind;
use api::v1::column::SemanticType;
use api::v1::{
admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
CreateTableExpr, InsertRequest, MutateResult, TableId,
column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
InsertRequest, TableId,
};
use client::admin::Admin;
use client::{Client, Database, RpcOutput};
use common_catalog::consts::MIN_USER_TABLE_ID;
use servers::server::Server;
@@ -133,18 +132,11 @@ pub async fn test_insert_and_select(store_type: StorageType) {
let grpc_client = Client::with_urls(vec![addr]);
let db = Database::new("greptime", grpc_client.clone());
let admin = Admin::new("greptime", grpc_client);
// create
let expr = testing_create_expr();
let result = admin.create(expr).await.unwrap();
assert!(matches!(
result.result,
Some(admin_result::Result::Mutate(MutateResult {
success: 1,
failure: 0
}))
));
let result = db.create(expr).await.unwrap();
assert!(matches!(result, RpcOutput::AffectedRows(1)));
//alter
let add_column = ColumnDef {
@@ -160,13 +152,13 @@ pub async fn test_insert_and_select(store_type: StorageType) {
}],
});
let expr = AlterExpr {
table_name: "test_table".to_string(),
table_name: "demo".to_string(),
catalog_name: "".to_string(),
schema_name: "".to_string(),
kind: Some(kind),
};
let result = admin.alter(expr).await.unwrap();
assert!(result.result.is_none());
let result = db.alter(expr).await.unwrap();
assert!(matches!(result, RpcOutput::AffectedRows(0)));
// insert
insert_and_assert(&db).await;
@@ -205,7 +197,10 @@ async fn insert_and_assert(db: &Database) {
assert!(matches!(result, RpcOutput::AffectedRows(2)));
// select
let result = db.sql("SELECT * FROM demo").await.unwrap();
let result = db
.sql("SELECT host, cpu, memory, ts FROM demo")
.await
.unwrap();
match result {
RpcOutput::RecordBatches(recordbatches) => {
let pretty = recordbatches.pretty_print().unwrap();

View File

@@ -9,16 +9,12 @@ CREATE TABLE system_metrics (
TIME INDEX(ts)
);
MutateResult { success: 1, failure: 0 }
INSERT INTO system_metrics
VALUES
("host1", "idc_a", 11.8, 10.3, 10.3, 1667446797450),
("host2", "idc_a", 80.1, 70.3, 90.0, 1667446797450),
("host1", "idc_b", 50.0, 66.7, 40.6, 1667446797450);
MutateResult { success: 3, failure: 0 }
SELECT * FROM system_metrics;
+-----------------------+----------------------+----------------------------+-------------------------------+-----------------------------+----------------------------+
@@ -55,6 +51,3 @@ SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc;
+----------------------+---------------------------------------------------+
DROP TABLE system_metrics;
MutateResult { success: 1, failure: 0 }