diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index a0d4eee141..9ee82af92e 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -25,7 +25,6 @@ use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArr use arrow::datatypes::{DataType, Float64Type, Int64Type}; use arrow::record_batch::RecordBatch; use clap::Parser; -use client::admin::Admin; use client::api::v1::column::Values; use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId}; use client::{Client, Database}; @@ -362,13 +361,11 @@ fn query_set() -> HashMap { ret } -async fn do_write(args: &Args, client: &Client) { - let admin = Admin::new("admin", client.clone()); - +async fn do_write(args: &Args, db: &Database) { let mut file_list = get_file_list(args.path.clone().expect("Specify data path in argument")); let mut write_jobs = JoinSet::new(); - let create_table_result = admin.create(create_table_expr()).await; + let create_table_result = db.create(create_table_expr()).await; println!("Create table result: {create_table_result:?}"); let progress_bar_style = ProgressStyle::with_template( @@ -383,7 +380,7 @@ async fn do_write(args: &Args, client: &Client) { let batch_size = args.batch_size; for _ in 0..args.thread_num { if let Some(path) = file_list.pop() { - let db = Database::new(DATABASE_NAME, client.clone()); + let db = db.clone(); let mpb = multi_progress_bar.clone(); let pb_style = progress_bar_style.clone(); write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await }); @@ -392,7 +389,7 @@ async fn do_write(args: &Args, client: &Client) { while write_jobs.join_next().await.is_some() { file_progress.inc(1); if let Some(path) = file_list.pop() { - let db = Database::new(DATABASE_NAME, client.clone()); + let db = db.clone(); let mpb = multi_progress_bar.clone(); let pb_style = progress_bar_style.clone(); write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await }); @@ -427,13 +424,13 @@ fn main() { .unwrap() .block_on(async { let client = Client::with_urls(vec![&args.endpoint]); + let db = Database::new(DATABASE_NAME, client); if !args.skip_write { - do_write(&args, &client).await; + do_write(&args, &db).await; } if !args.skip_read { - let db = Database::new(DATABASE_NAME, client.clone()); do_query(args.iter_num, &db).await; } }) diff --git a/src/api/greptime/v1/common.proto b/src/api/greptime/v1/common.proto index e5c5a55785..8fd4601272 100644 --- a/src/api/greptime/v1/common.proto +++ b/src/api/greptime/v1/common.proto @@ -6,17 +6,8 @@ message RequestHeader { string tenant = 1; } -message ExprHeader { - uint32 version = 1; -} - message ResultHeader { uint32 version = 1; uint32 code = 2; string err_msg = 3; } - -message MutateResult { - uint32 success = 1; - uint32 failure = 2; -} diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 82295a6437..7e50deaa33 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package greptime.v1; +import "greptime/v1/ddl.proto"; import "greptime/v1/column.proto"; import "greptime/v1/common.proto"; @@ -18,6 +19,7 @@ message ObjectExpr { oneof request { InsertRequest insert = 1; QueryRequest query = 2; + DdlRequest ddl = 3; } } diff --git a/src/api/greptime/v1/admin.proto b/src/api/greptime/v1/ddl.proto similarity index 77% rename from src/api/greptime/v1/admin.proto rename to src/api/greptime/v1/ddl.proto index d5c9e93a7c..0f30bf0ac1 100644 --- a/src/api/greptime/v1/admin.proto +++ b/src/api/greptime/v1/ddl.proto @@ -5,29 +5,15 @@ package greptime.v1; import "greptime/v1/column.proto"; import "greptime/v1/common.proto"; -message AdminRequest { - string name = 1; - repeated AdminExpr exprs = 2; -} - -message AdminResponse { - repeated AdminResult results = 1; -} - -message AdminExpr { - ExprHeader header = 1; +// "Data Definition Language" requests, that create, modify or delete the database structures but not the data. +// `DdlRequest` could carry more information than plain SQL, for example, the "table_id" in `CreateTableExpr`. +// So create a new DDL expr if you need it. +message DdlRequest { oneof expr { + CreateDatabaseExpr create_database = 1; CreateTableExpr create_table = 2; AlterExpr alter = 3; - CreateDatabaseExpr create_database = 4; - DropTableExpr drop_table = 5; - } -} - -message AdminResult { - ResultHeader header = 1; - oneof result { - MutateResult mutate = 2; + DropTableExpr drop_table = 4; } } diff --git a/src/api/greptime/v1/greptime.proto b/src/api/greptime/v1/greptime.proto index 1f2d83be65..7add1086d3 100644 --- a/src/api/greptime/v1/greptime.proto +++ b/src/api/greptime/v1/greptime.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package greptime.v1; -import "greptime/v1/admin.proto"; import "greptime/v1/common.proto"; import "greptime/v1/database.proto"; @@ -12,11 +11,9 @@ service Greptime { message BatchRequest { RequestHeader header = 1; - repeated AdminRequest admins = 2; - repeated DatabaseRequest databases = 3; + repeated DatabaseRequest databases = 2; } message BatchResponse { - repeated AdminResponse admins = 1; - repeated DatabaseResponse databases = 2; + repeated DatabaseResponse databases = 1; } diff --git a/src/api/src/result.rs b/src/api/src/result.rs index 860d144600..e93e94945a 100644 --- a/src/api/src/result.rs +++ b/src/api/src/result.rs @@ -15,13 +15,10 @@ use arrow_flight::FlightData; use prost::Message; -use crate::v1::{admin_result, AdminResult, MutateResult, ObjectResult, ResultHeader}; +use crate::v1::{ObjectResult, ResultHeader}; pub const PROTOCOL_VERSION: u32 = 1; -pub type Success = u32; -pub type Failure = u32; - #[derive(Default)] pub struct ObjectResultBuilder { version: u32, @@ -81,61 +78,6 @@ impl ObjectResultBuilder { } } -#[derive(Debug)] -pub struct AdminResultBuilder { - version: u32, - code: u32, - err_msg: Option, - mutate: Option<(Success, Failure)>, -} - -impl AdminResultBuilder { - pub fn status_code(mut self, code: u32) -> Self { - self.code = code; - self - } - - pub fn err_msg(mut self, err_msg: String) -> Self { - self.err_msg = Some(err_msg); - self - } - - pub fn mutate_result(mut self, success: u32, failure: u32) -> Self { - self.mutate = Some((success, failure)); - self - } - - pub fn build(self) -> AdminResult { - let header = Some(ResultHeader { - version: self.version, - code: self.code, - err_msg: self.err_msg.unwrap_or_default(), - }); - - let result = if let Some((success, failure)) = self.mutate { - Some(admin_result::Result::Mutate(MutateResult { - success, - failure, - })) - } else { - None - }; - - AdminResult { header, result } - } -} - -impl Default for AdminResultBuilder { - fn default() -> Self { - Self { - version: PROTOCOL_VERSION, - code: 0, - err_msg: None, - mutate: None, - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 3fe3ff0a7b..e00c8c0984 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -13,7 +13,6 @@ // limitations under the License. use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, TableId}; -use client::admin::Admin; use client::{Client, Database}; use prost_09::Message; use substrait_proto::protobuf::plan_rel::RelType as PlanRelType; @@ -66,8 +65,8 @@ async fn run() { region_ids: vec![0], }; - let admin = Admin::new("create table", client.clone()); - let result = admin.create(create_table_expr).await.unwrap(); + let db = Database::new("create table", client.clone()); + let result = db.create(create_table_expr).await.unwrap(); event!(Level::INFO, "create table result: {:#?}", result); let logical = mock_logical_plan(); diff --git a/src/client/src/admin.rs b/src/client/src/admin.rs deleted file mode 100644 index 2a22f7a26e..0000000000 --- a/src/client/src/admin.rs +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::*; -use common_error::prelude::StatusCode; -use common_query::Output; -use snafu::prelude::*; - -use crate::database::PROTOCOL_VERSION; -use crate::{error, Client, Result}; - -#[derive(Clone, Debug)] -pub struct Admin { - name: String, - client: Client, -} - -impl Admin { - pub fn new(name: impl Into, client: Client) -> Self { - Self { - name: name.into(), - client, - } - } - - pub async fn create(&self, expr: CreateTableExpr) -> Result { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; - let expr = AdminExpr { - header: Some(header), - expr: Some(admin_expr::Expr::CreateTable(expr)), - }; - self.do_request(expr).await - } - - pub async fn do_request(&self, expr: AdminExpr) -> Result { - // `remove(0)` is safe because of `do_requests`'s invariants. - Ok(self.do_requests(vec![expr]).await?.remove(0)) - } - - pub async fn alter(&self, expr: AlterExpr) -> Result { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; - let expr = AdminExpr { - header: Some(header), - expr: Some(admin_expr::Expr::Alter(expr)), - }; - self.do_request(expr).await - } - - pub async fn drop_table(&self, expr: DropTableExpr) -> Result { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; - let expr = AdminExpr { - header: Some(header), - expr: Some(admin_expr::Expr::DropTable(expr)), - }; - - self.do_request(expr).await - } - - /// Invariants: the lengths of input vec (`Vec`) and output vec (`Vec`) are equal. - async fn do_requests(&self, exprs: Vec) -> Result> { - let expr_count = exprs.len(); - let req = AdminRequest { - name: self.name.clone(), - exprs, - }; - - let resp = self.client.admin(req).await?; - - let results = resp.results; - ensure!( - results.len() == expr_count, - error::MissingResultSnafu { - name: "admin_results", - expected: expr_count, - actual: results.len(), - } - ); - Ok(results) - } - - pub async fn create_database(&self, expr: CreateDatabaseExpr) -> Result { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; - let expr = AdminExpr { - header: Some(header), - expr: Some(admin_expr::Expr::CreateDatabase(expr)), - }; - Ok(self.do_requests(vec![expr]).await?.remove(0)) - } -} - -pub fn admin_result_to_output(admin_result: AdminResult) -> Result { - let header = admin_result.header.context(error::MissingHeaderSnafu)?; - if !StatusCode::is_success(header.code) { - return error::DatanodeSnafu { - code: header.code, - msg: header.err_msg, - } - .fail(); - } - - let result = admin_result.result.context(error::MissingResultSnafu { - name: "result".to_string(), - expected: 1_usize, - actual: 0_usize, - })?; - let output = match result { - admin_result::Result::Mutate(mutate) => { - if mutate.failure != 0 { - return error::MutateFailureSnafu { - failure: mutate.failure, - } - .fail(); - } - Output::AffectedRows(mutate.success as usize) - } - }; - Ok(output) -} diff --git a/src/client/src/client.rs b/src/client/src/client.rs index 5b4af8e96b..ddc145b1be 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -104,20 +104,6 @@ impl Client { self.inner.set_peers(urls); } - pub async fn admin(&self, req: AdminRequest) -> Result { - let req = BatchRequest { - admins: vec![req], - ..Default::default() - }; - - let mut res = self.batch(req).await?; - res.admins.pop().context(error::MissingResultSnafu { - name: "admins", - expected: 1_usize, - actual: 0_usize, - }) - } - pub async fn database(&self, req: DatabaseRequest) -> Result { let req = BatchRequest { databases: vec![req], diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 78b27a208f..a5b024d344 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::ddl_request::Expr as DdlExpr; use api::v1::{ - object_expr, query_request, DatabaseRequest, InsertRequest, ObjectExpr, - ObjectResult as GrpcObjectResult, QueryRequest, + object_expr, query_request, AlterExpr, CreateTableExpr, DatabaseRequest, DdlRequest, + DropTableExpr, InsertRequest, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest, }; use common_error::status_code::StatusCode; use common_grpc::flight::{ @@ -27,8 +28,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ConvertFlightDataSnafu, DatanodeSnafu, IllegalFlightMessagesSnafu}; use crate::{error, Client, Result}; -pub const PROTOCOL_VERSION: u32 = 1; - #[derive(Clone, Debug)] pub struct Database { name: String, @@ -77,6 +76,33 @@ impl Database { obj_result.try_into() } + pub async fn create(&self, expr: CreateTableExpr) -> Result { + let expr = ObjectExpr { + request: Some(object_expr::Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(expr)), + })), + }; + self.object(expr).await?.try_into() + } + + pub async fn alter(&self, expr: AlterExpr) -> Result { + let expr = ObjectExpr { + request: Some(object_expr::Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(expr)), + })), + }; + self.object(expr).await?.try_into() + } + + pub async fn drop_table(&self, expr: DropTableExpr) -> Result { + let expr = ObjectExpr { + request: Some(object_expr::Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(expr)), + })), + }; + self.object(expr).await?.try_into() + } + pub async fn object(&self, expr: ObjectExpr) -> Result { let res = self.objects(vec![expr]).await?.pop().unwrap(); Ok(res) diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 9564761fc6..fa607258bf 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -51,9 +51,6 @@ pub enum Error { source: common_grpc::Error, }, - #[snafu(display("Mutate result has failure {}", failure))] - MutateFailure { failure: u32, backtrace: Backtrace }, - #[snafu(display("Column datatype error, source: {}", source))] ColumnDataType { #[snafu(backtrace)] @@ -91,7 +88,6 @@ impl ErrorExt for Error { | Error::MissingHeader { .. } | Error::TonicStatus { .. } | Error::Datanode { .. } - | Error::MutateFailure { .. } | Error::ColumnDataType { .. } | Error::MissingField { .. } => StatusCode::Internal, Error::CreateChannel { source, .. } | Error::ConvertFlightData { source } => { diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index bc93954d22..09080c640d 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod admin; mod client; mod database; mod error; diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 148c5913c6..4e1139a678 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -36,11 +36,8 @@ pub enum Error { source: substrait::error::Error, }, - #[snafu(display("Failed to execute physical plan, source: {}", source))] - ExecutePhysicalPlan { - #[snafu(backtrace)] - source: query::error::Error, - }, + #[snafu(display("Incorrect internal state: {}", state))] + IncorrectInternalState { state: String, backtrace: Backtrace }, #[snafu(display("Failed to create catalog list, source: {}", source))] NewCatalog { @@ -325,7 +322,6 @@ impl ErrorExt for Error { match self { Error::ExecuteSql { source } => source.status_code(), Error::DecodeLogicalPlan { source } => source.status_code(), - Error::ExecutePhysicalPlan { source } => source.status_code(), Error::NewCatalog { source } => source.status_code(), Error::FindTable { source, .. } => source.status_code(), Error::CreateTable { source, .. } @@ -373,7 +369,8 @@ impl ErrorExt for Error { | Error::Catalog { .. } | Error::MissingRequiredField { .. } | Error::FlightGet { .. } - | Error::InvalidFlightTicket { .. } => StatusCode::Internal, + | Error::InvalidFlightTicket { .. } + | Error::IncorrectInternalState { .. } => StatusCode::Internal, Error::InitBackend { .. } => StatusCode::StorageUnavailable, Error::OpenLogStore { source } => source.status_code(), diff --git a/src/datanode/src/instance/flight.rs b/src/datanode/src/instance/flight.rs index c1703b06b4..a6bcd6fb92 100644 --- a/src/datanode/src/instance/flight.rs +++ b/src/datanode/src/instance/flight.rs @@ -16,9 +16,10 @@ mod stream; use std::pin::Pin; +use api::v1::ddl_request::Expr as DdlExpr; use api::v1::object_expr::Request as GrpcRequest; use api::v1::query_request::Query; -use api::v1::{InsertRequest, ObjectExpr}; +use api::v1::{DdlRequest, InsertRequest, ObjectExpr}; use arrow_flight::flight_service_server::FlightService; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, @@ -85,13 +86,14 @@ impl FlightService for Instance { .request .context(MissingRequiredFieldSnafu { name: "request" })?; let output = match request { + GrpcRequest::Insert(request) => self.handle_insert(request).await?, GrpcRequest::Query(query_request) => { let query = query_request .query .context(MissingRequiredFieldSnafu { name: "query" })?; self.handle_query(query).await? } - GrpcRequest::Insert(request) => self.handle_insert(request).await?, + GrpcRequest::Ddl(request) => self.handle_ddl(request).await?, }; let stream = to_flight_data_stream(output); Ok(Response::new(stream)) @@ -166,6 +168,18 @@ impl Instance { .context(InsertSnafu { table_name })?; Ok(Output::AffectedRows(affected_rows)) } + + async fn handle_ddl(&self, request: DdlRequest) -> Result { + let expr = request + .expr + .context(MissingRequiredFieldSnafu { name: "expr" })?; + match expr { + DdlExpr::CreateTable(expr) => self.handle_create(expr).await, + DdlExpr::Alter(expr) => self.handle_alter(expr).await, + DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await, + DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await, + } + } } fn to_flight_data_stream(output: Output) -> TonicStream { @@ -189,14 +203,215 @@ fn to_flight_data_stream(output: Output) -> TonicStream { #[cfg(test)] mod test { - use api::v1::QueryRequest; + use api::v1::column::{SemanticType, Values}; + use api::v1::{ + alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, + CreateDatabaseExpr, CreateTableExpr, QueryRequest, + }; + use client::RpcOutput; use common_grpc::flight; - use common_grpc::flight::FlightMessage; + use common_recordbatch::RecordBatches; use datatypes::prelude::*; use super::*; use crate::tests::test_util::{self, MockInstance}; + async fn boarding(instance: &MockInstance, ticket: Request) -> RpcOutput { + let response = instance.inner().do_get(ticket).await.unwrap(); + let result = flight::flight_data_to_object_result(response) + .await + .unwrap(); + result.try_into().unwrap() + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_ddl() { + let instance = MockInstance::new("test_handle_ddl").await; + + let ticket = Request::new(Ticket { + ticket: ObjectExpr { + request: Some(GrpcRequest::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "my_database".to_string(), + })), + })), + } + .encode_to_vec(), + }); + + let output = boarding(&instance, ticket).await; + assert!(matches!(output, RpcOutput::AffectedRows(1))); + + let ticket = Request::new(Ticket { + ticket: ObjectExpr { + request: Some(GrpcRequest::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + ..Default::default() + })), + })), + } + .encode_to_vec(), + }); + + let output = boarding(&instance, ticket).await; + assert!(matches!(output, RpcOutput::AffectedRows(1))); + + let ticket = Request::new(Ticket { + ticket: ObjectExpr { + request: Some(GrpcRequest::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + kind: Some(alter_expr::Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + }], + })), + })), + })), + } + .encode_to_vec(), + }); + + let output = boarding(&instance, ticket).await; + assert!(matches!(output, RpcOutput::AffectedRows(0))); + + let output = instance + .inner() + .execute_sql( + "INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)", + QueryContext::arc(), + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let output = instance + .inner() + .execute_sql( + "SELECT ts, a, b FROM my_database.my_table", + QueryContext::arc(), + ) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2022-12-30T07:09:00 | s | 1 | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_insert() { + let instance = MockInstance::new("test_handle_insert").await; + test_util::create_test_table( + &instance, + ConcreteDataType::timestamp_millisecond_datatype(), + ) + .await + .unwrap(); + + let insert = InsertRequest { + schema_name: "public".to_string(), + table_name: "demo".to_string(), + columns: vec![ + Column { + column_name: "host".to_string(), + values: Some(Values { + string_values: vec![ + "host1".to_string(), + "host2".to_string(), + "host3".to_string(), + ], + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "cpu".to_string(), + values: Some(Values { + f64_values: vec![1.0, 3.0], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672384140000, 1672384141000, 1672384142000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + ..Default::default() + }; + + let ticket = Request::new(Ticket { + ticket: ObjectExpr { + request: Some(GrpcRequest::Insert(insert)), + } + .encode_to_vec(), + }); + + let output = boarding(&instance, ticket).await; + assert!(matches!(output, RpcOutput::AffectedRows(3))); + + let output = instance + .inner() + .execute_sql("SELECT ts, host, cpu FROM demo", QueryContext::arc()) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+-------+-----+ +| ts | host | cpu | ++---------------------+-------+-----+ +| 2022-12-30T07:09:00 | host1 | 1 | +| 2022-12-30T07:09:01 | host2 | | +| 2022-12-30T07:09:02 | host3 | 3 | ++---------------------+-------+-----+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + #[tokio::test(flavor = "multi_thread")] async fn test_handle_query() { let instance = MockInstance::new("test_handle_query").await; @@ -221,18 +436,8 @@ mod test { .encode_to_vec(), }); - let response = instance.inner().do_get(ticket).await.unwrap(); - let result = flight::flight_data_to_object_result(response) - .await - .unwrap(); - let raw_data = result.flight_data; - let mut messages = flight::raw_flight_data_to_message(raw_data).unwrap(); - assert_eq!(messages.len(), 1); - - let message = messages.remove(0); - assert!(matches!(message, FlightMessage::AffectedRows(_))); - let FlightMessage::AffectedRows(affected_rows) = message else { unreachable!() }; - assert_eq!(affected_rows, 2); + let output = boarding(&instance, ticket).await; + assert!(matches!(output, RpcOutput::AffectedRows(2))); let ticket = Request::new(Ticket { ticket: ObjectExpr { diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 8932b8776f..5bf2726823 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -12,19 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::result::AdminResultBuilder; -use api::v1::{admin_expr, AdminExpr, AdminResult, CreateDatabaseExpr, ObjectExpr, ObjectResult}; +use api::v1::{CreateDatabaseExpr, ObjectExpr, ObjectResult}; use arrow_flight::flight_service_server::FlightService; use arrow_flight::Ticket; use async_trait::async_trait; -use common_error::ext::ErrorExt; use common_error::prelude::BoxedError; -use common_error::status_code::StatusCode; use common_grpc::flight; use common_query::Output; use prost::Message; use query::plan::LogicalPlan; -use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler}; +use servers::query_handler::GrpcQueryHandler; use snafu::prelude::*; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::requests::CreateDatabaseRequest; @@ -43,25 +40,11 @@ impl Instance { .context(InvalidFlightDataSnafu) } - async fn execute_create_database( - &self, - create_database_expr: CreateDatabaseExpr, - ) -> AdminResult { + pub(crate) async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result { let req = CreateDatabaseRequest { - db_name: create_database_expr.database_name, + db_name: expr.database_name, }; - let result = self.sql_handler.create_database(req).await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } + self.sql_handler().create_database(req).await } pub(crate) async fn execute_logical(&self, plan_bytes: Vec) -> Result { @@ -91,28 +74,3 @@ impl GrpcQueryHandler for Instance { }) } } - -#[async_trait] -impl GrpcAdminHandler for Instance { - async fn exec_admin_request(&self, expr: AdminExpr) -> servers::error::Result { - let admin_resp = match expr.expr { - Some(admin_expr::Expr::CreateTable(create_expr)) => { - self.handle_create(create_expr).await - } - Some(admin_expr::Expr::Alter(alter_expr)) => self.handle_alter(alter_expr).await, - Some(admin_expr::Expr::CreateDatabase(create_database_expr)) => { - self.execute_create_database(create_database_expr).await - } - Some(admin_expr::Expr::DropTable(drop_table_expr)) => { - self.handle_drop_table(drop_table_expr).await - } - other => { - return servers::error::NotSupportedSnafu { - feat: format!("{other:?}"), - } - .fail(); - } - }; - Ok(admin_resp) - } -} diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index cbb2475955..6cf285110d 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -69,7 +69,7 @@ impl Services { }; Ok(Self { - grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime), + grpc_server: GrpcServer::new(instance, grpc_runtime), mysql_server, }) } diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 8327b9e1d9..6de572d1f0 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -12,144 +12,77 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use api::result::AdminResultBuilder; -use api::v1::{AdminResult, AlterExpr, CreateTableExpr, DropTableExpr}; -use common_error::prelude::{ErrorExt, StatusCode}; +use api::v1::{AlterExpr, CreateTableExpr, DropTableExpr}; use common_grpc_expr::{alter_expr_to_request, create_expr_to_request}; use common_query::Output; -use common_telemetry::{error, info}; -use futures::TryFutureExt; +use common_telemetry::info; use session::context::QueryContext; use snafu::prelude::*; use table::requests::DropTableRequest; -use crate::error::{AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu}; +use crate::error::{ + AlterExprToRequestSnafu, BumpTableIdSnafu, CreateExprToRequestSnafu, + IncorrectInternalStateSnafu, Result, +}; use crate::instance::Instance; use crate::sql::SqlRequest; impl Instance { /// Handle gRPC create table requests. - pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> AdminResult { + pub(crate) async fn handle_create(&self, expr: CreateTableExpr) -> Result { + let table_name = format!( + "{}.{}.{}", + expr.catalog_name, expr.schema_name, expr.table_name + ); + + // TODO(LFC): Revisit table id related feature, add more tests. + // Also merge this mod with mod instance::grpc. + // Respect CreateExpr's table id and region ids if present, or allocate table id // from local table id provider and set region id to 0. let table_id = if let Some(table_id) = &expr.table_id { info!( - "Creating table {:?}.{:?}.{:?} with table id from frontend: {}", - expr.catalog_name, expr.schema_name, expr.table_name, table_id.id + "Creating table {table_name} with table id {} from Frontend", + table_id.id ); table_id.id } else { - match self.table_id_provider.as_ref() { - None => { - return AdminResultBuilder::default() - .status_code(StatusCode::Internal as u32) - .err_msg("Table id provider absent in standalone mode".to_string()) - .build(); - } - Some(table_id_provider) => { - match table_id_provider - .next_table_id() - .await - .context(BumpTableIdSnafu) - { - Ok(table_id) => { - info!( - "Creating table {:?}.{:?}.{:?} with table id from catalog manager: {}", - &expr.catalog_name, &expr.schema_name, expr.table_name, table_id - ); - table_id - } - Err(e) => { - error!(e;"Failed to create table id when creating table: {:?}.{:?}.{:?}", &expr.catalog_name, &expr.schema_name, expr.table_name); - return AdminResultBuilder::default() - .status_code(e.status_code() as u32) - .err_msg(e.to_string()) - .build(); - } - } - } - } + let provider = + self.table_id_provider + .as_ref() + .context(IncorrectInternalStateSnafu { + state: "Table id provider absent in standalone mode", + })?; + let table_id = provider.next_table_id().await.context(BumpTableIdSnafu)?; + info!("Creating table {table_name} with table id {table_id} from TableIdProvider"); + table_id }; - let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu); - let result = futures::future::ready(request) - .and_then(|request| { - self.sql_handler().execute( - SqlRequest::CreateTable(request), - Arc::new(QueryContext::new()), - ) - }) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - // Unreachable because we are executing "CREATE TABLE"; otherwise it's an internal bug. - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } + let request = create_expr_to_request(table_id, expr).context(CreateExprToRequestSnafu)?; + + self.sql_handler() + .execute(SqlRequest::CreateTable(request), QueryContext::arc()) + .await } - pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> AdminResult { - let request = match alter_expr_to_request(expr) - .context(AlterExprToRequestSnafu) - .transpose() - { - None => { - return AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(0, 0) - .build() - } - Some(req) => req, - }; + pub(crate) async fn handle_alter(&self, expr: AlterExpr) -> Result { + let request = alter_expr_to_request(expr).context(AlterExprToRequestSnafu)?; + let Some(request) = request else { return Ok(Output::AffectedRows(0)) }; - let result = futures::future::ready(request) - .and_then(|request| { - self.sql_handler() - .execute(SqlRequest::Alter(request), Arc::new(QueryContext::new())) - }) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } + self.sql_handler() + .execute(SqlRequest::Alter(request), QueryContext::arc()) + .await } - pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> AdminResult { + pub(crate) async fn handle_drop_table(&self, expr: DropTableExpr) -> Result { let req = DropTableRequest { catalog_name: expr.catalog_name, schema_name: expr.schema_name, table_name: expr.table_name, }; - let result = self - .sql_handler() - .execute(SqlRequest::DropTable(req), Arc::new(QueryContext::new())) - .await; - match result { - Ok(Output::AffectedRows(rows)) => AdminResultBuilder::default() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as _, 0) - .build(), - Ok(Output::Stream(_)) | Ok(Output::RecordBatches(_)) => unreachable!(), - Err(err) => AdminResultBuilder::default() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build(), - } + self.sql_handler() + .execute(SqlRequest::DropTable(req), QueryContext::arc()) + .await } } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 73d586bb27..92df414476 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -232,12 +232,6 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Failed to create table, source: {}", source))] - CreateTable { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to create database: {}, source: {}", name, source))] CreateDatabase { name: String, @@ -257,12 +251,6 @@ pub enum Error { source: client::Error, }, - #[snafu(display("Failed to alter table on insertion, source: {}", source))] - AlterTableOnInsertion { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to build CreateExpr on insertion: {}", source))] BuildCreateExprOnInsertion { #[snafu(backtrace)] @@ -365,12 +353,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Invalid admin result, source: {}", source))] - InvalidAdminResult { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Cannot find primary key column by name: {}", msg))] PrimaryKeyNotFound { msg: String, backtrace: Backtrace }, @@ -476,9 +458,9 @@ impl ErrorExt for Error { | Error::VectorComputation { source } | Error::ConvertArrowSchema { source } => source.status_code(), - Error::ConnectDatanode { source, .. } - | Error::RequestDatanode { source } - | Error::InvalidAdminResult { source } => source.status_code(), + Error::ConnectDatanode { source, .. } | Error::RequestDatanode { source } => { + source.status_code() + } Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => { source.status_code() @@ -513,10 +495,8 @@ impl ErrorExt for Error { Error::BumpTableId { source, .. } => source.status_code(), Error::SchemaNotFound { .. } => StatusCode::InvalidArguments, Error::CatalogNotFound { .. } => StatusCode::InvalidArguments, - Error::CreateTable { source, .. } - | Error::CreateDatabase { source, .. } + Error::CreateDatabase { source, .. } | Error::CreateTableOnInsertion { source, .. } - | Error::AlterTableOnInsertion { source, .. } | Error::Insert { source, .. } => source.status_code(), Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(), Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(), diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 2b4bff55c9..b2b7bab0cf 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -20,18 +20,17 @@ mod prometheus; use std::sync::Arc; use std::time::Duration; -use api::result::{ObjectResultBuilder, PROTOCOL_VERSION}; +use api::result::ObjectResultBuilder; use api::v1::alter_expr::Kind; +use api::v1::ddl_request::Expr as DdlExpr; use api::v1::object_expr::Request; use api::v1::{ - admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr, - CreateTableExpr, DropTableExpr, ExprHeader, InsertRequest, ObjectExpr, - ObjectResult as GrpcObjectResult, + AddColumns, AlterExpr, Column, CreateTableExpr, DdlRequest, DropTableExpr, InsertRequest, + ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef}; -use client::admin::admin_result_to_output; use client::RpcOutput; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; @@ -46,9 +45,9 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; use servers::query_handler::{ - GrpcAdminHandler, GrpcAdminHandlerRef, GrpcQueryHandler, GrpcQueryHandlerRef, - InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, - ScriptHandlerRef, SqlQueryHandler, SqlQueryHandlerRef, + GrpcQueryHandler, GrpcQueryHandlerRef, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, + PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler, + SqlQueryHandlerRef, }; use servers::{error as server_error, Mode}; use session::context::QueryContextRef; @@ -63,20 +62,18 @@ use table::TableRef; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ - self, AlterTableOnInsertionSnafu, CatalogSnafu, CreateDatabaseSnafu, CreateTableSnafu, - FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result, + self, CatalogSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, + RequestDatanodeSnafu, Result, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::sql::insert_to_request; -use crate::table::insert::insert_request_to_insert_batch; use crate::table::route::TableRoutes; use crate::Plugins; #[async_trait] pub trait FrontendInstance: - GrpcAdminHandler - + GrpcQueryHandler + GrpcQueryHandler + SqlQueryHandler + OpentsdbProtocolHandler + InfluxdbLineProtocolHandler @@ -107,7 +104,6 @@ pub struct Instance { sql_handler: SqlQueryHandlerRef, grpc_query_handler: GrpcQueryHandlerRef, - grpc_admin_handler: GrpcAdminHandlerRef, /// plugins: this map holds extensions to customize query or auth /// behaviours. @@ -140,8 +136,7 @@ impl Instance { mode: Mode::Distributed, dist_instance: Some(dist_instance), sql_handler: dist_instance_ref.clone(), - grpc_query_handler: dist_instance_ref.clone(), - grpc_admin_handler: dist_instance_ref, + grpc_query_handler: dist_instance_ref, plugins: Default::default(), }) } @@ -185,7 +180,6 @@ impl Instance { dist_instance: None, sql_handler: dn_instance.clone(), grpc_query_handler: dn_instance.clone(), - grpc_admin_handler: dn_instance, plugins: Default::default(), } } @@ -211,40 +205,20 @@ impl Instance { if let Some(v) = &self.dist_instance { v.create_table(&mut expr, partitions).await } else { - let expr = AdminExpr { - header: Some(ExprHeader { - version: PROTOCOL_VERSION, - }), - expr: Some(admin_expr::Expr::CreateTable(expr)), - }; let result = self - .grpc_admin_handler - .exec_admin_request(expr) + .grpc_query_handler + .do_query(ObjectExpr { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(expr)), + })), + }) .await .context(error::InvokeGrpcServerSnafu)?; - admin_result_to_output(result).context(CreateTableSnafu) + let output: RpcOutput = result.try_into().context(RequestDatanodeSnafu)?; + Ok(output.into()) } } - /// Handle create database expr. - pub async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result { - let database_name = expr.database_name.clone(); - let expr = AdminExpr { - header: Some(ExprHeader { - version: PROTOCOL_VERSION, - }), - expr: Some(admin_expr::Expr::CreateDatabase(expr)), - }; - let result = self - .grpc_admin_handler - .exec_admin_request(expr) - .await - .context(error::InvokeGrpcServerSnafu)?; - admin_result_to_output(result).context(CreateDatabaseSnafu { - name: database_name, - }) - } - /// Handle batch inserts pub async fn handle_inserts(&self, requests: Vec) -> Result { let mut success = 0; @@ -369,18 +343,17 @@ impl Instance { kind: Some(Kind::AddColumns(add_columns)), }; - let expr = AdminExpr { - header: Some(ExprHeader { - version: PROTOCOL_VERSION, - }), - expr: Some(admin_expr::Expr::Alter(expr)), - }; let result = self - .grpc_admin_handler - .exec_admin_request(expr) + .grpc_query_handler + .do_query(ObjectExpr { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(expr)), + })), + }) .await .context(error::InvokeGrpcServerSnafu)?; - admin_result_to_output(result).context(AlterTableOnInsertionSnafu) + let output: RpcOutput = result.try_into().context(RequestDatanodeSnafu)?; + Ok(output.into()) } fn get_catalog(&self, catalog_name: &str) -> Result { @@ -430,19 +403,6 @@ impl Instance { .context(error::TableSnafu) } - fn stmt_to_insert_batch( - &self, - catalog: &str, - schema: &str, - insert: Box, - ) -> Result<(Vec, u32)> { - let catalog_provider = self.get_catalog(catalog)?; - let schema_provider = Self::get_schema(catalog_provider, schema)?; - - let insert_request = insert_to_request(&schema_provider, *insert)?; - insert_request_to_insert_batch(&insert_request) - } - fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result { ensure!( self.catalog_manager @@ -486,7 +446,7 @@ impl Instance { ) -> server_error::Result { // TODO(sunng87): provide a better form to log or track statement let query = &format!("{:?}", &stmt); - match stmt { + match stmt.clone() { Statement::CreateDatabase(_) | Statement::ShowDatabases(_) | Statement::CreateTable(_) @@ -498,27 +458,7 @@ impl Instance { } Statement::Insert(insert) => match self.mode { Mode::Standalone => { - let (catalog_name, schema_name, table_name) = insert - .full_table_name() - .context(error::ParseSqlSnafu) - .map_err(BoxedError::new) - .context(server_error::ExecuteInsertSnafu { - msg: "Failed to get table name", - })?; - - let (columns, row_count) = self - .stmt_to_insert_batch(&catalog_name, &schema_name, insert) - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query })?; - - let request = InsertRequest { - schema_name, - table_name, - region_number: 0, - columns, - row_count, - }; - self.handle_insert(request).await + return self.sql_handler.do_statement_query(stmt, query_ctx).await } Mode::Distributed => { let affected = self @@ -535,14 +475,19 @@ impl Instance { let expr = AlterExpr::try_from(alter_stmt) .map_err(BoxedError::new) .context(server_error::ExecuteAlterSnafu { query })?; - let expr = AdminExpr { - header: Some(ExprHeader { - version: PROTOCOL_VERSION, - }), - expr: Some(admin_expr::Expr::Alter(expr)), - }; - let result = self.grpc_admin_handler.exec_admin_request(expr).await?; - admin_result_to_output(result).context(error::InvalidAdminResultSnafu) + let result = self + .grpc_query_handler + .do_query(ObjectExpr { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(expr)), + })), + }) + .await?; + let output: RpcOutput = result + .try_into() + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?; + Ok(output.into()) } Statement::DropTable(drop_stmt) => { let expr = DropTableExpr { @@ -550,14 +495,19 @@ impl Instance { schema_name: drop_stmt.schema_name, table_name: drop_stmt.table_name, }; - let expr = AdminExpr { - header: Some(ExprHeader { - version: PROTOCOL_VERSION, - }), - expr: Some(admin_expr::Expr::DropTable(expr)), - }; - let result = self.grpc_admin_handler.exec_admin_request(expr).await?; - admin_result_to_output(result).context(error::InvalidAdminResultSnafu) + let result = self + .grpc_query_handler + .do_query(ObjectExpr { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::DropTable(expr)), + })), + }) + .await?; + let output: RpcOutput = result + .try_into() + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?; + Ok(output.into()) } Statement::ShowCreateTable(_) => { return server_error::NotSupportedSnafu { feat: query }.fail(); @@ -699,29 +649,15 @@ impl GrpcQueryHandler for Instance { } } -#[async_trait] -impl GrpcAdminHandler for Instance { - async fn exec_admin_request(&self, mut expr: AdminExpr) -> server_error::Result { - // Force the default to be `None` rather than `Some(0)` comes from gRPC decode. - // Related issue: #480 - if let Some(api::v1::admin_expr::Expr::CreateTable(create)) = &mut expr.expr { - create.table_id = None; - } - self.grpc_admin_handler.exec_admin_request(expr).await - } -} - #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; use std::borrow::Cow; use std::iter; use std::sync::atomic::AtomicU32; use api::v1::column::SemanticType; use api::v1::{ - admin_expr, admin_result, column, query_request, Column, ColumnDataType, - ColumnDef as GrpcColumnDef, ExprHeader, MutateResult, QueryRequest, + column, query_request, Column, ColumnDataType, ColumnDef as GrpcColumnDef, QueryRequest, }; use common_grpc::flight::{raw_flight_data_to_message, FlightMessage}; use common_recordbatch::RecordBatch; @@ -879,21 +815,18 @@ mod tests { }; // create - let create_expr = create_expr(); - let admin_expr = AdminExpr { - header: Some(ExprHeader::default()), - expr: Some(admin_expr::Expr::CreateTable(create_expr)), - }; - let result = GrpcAdminHandler::exec_admin_request(&*instance, admin_expr) - .await - .unwrap(); - assert_matches!( - result.result, - Some(admin_result::Result::Mutate(MutateResult { - success: 1, - failure: 0 - })) - ); + let result = GrpcQueryHandler::do_query( + &*instance, + ObjectExpr { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(create_expr())), + })), + }, + ) + .await + .unwrap(); + let output: RpcOutput = result.try_into().unwrap(); + assert!(matches!(output, RpcOutput::AffectedRows(1))); // insert let columns = vec![ diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 31657c0f29..63fa90c857 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -16,18 +16,18 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::result::AdminResultBuilder; -use api::v1::{ - admin_expr, AdminExpr, AdminResult, AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr, - ObjectResult, TableId, -}; +use api::result::ObjectResultBuilder; +use api::v1::ddl_request::Expr as DdlExpr; +use api::v1::object_expr::Request as GrpcRequest; +use api::v1::{AlterExpr, CreateDatabaseExpr, CreateTableExpr, ObjectExpr, ObjectResult, TableId}; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use catalog::{CatalogList, CatalogManager}; use chrono::DateTime; -use client::admin::{admin_result_to_output, Admin}; +use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; +use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; @@ -40,7 +40,7 @@ use meta_client::rpc::{ use query::sql::{describe_table, explain, show_databases, show_tables}; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error as server_error; -use servers::query_handler::{GrpcAdminHandler, GrpcQueryHandler, SqlQueryHandler}; +use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler}; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::Value as SqlValue; @@ -53,8 +53,8 @@ use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ self, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu, ColumnDataTypeSnafu, - PrimaryKeyNotFoundSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, - TableNotFoundSnafu, + PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, + StartMetaClientSnafu, TableNotFoundSnafu, }; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; use crate::instance::parse_stmt; @@ -120,7 +120,7 @@ impl DistInstance { for datanode in table_route.find_leaders() { let client = self.datanode_clients.get_client(&datanode).await; - let client = Admin::new("greptime", client); + let client = Database::new("greptime", client); let regions = table_route.find_leader_regions(&datanode); let mut create_expr_for_region = create_table.clone(); @@ -134,8 +134,7 @@ impl DistInstance { client .create(create_expr_for_region) .await - .and_then(admin_result_to_output) - .context(error::InvalidAdminResultSnafu)?; + .context(RequestDatanodeSnafu)?; } // Checked in real MySQL, it truly returns "0 rows affected". @@ -221,7 +220,7 @@ impl DistInstance { Ok(()) } - async fn handle_alter_table(&self, expr: AlterExpr) -> Result { + async fn handle_alter_table(&self, expr: AlterExpr) -> Result<()> { let catalog_name = if expr.catalog_name.is_empty() { DEFAULT_CATALOG_NAME } else { @@ -253,8 +252,7 @@ impl DistInstance { .as_any() .downcast_ref::() .expect("Table impl must be DistTable in distributed mode"); - dist_table.alter_by_expr(expr).await?; - Ok(AdminResultBuilder::default().mutate_result(0, 0).build()) + dist_table.alter_by_expr(expr).await } async fn create_table_in_meta( @@ -368,32 +366,33 @@ impl SqlQueryHandler for DistInstance { #[async_trait] impl GrpcQueryHandler for DistInstance { - async fn do_query(&self, _: ObjectExpr) -> server_error::Result { - unimplemented!() - } -} - -#[async_trait] -impl GrpcAdminHandler for DistInstance { - async fn exec_admin_request(&self, query: AdminExpr) -> server_error::Result { - let expr = query - .clone() - .expr - .context(server_error::InvalidQuerySnafu { - reason: "empty expr", - })?; - match expr { - admin_expr::Expr::CreateDatabase(create_database) => self - .handle_create_database(create_database) - .await - .map(|_| AdminResultBuilder::default().mutate_result(1, 0).build()), - admin_expr::Expr::Alter(alter) => self.handle_alter_table(alter).await, - _ => unimplemented!(), + async fn do_query(&self, expr: ObjectExpr) -> server_error::Result { + let request = expr.request.context(server_error::InvalidQuerySnafu { + reason: "empty expr", + })?; + match request { + GrpcRequest::Ddl(request) => { + let expr = request.expr.context(server_error::InvalidQuerySnafu { + reason: "empty DDL expr", + })?; + match expr.clone() { + DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await, + DdlExpr::Alter(expr) => self.handle_alter_table(expr).await, + DdlExpr::CreateTable(_) | DdlExpr::DropTable(_) => unimplemented!(), + } + .map_err(BoxedError::new) + .with_context(|_| server_error::ExecuteQuerySnafu { + query: format!("{expr:?}"), + })?; + Ok(ObjectResultBuilder::new() + .flight_data(vec![ + FlightEncoder::default().encode(FlightMessage::AffectedRows(1)) + ]) + .build()) + } + // TODO(LFC): Implement Flight for DistInstance. + GrpcRequest::Query(_) | GrpcRequest::Insert(_) => unimplemented!(), } - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { - query: format!("{query:?}"), - }) } } diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 52d72d1b3d..393a9488e0 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -169,7 +169,8 @@ impl PrometheusProtocolHandler for Instance { mod tests { use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, LabelMatcher, Sample}; - use api::v1::CreateDatabaseExpr; + use servers::query_handler::SqlQueryHandler; + use session::context::QueryContext; use super::*; use crate::tests; @@ -187,12 +188,12 @@ mod tests { let db = "prometheus"; - instance - .handle_create_database(CreateDatabaseExpr { - database_name: db.to_string(), - }) + assert!(instance + .do_query("CREATE DATABASE prometheus", QueryContext::arc()) .await - .unwrap(); + .get(0) + .unwrap() + .is_ok()); instance.write(db, write_request).await.unwrap(); diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index d3c55b8c97..9636a5318f 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -56,7 +56,7 @@ impl Services { .context(error::RuntimeResourceSnafu)?, ); - let grpc_server = GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime); + let grpc_server = GrpcServer::new(instance.clone(), grpc_runtime); Some((Box::new(grpc_server) as _, grpc_addr)) } else { diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 7d63d7e47b..25a78a6979 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use api::v1::AlterExpr; use async_trait::async_trait; -use client::admin::Admin; use client::{Database, RpcOutput}; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::error::Result as QueryResult; @@ -367,15 +366,12 @@ impl DistTable { } ); for datanode in leaders { - let admin = Admin::new( + let db = Database::new( DEFAULT_CATALOG_NAME, self.datanode_clients.get_client(&datanode).await, ); - debug!("Sent alter table {:?} to {:?}", expr, admin); - let result = admin - .alter(expr.clone()) - .await - .context(RequestDatanodeSnafu)?; + debug!("Sending {:?} to {:?}", expr, db); + let result = db.alter(expr.clone()).await.context(RequestDatanodeSnafu)?; debug!("Alter table result: {:?}", result); // TODO(hl): We should further check and track alter result in some global DDL task tracker } diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index d6a074373b..4481a809fd 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -93,8 +93,7 @@ pub(crate) async fn create_datanode_client( // create a mock datanode grpc service, see example here: // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs - let datanode_service = - GrpcServer::new(datanode_instance.clone(), datanode_instance, runtime).create_service(); + let datanode_service = GrpcServer::new(datanode_instance, runtime).create_service(); tokio::spawn(async move { Server::builder() .add_service(datanode_service) diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 5d56ddd36b..cda3dc65c1 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -31,25 +31,19 @@ use tonic::{Request, Response, Status}; use crate::error::{self, AlreadyStartedSnafu, Result, StartGrpcSnafu, TcpBindSnafu}; use crate::grpc::handler::BatchHandler; -use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef}; +use crate::query_handler::GrpcQueryHandlerRef; use crate::server::Server; pub struct GrpcServer { query_handler: GrpcQueryHandlerRef, - admin_handler: GrpcAdminHandlerRef, shutdown_tx: Mutex>>, runtime: Arc, } impl GrpcServer { - pub fn new( - query_handler: GrpcQueryHandlerRef, - admin_handler: GrpcAdminHandlerRef, - runtime: Arc, - ) -> Self { + pub fn new(query_handler: GrpcQueryHandlerRef, runtime: Arc) -> Self { Self { query_handler, - admin_handler, shutdown_tx: Mutex::new(None), runtime, } @@ -57,11 +51,7 @@ impl GrpcServer { pub fn create_service(&self) -> greptime_server::GreptimeServer { let service = GrpcService { - handler: BatchHandler::new( - self.query_handler.clone(), - self.admin_handler.clone(), - self.runtime.clone(), - ), + handler: BatchHandler::new(self.query_handler.clone(), self.runtime.clone()), }; greptime_server::GreptimeServer::new(service) } diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs index a087f76aaf..bd624c4cae 100644 --- a/src/servers/src/grpc/handler.rs +++ b/src/servers/src/grpc/handler.rs @@ -14,29 +14,23 @@ use std::sync::Arc; -use api::v1::{AdminResponse, BatchRequest, BatchResponse, DatabaseResponse}; +use api::v1::{BatchRequest, BatchResponse, DatabaseResponse}; use common_runtime::Runtime; use tokio::sync::oneshot; use crate::error::Result; -use crate::query_handler::{GrpcAdminHandlerRef, GrpcQueryHandlerRef}; +use crate::query_handler::GrpcQueryHandlerRef; #[derive(Clone)] pub struct BatchHandler { query_handler: GrpcQueryHandlerRef, - admin_handler: GrpcAdminHandlerRef, runtime: Arc, } impl BatchHandler { - pub fn new( - query_handler: GrpcQueryHandlerRef, - admin_handler: GrpcAdminHandlerRef, - runtime: Arc, - ) -> Self { + pub fn new(query_handler: GrpcQueryHandlerRef, runtime: Arc) -> Self { Self { query_handler, - admin_handler, runtime, } } @@ -44,23 +38,11 @@ impl BatchHandler { pub async fn batch(&self, batch_req: BatchRequest) -> Result { let (tx, rx) = oneshot::channel(); let query_handler = self.query_handler.clone(); - let admin_handler = self.admin_handler.clone(); let future = async move { let mut batch_resp = BatchResponse::default(); - let mut admin_resp = AdminResponse::default(); let mut db_resp = DatabaseResponse::default(); - for admin_req in batch_req.admins { - admin_resp.results.reserve(admin_req.exprs.len()); - - for admin_expr in admin_req.exprs { - let admin_result = admin_handler.exec_admin_request(admin_expr).await?; - admin_resp.results.push(admin_result); - } - } - batch_resp.admins.push(admin_resp); - for db_req in batch_req.databases { db_resp.results.reserve(db_req.exprs.len()); @@ -77,8 +59,7 @@ impl BatchHandler { // Executes requests in another runtime to // 1. prevent the execution from being cancelled unexpected by tonic runtime. - // 2. avoid the handler blocks the gRPC runtime because `exec_admin_request` may block - // the caller thread. + // 2. avoid the handler blocks the gRPC runtime self.runtime.spawn(async move { let result = future.await; diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 2c7d47ce40..0e2e8d2cf8 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::prometheus::remote::{ReadRequest, WriteRequest}; -use api::v1::{AdminExpr, AdminResult, ObjectExpr, ObjectResult}; +use api::v1::{ObjectExpr, ObjectResult}; use async_trait::async_trait; use common_query::Output; use session::context::QueryContextRef; @@ -38,7 +38,6 @@ use crate::prometheus::Metrics; pub type SqlQueryHandlerRef = Arc; pub type GrpcQueryHandlerRef = Arc; -pub type GrpcAdminHandlerRef = Arc; pub type OpentsdbProtocolHandlerRef = Arc; pub type InfluxdbLineProtocolHandlerRef = Arc; pub type PrometheusProtocolHandlerRef = Arc; @@ -69,11 +68,6 @@ pub trait GrpcQueryHandler { async fn do_query(&self, query: ObjectExpr) -> Result; } -#[async_trait] -pub trait GrpcAdminHandler { - async fn exec_admin_request(&self, expr: AdminExpr) -> Result; -} - #[async_trait] pub trait InfluxdbLineProtocolHandler { /// A successful request will not return a response. diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 67feb62269..4677305bfd 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -277,11 +277,7 @@ pub async fn setup_grpc_server( let fe_instance = frontend::instance::Instance::new_standalone(instance.clone()); let fe_instance_ref = Arc::new(fe_instance); - let fe_grpc_server = Arc::new(GrpcServer::new( - fe_instance_ref.clone(), - fe_instance_ref, - runtime, - )); + let fe_grpc_server = Arc::new(GrpcServer::new(fe_instance_ref, runtime)); let grpc_server_clone = fe_grpc_server.clone(); let fe_grpc_addr_clone = fe_grpc_addr.clone(); diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 724d1b53ec..19987d2be2 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -14,10 +14,9 @@ use api::v1::alter_expr::Kind; use api::v1::column::SemanticType; use api::v1::{ - admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateTableExpr, InsertRequest, MutateResult, TableId, + column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr, + InsertRequest, TableId, }; -use client::admin::Admin; use client::{Client, Database, RpcOutput}; use common_catalog::consts::MIN_USER_TABLE_ID; use servers::server::Server; @@ -133,18 +132,11 @@ pub async fn test_insert_and_select(store_type: StorageType) { let grpc_client = Client::with_urls(vec![addr]); let db = Database::new("greptime", grpc_client.clone()); - let admin = Admin::new("greptime", grpc_client); // create let expr = testing_create_expr(); - let result = admin.create(expr).await.unwrap(); - assert!(matches!( - result.result, - Some(admin_result::Result::Mutate(MutateResult { - success: 1, - failure: 0 - })) - )); + let result = db.create(expr).await.unwrap(); + assert!(matches!(result, RpcOutput::AffectedRows(1))); //alter let add_column = ColumnDef { @@ -160,13 +152,13 @@ pub async fn test_insert_and_select(store_type: StorageType) { }], }); let expr = AlterExpr { - table_name: "test_table".to_string(), + table_name: "demo".to_string(), catalog_name: "".to_string(), schema_name: "".to_string(), kind: Some(kind), }; - let result = admin.alter(expr).await.unwrap(); - assert!(result.result.is_none()); + let result = db.alter(expr).await.unwrap(); + assert!(matches!(result, RpcOutput::AffectedRows(0))); // insert insert_and_assert(&db).await; @@ -205,7 +197,10 @@ async fn insert_and_assert(db: &Database) { assert!(matches!(result, RpcOutput::AffectedRows(2))); // select - let result = db.sql("SELECT * FROM demo").await.unwrap(); + let result = db + .sql("SELECT host, cpu, memory, ts FROM demo") + .await + .unwrap(); match result { RpcOutput::RecordBatches(recordbatches) => { let pretty = recordbatches.pretty_print().unwrap(); diff --git a/tests/cases/standalone/basic.result b/tests/cases/standalone/basic.result index 229da9c61f..f62bdf242d 100644 --- a/tests/cases/standalone/basic.result +++ b/tests/cases/standalone/basic.result @@ -9,16 +9,12 @@ CREATE TABLE system_metrics ( TIME INDEX(ts) ); -MutateResult { success: 1, failure: 0 } - INSERT INTO system_metrics VALUES ("host1", "idc_a", 11.8, 10.3, 10.3, 1667446797450), ("host2", "idc_a", 80.1, 70.3, 90.0, 1667446797450), ("host1", "idc_b", 50.0, 66.7, 40.6, 1667446797450); -MutateResult { success: 3, failure: 0 } - SELECT * FROM system_metrics; +-----------------------+----------------------+----------------------------+-------------------------------+-----------------------------+----------------------------+ @@ -55,6 +51,3 @@ SELECT idc, avg(memory_util) FROM system_metrics GROUP BY idc ORDER BY idc; +----------------------+---------------------------------------------------+ DROP TABLE system_metrics; - -MutateResult { success: 1, failure: 0 } -