diff --git a/Cargo.lock b/Cargo.lock index 0dc9f197d2..b68e5bd510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1308,6 +1308,7 @@ name = "client" version = "0.1.0" dependencies = [ "api", + "arrow-flight", "async-stream", "common-base", "common-error", @@ -1320,7 +1321,9 @@ dependencies = [ "datanode", "datatypes", "enum_dispatch", + "futures-util", "parking_lot", + "prost 0.11.3", "prost 0.9.0", "rand 0.8.5", "snafu", @@ -2144,7 +2147,6 @@ name = "datanode" version = "0.1.0" dependencies = [ "api", - "arrow-flight", "async-stream", "async-trait", "axum", @@ -2611,7 +2613,6 @@ version = "0.1.0" dependencies = [ "anymap", "api", - "arrow-flight", "async-stream", "async-trait", "catalog", @@ -6581,6 +6582,7 @@ version = "0.1.0" dependencies = [ "aide", "api", + "arrow-flight", "async-trait", "axum", "axum-macros", @@ -6592,6 +6594,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", + "common-grpc-expr", "common-query", "common-recordbatch", "common-runtime", @@ -6612,6 +6615,7 @@ dependencies = [ "openmetrics-parser", "opensrv-mysql", "pgwire", + "pin-project", "prost 0.11.3", "query", "rand 0.8.5", @@ -6637,7 +6641,6 @@ dependencies = [ "tokio-stream", "tokio-test", "tonic", - "tonic-reflection", "tower", "tower-http", ] @@ -6893,6 +6896,7 @@ dependencies = [ "client", "common-base", "common-grpc", + "common-query", "sqlness", "tokio", ] @@ -7327,6 +7331,7 @@ dependencies = [ "common-catalog", "common-error", "common-grpc", + "common-query", "common-runtime", "common-telemetry", "datanode", @@ -7704,21 +7709,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tonic-reflection" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0455f730d540a1484bffc3c55c94100b18a662597b982c2e9073f2c55c602616" -dependencies = [ - "bytes", - "prost 0.11.3", - "prost-types 0.11.2", - "tokio", - "tokio-stream", - "tonic", - "tonic-build", -] - [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index 494a95bb9b..fc458a195d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,8 +58,10 @@ datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" } datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4917235a398ae20145c87d20984e6367dc1a0c1e" } futures = "0.3" +futures-util = "0.3" parquet = "29.0" paste = "1.0" +prost = "0.11" serde = { version = "1.0", features = ["derive"] } snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.28" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index af0b807590..7678f30d5a 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -prost = "0.11" +prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/api/build.rs b/src/api/build.rs index b913d685b2..913d4d1519 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -12,15 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::path::PathBuf; - fn main() { - let default_out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap()); tonic_build::configure() - .file_descriptor_set_path(default_out_dir.join("greptime_fd.bin")) .compile( &[ - "greptime/v1/greptime.proto", + "greptime/v1/database.proto", "greptime/v1/meta/common.proto", "greptime/v1/meta/heartbeat.proto", "greptime/v1/meta/route.proto", diff --git a/src/api/greptime/v1/common.proto b/src/api/greptime/v1/common.proto deleted file mode 100644 index 8fd4601272..0000000000 --- a/src/api/greptime/v1/common.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax = "proto3"; - -package greptime.v1; - -message RequestHeader { - string tenant = 1; -} - -message ResultHeader { - uint32 version = 1; - uint32 code = 2; - string err_msg = 3; -} diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 7e50deaa33..8f15925246 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -4,18 +4,8 @@ package greptime.v1; import "greptime/v1/ddl.proto"; import "greptime/v1/column.proto"; -import "greptime/v1/common.proto"; -message DatabaseRequest { - string name = 1; - repeated ObjectExpr exprs = 2; -} - -message DatabaseResponse { - repeated ObjectResult results = 1; -} - -message ObjectExpr { +message GreptimeRequest { oneof request { InsertRequest insert = 1; QueryRequest query = 2; @@ -46,11 +36,6 @@ message InsertRequest { uint32 region_number = 5; } -message ObjectResult { - ResultHeader header = 1; - repeated bytes flight_data = 2; -} - message FlightDataExt { uint32 affected_rows = 1; } diff --git a/src/api/greptime/v1/ddl.proto b/src/api/greptime/v1/ddl.proto index 5c252e7b0f..c295ca45a2 100644 --- a/src/api/greptime/v1/ddl.proto +++ b/src/api/greptime/v1/ddl.proto @@ -3,7 +3,6 @@ syntax = "proto3"; package greptime.v1; import "greptime/v1/column.proto"; -import "greptime/v1/common.proto"; // "Data Definition Language" requests, that create, modify or delete the database structures but not the data. // `DdlRequest` could carry more information than plain SQL, for example, the "table_id" in `CreateTableExpr`. diff --git a/src/api/greptime/v1/greptime.proto b/src/api/greptime/v1/greptime.proto deleted file mode 100644 index 7add1086d3..0000000000 --- a/src/api/greptime/v1/greptime.proto +++ /dev/null @@ -1,19 +0,0 @@ -syntax = "proto3"; - -package greptime.v1; - -import "greptime/v1/common.proto"; -import "greptime/v1/database.proto"; - -service Greptime { - rpc Batch(BatchRequest) returns (BatchResponse) {} -} - -message BatchRequest { - RequestHeader header = 1; - repeated DatabaseRequest databases = 2; -} - -message BatchResponse { - repeated DatabaseResponse databases = 1; -} diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index 9624b5b96a..77f58b383c 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -15,7 +15,6 @@ pub mod error; pub mod helper; pub mod prometheus; -pub mod result; pub mod serde; pub mod v1; diff --git a/src/api/src/result.rs b/src/api/src/result.rs deleted file mode 100644 index 7b6c311bc3..0000000000 --- a/src/api/src/result.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use arrow_flight::FlightData; -use prost::Message; - -use crate::v1::{ObjectResult, ResultHeader}; - -pub const PROTOCOL_VERSION: u32 = 1; - -#[derive(Default)] -pub struct ObjectResultBuilder { - version: u32, - code: u32, - err_msg: Option, - flight_data: Option>, -} - -impl ObjectResultBuilder { - pub fn new() -> Self { - Self { - version: PROTOCOL_VERSION, - ..Default::default() - } - } - - #[allow(dead_code)] - pub fn version(mut self, version: u32) -> Self { - self.version = version; - self - } - - pub fn status_code(mut self, code: u32) -> Self { - self.code = code; - self - } - - pub fn err_msg(mut self, err_msg: String) -> Self { - self.err_msg = Some(err_msg); - self - } - - pub fn flight_data(mut self, flight_data: Vec) -> Self { - self.flight_data = Some(flight_data); - self - } - - pub fn build(self) -> ObjectResult { - let header = Some(ResultHeader { - version: self.version, - code: self.code, - err_msg: self.err_msg.unwrap_or_default(), - }); - - let flight_data = if let Some(flight_data) = self.flight_data { - flight_data - .into_iter() - .map(|x| x.encode_to_vec()) - .collect::>>() - } else { - vec![] - }; - ObjectResult { - header, - flight_data, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_object_result_builder() { - let obj_result = ObjectResultBuilder::new() - .version(101) - .status_code(500) - .err_msg("Failed to read this file!".to_string()) - .build(); - let header = obj_result.header.unwrap(); - assert_eq!(101, header.version); - assert_eq!(500, header.code); - assert_eq!("Failed to read this file!", header.err_msg); - } -} diff --git a/src/api/src/v1.rs b/src/api/src/v1.rs index dcf7927edd..078733fd1e 100644 --- a/src/api/src/v1.rs +++ b/src/api/src/v1.rs @@ -15,7 +15,5 @@ #![allow(clippy::derive_partial_eq_without_eq)] tonic::include_proto!("greptime.v1"); -pub const GREPTIME_FD_SET: &[u8] = tonic::include_file_descriptor_set!("greptime_fd"); - mod column_def; pub mod meta; diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 73bb2990d9..5aecb1f2bf 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -21,7 +21,7 @@ common-time = { path = "../common/time" } datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" -futures-util = "0.3" +futures-util.workspace = true lazy_static = "1.4" meta-client = { path = "../meta-client" } regex = "1.6" diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 2a69e7297c..ac83eb2610 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] api = { path = "../api" } +arrow-flight.workspace = true async-stream.workspace = true common-base = { path = "../common/base" } common-error = { path = "../common/error" } @@ -17,7 +18,9 @@ common-time = { path = "../common/time" } datafusion.workspace = true datatypes = { path = "../datatypes" } enum_dispatch = "0.3" +futures-util.workspace = true parking_lot = "0.12" +prost.workspace = true rand = "0.8" snafu.workspace = true tonic.workspace = true diff --git a/src/client/src/client.rs b/src/client/src/client.rs 
index 9cd053c746..ca09fe047d 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -14,8 +14,7 @@ use std::sync::Arc; -use api::v1::greptime_client::GreptimeClient; -use api::v1::*; +use arrow_flight::flight_service_client::FlightServiceClient; use common_grpc::channel_manager::ChannelManager; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt}; @@ -24,6 +23,21 @@ use tonic::transport::Channel; use crate::load_balance::{LoadBalance, Loadbalancer}; use crate::{error, Result}; +pub(crate) struct FlightClient { + addr: String, + client: FlightServiceClient, +} + +impl FlightClient { + pub(crate) fn addr(&self) -> &str { + &self.addr + } + + pub(crate) fn mut_inner(&mut self) -> &mut FlightServiceClient { + &mut self.client + } +} + #[derive(Clone, Debug, Default)] pub struct Client { inner: Arc, @@ -104,43 +118,23 @@ impl Client { self.inner.set_peers(urls); } - pub async fn database(&self, req: DatabaseRequest) -> Result { - let req = BatchRequest { - databases: vec![req], - ..Default::default() - }; - - let mut res = self.batch(req).await?; - res.databases.pop().context(error::MissingResultSnafu { - name: "database", - expected: 1_usize, - actual: 0_usize, - }) - } - - pub async fn batch(&self, req: BatchRequest) -> Result { - let peer = self + pub(crate) fn make_client(&self) -> Result { + let addr = self .inner .get_peer() .context(error::IllegalGrpcClientStateSnafu { err_msg: "No available peer found", })?; - let mut client = self.make_client(&peer)?; - let result = client - .batch(req) - .await - .context(error::TonicStatusSnafu { addr: peer })?; - Ok(result.into_inner()) - } - fn make_client(&self, addr: impl AsRef) -> Result> { - let addr = addr.as_ref(); let channel = self .inner .channel_manager - .get(addr) - .context(error::CreateChannelSnafu { addr })?; - Ok(GreptimeClient::new(channel)) + .get(&addr) + .context(error::CreateChannelSnafu { addr: &addr })?; + Ok(FlightClient { + addr, + client: FlightServiceClient::new(channel), + }) } } diff --git a/src/client/src/database.rs b/src/client/src/database.rs index e48041e002..2fba696e52 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -13,19 +13,21 @@ // limitations under the License. 
use api::v1::ddl_request::Expr as DdlExpr; +use api::v1::greptime_request::Request; +use api::v1::query_request::Query; use api::v1::{ - object_expr, query_request, AlterExpr, CreateTableExpr, DatabaseRequest, DdlRequest, - DropTableExpr, InsertRequest, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest, -}; -use common_error::status_code::StatusCode; -use common_grpc::flight::{ - flight_messages_to_recordbatches, raw_flight_data_to_message, FlightMessage, + AlterExpr, CreateTableExpr, DdlRequest, DropTableExpr, GreptimeRequest, InsertRequest, + QueryRequest, }; +use arrow_flight::{FlightData, Ticket}; +use common_error::prelude::*; +use common_grpc::flight::{flight_messages_to_recordbatches, FlightDecoder, FlightMessage}; use common_query::Output; -use common_recordbatch::RecordBatches; -use snafu::{ensure, OptionExt, ResultExt}; +use futures_util::{TryFutureExt, TryStreamExt}; +use prost::Message; +use snafu::{ensure, ResultExt}; -use crate::error::{ConvertFlightDataSnafu, DatanodeSnafu, IllegalFlightMessagesSnafu}; +use crate::error::{ConvertFlightDataSnafu, IllegalFlightMessagesSnafu}; use crate::{error, Client, Result}; #[derive(Clone, Debug)] @@ -46,112 +48,87 @@ impl Database { &self.name } - pub async fn insert(&self, request: InsertRequest) -> Result { - let expr = ObjectExpr { - request: Some(object_expr::Request::Insert(request)), - }; - self.object(expr).await?.try_into() + pub async fn insert(&self, request: InsertRequest) -> Result { + self.do_get(GreptimeRequest { + request: Some(Request::Insert(request)), + }) + .await } - pub async fn sql(&self, sql: &str) -> Result { - let query = QueryRequest { - query: Some(query_request::Query::Sql(sql.to_string())), - }; - self.do_query(query).await + pub async fn sql(&self, sql: &str) -> Result { + self.do_get(GreptimeRequest { + request: Some(Request::Query(QueryRequest { + query: Some(Query::Sql(sql.to_string())), + })), + }) + .await } - pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - let query = QueryRequest { - query: Some(query_request::Query::LogicalPlan(logical_plan)), - }; - self.do_query(query).await + pub async fn logical_plan(&self, logical_plan: Vec) -> Result { + self.do_get(GreptimeRequest { + request: Some(Request::Query(QueryRequest { + query: Some(Query::LogicalPlan(logical_plan)), + })), + }) + .await } - async fn do_query(&self, request: QueryRequest) -> Result { - let expr = ObjectExpr { - request: Some(object_expr::Request::Query(request)), - }; - - let obj_result = self.object(expr).await?; - obj_result.try_into() - } - - pub async fn create(&self, expr: CreateTableExpr) -> Result { - let expr = ObjectExpr { - request: Some(object_expr::Request::Ddl(DdlRequest { + pub async fn create(&self, expr: CreateTableExpr) -> Result { + self.do_get(GreptimeRequest { + request: Some(Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), })), - }; - self.object(expr).await?.try_into() + }) + .await } - pub async fn alter(&self, expr: AlterExpr) -> Result { - let expr = ObjectExpr { - request: Some(object_expr::Request::Ddl(DdlRequest { + pub async fn alter(&self, expr: AlterExpr) -> Result { + self.do_get(GreptimeRequest { + request: Some(Request::Ddl(DdlRequest { expr: Some(DdlExpr::Alter(expr)), })), - }; - self.object(expr).await?.try_into() + }) + .await } - pub async fn drop_table(&self, expr: DropTableExpr) -> Result { - let expr = ObjectExpr { - request: Some(object_expr::Request::Ddl(DdlRequest { + pub async fn drop_table(&self, expr: DropTableExpr) -> Result { + 
self.do_get(GreptimeRequest { + request: Some(Request::Ddl(DdlRequest { expr: Some(DdlExpr::DropTable(expr)), })), - }; - self.object(expr).await?.try_into() + }) + .await } - pub async fn object(&self, expr: ObjectExpr) -> Result { - let res = self.objects(vec![expr]).await?.pop().unwrap(); - Ok(res) - } + async fn do_get(&self, request: GreptimeRequest) -> Result { + let mut client = self.client.make_client()?; - async fn objects(&self, exprs: Vec) -> Result> { - let expr_count = exprs.len(); - let req = DatabaseRequest { - name: self.name.clone(), - exprs, - }; + // TODO(LFC): Streaming get flight data. + let flight_data: Vec = client + .mut_inner() + .do_get(Ticket { + ticket: request.encode_to_vec(), + }) + .and_then(|response| response.into_inner().try_collect()) + .await + .map_err(|e| { + let code = get_metadata_value(&e, INNER_ERROR_CODE) + .and_then(|s| s.parse::().ok()) + .unwrap_or(e.code() as u32); + let err_msg = get_metadata_value(&e, INNER_ERROR_MSG).unwrap_or(e.to_string()); + error::FlightGetSnafu { + addr: client.addr(), + code, + err_msg, + } + .build() + })?; - let res = self.client.database(req).await?; - let res = res.results; - - ensure!( - res.len() == expr_count, - error::MissingResultSnafu { - name: "object_results", - expected: expr_count, - actual: res.len(), - } - ); - - Ok(res) - } -} - -#[derive(Debug)] -pub enum RpcOutput { - RecordBatches(RecordBatches), - AffectedRows(usize), -} - -impl TryFrom for RpcOutput { - type Error = error::Error; - - fn try_from(object_result: api::v1::ObjectResult) -> std::result::Result { - let header = object_result.header.context(error::MissingHeaderSnafu)?; - if !StatusCode::is_success(header.code) { - return DatanodeSnafu { - code: header.code, - msg: header.err_msg, - } - .fail(); - } - - let flight_messages = raw_flight_data_to_message(object_result.flight_data) - .context(ConvertFlightDataSnafu)?; + let decoder = &mut FlightDecoder::default(); + let flight_messages = flight_data + .into_iter() + .map(|x| decoder.try_decode(x).context(ConvertFlightDataSnafu)) + .collect::>>()?; let output = if let Some(FlightMessage::AffectedRows(rows)) = flight_messages.get(0) { ensure!( @@ -160,23 +137,20 @@ impl TryFrom for RpcOutput { reason: "Expect 'AffectedRows' Flight messages to be one and only!" 
} ); - RpcOutput::AffectedRows(*rows) + Output::AffectedRows(*rows) } else { let recordbatches = flight_messages_to_recordbatches(flight_messages) .context(ConvertFlightDataSnafu)?; - RpcOutput::RecordBatches(recordbatches) + Output::RecordBatches(recordbatches) }; Ok(output) } } -impl From for Output { - fn from(value: RpcOutput) -> Self { - match value { - RpcOutput::AffectedRows(x) => Output::AffectedRows(x), - RpcOutput::RecordBatches(x) => Output::RecordBatches(x), - } - } +fn get_metadata_value(e: &tonic::Status, key: &str) -> Option { + e.metadata() + .get(key) + .and_then(|v| String::from_utf8(v.as_bytes().to_vec()).ok()) } #[cfg(test)] diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 773862624e..4392709e93 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -25,26 +25,19 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Missing {}, expected {}, actual {}", name, expected, actual))] - MissingResult { - name: String, - expected: usize, - actual: usize, - }, - - #[snafu(display("Missing result header"))] - MissingHeader, - - #[snafu(display("Tonic internal error, addr: {}, source: {}", addr, source))] - TonicStatus { + #[snafu(display( + "Failed to do Flight get, addr: {}, code: {}, err_msg: {}", + addr, + code, + err_msg + ))] + FlightGet { addr: String, - source: tonic::Status, + code: u32, + err_msg: String, backtrace: Backtrace, }, - #[snafu(display("Error occurred on the data node, code: {}, msg: {}", code, msg))] - Datanode { code: u32, msg: String }, - #[snafu(display("Failed to convert FlightData, source: {}", source))] ConvertFlightData { #[snafu(backtrace)] @@ -84,10 +77,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Error::IllegalFlightMessages { .. } - | Error::MissingResult { .. } - | Error::MissingHeader { .. } - | Error::TonicStatus { .. } - | Error::Datanode { .. } + | Error::FlightGet { .. } | Error::ColumnDataType { .. } | Error::MissingField { .. } => StatusCode::Internal, Error::CreateChannel { source, .. 
} | Error::ConvertFlightData { source } => { diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 3e3246fdcb..fbee1356d9 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -20,5 +20,5 @@ pub mod load_balance; pub use api; pub use self::client::Client; -pub use self::database::{Database, RpcOutput}; +pub use self::database::Database; pub use self::error::{Error, Result}; diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 4d0d3a8fde..507f1daaa9 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -24,6 +24,9 @@ pub mod prelude { pub use crate::ext::{BoxedError, ErrorExt}; pub use crate::format::DebugFormat; pub use crate::status_code::StatusCode; + + pub const INNER_ERROR_CODE: &str = "INNER_ERROR_CODE"; + pub const INNER_ERROR_MSG: &str = "INNER_ERROR_MSG"; } pub use snafu; diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index f25b468eec..6801891bcd 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -18,7 +18,7 @@ datafusion.workspace = true datatypes = { path = "../../datatypes" } flatbuffers = "22" futures = "0.3" -prost = "0.11" +prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tokio.workspace = true tonic.workspace = true diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 29d74174b7..77c4eee4d8 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -13,34 +13,25 @@ // limitations under the License. use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; -use api::result::ObjectResultBuilder; -use api::v1::{FlightDataExt, ObjectResult}; +use api::v1::FlightDataExt; use arrow_flight::utils::{flight_data_from_arrow_batch, flight_data_to_arrow_batch}; use arrow_flight::{FlightData, IpcMessage, SchemaAsIpc}; -use common_error::prelude::StatusCode; use common_recordbatch::{RecordBatch, RecordBatches}; use datatypes::arrow; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader}; use datatypes::schema::{Schema, SchemaRef}; use flatbuffers::FlatBufferBuilder; -use futures::TryStreamExt; use prost::Message; use snafu::{OptionExt, ResultExt}; -use tonic::codegen::futures_core::Stream; -use tonic::Response; use crate::error::{ ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result, }; -type TonicResult = std::result::Result; -type TonicStream = Pin> + Send + Sync + 'static>>; - #[derive(Debug, Clone)] pub enum FlightMessage { Schema(SchemaRef), @@ -147,37 +138,6 @@ impl FlightDecoder { } } -// TODO(LFC): Remove it once we completely get rid of old GRPC interface. 
-pub async fn flight_data_to_object_result( - response: Response>, -) -> Result { - let stream = response.into_inner(); - let result: TonicResult> = stream.try_collect().await; - match result { - Ok(flight_data) => Ok(ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .flight_data(flight_data) - .build()), - Err(e) => Ok(ObjectResultBuilder::new() - .status_code(StatusCode::Internal as _) - .err_msg(e.to_string()) - .build()), - } -} - -pub fn raw_flight_data_to_message(raw_data: Vec>) -> Result> { - let flight_data = raw_data - .into_iter() - .map(|x| FlightData::decode(x.as_slice()).context(DecodeFlightDataSnafu)) - .collect::>>()?; - - let decoder = &mut FlightDecoder::default(); - flight_data - .into_iter() - .map(|x| decoder.try_decode(x)) - .collect() -} - pub fn flight_messages_to_recordbatches(messages: Vec) -> Result { if messages.is_empty() { Ok(RecordBatches::empty()) diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 65ec955e8a..61e90a1aba 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::{Debug, Formatter}; + use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; pub mod columnar_value; @@ -29,4 +31,16 @@ pub enum Output { Stream(SendableRecordBatchStream), } +impl Debug for Output { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Output::AffectedRows(rows) => write!(f, "Output::AffectedRows({rows})"), + Output::RecordBatches(recordbatches) => { + write!(f, "Output::RecordBatches({recordbatches:?})") + } + Output::Stream(_) => write!(f, "Output::Stream()"), + } + } +} + pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index c3f40d6678..b63daad021 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -12,7 +12,6 @@ python = ["dep:script"] async-stream.workspace = true async-trait.workspace = true api = { path = "../api" } -arrow-flight.workspace = true axum = "0.6" axum-macros = "0.3" backon = "0.2" @@ -38,7 +37,7 @@ metrics = "0.20" mito = { path = "../mito", features = ["test"] } object-store = { path = "../object-store" } pin-project = "1.0" -prost = "0.11" +prost.workspace = true query = { path = "../query" } script = { path = "../script", features = ["python"], optional = true } serde = "1.0" diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 7d14402c57..a4a9d9a9ff 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -281,32 +281,8 @@ pub enum Error { #[snafu(display("Missing node id option in distributed mode"))] MissingMetasrvOpts { backtrace: Backtrace }, - #[snafu(display("Invalid Flight ticket, source: {}", source))] - InvalidFlightTicket { - source: api::DecodeError, - backtrace: Backtrace, - }, - #[snafu(display("Missing required field: {}", name))] MissingRequiredField { name: String, backtrace: Backtrace }, - - #[snafu(display("Failed to poll recordbatch stream, source: {}", source))] - PollRecordbatchStream { - #[snafu(backtrace)] - source: common_recordbatch::error::Error, - }, - - #[snafu(display("Invalid FlightData, source: {}", source))] - InvalidFlightData { - #[snafu(backtrace)] - source: common_grpc::Error, - }, - - #[snafu(display("Failed to do Flight get, source: {}", source))] - FlightGet { - source: tonic::Status, - backtrace: Backtrace, - }, } pub type Result = 
std::result::Result; @@ -336,8 +312,6 @@ impl ErrorExt for Error { | Error::CreateExprToRequest { source } | Error::InsertData { source } => source.status_code(), - Error::InvalidFlightData { source } => source.status_code(), - Error::CreateSchema { source, .. } | Error::ConvertSchema { source, .. } | Error::VectorComputation { source } => source.status_code(), @@ -362,8 +336,6 @@ impl ErrorExt for Error { | Error::RegisterSchema { .. } | Error::Catalog { .. } | Error::MissingRequiredField { .. } - | Error::FlightGet { .. } - | Error::InvalidFlightTicket { .. } | Error::IncorrectInternalState { .. } => StatusCode::Internal, Error::InitBackend { .. } => StatusCode::StorageUnavailable, @@ -376,7 +348,6 @@ impl ErrorExt for Error { Error::BumpTableId { source, .. } => source.status_code(), Error::MissingNodeId { .. } => StatusCode::InvalidArguments, Error::MissingMetasrvOpts { .. } => StatusCode::InvalidArguments, - Error::PollRecordbatchStream { source } => source.status_code(), } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index c2a3802253..1751211b0b 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -47,7 +47,6 @@ use crate::heartbeat::HeartbeatTask; use crate::script::ScriptExecutor; use crate::sql::SqlHandler; -pub mod flight; mod grpc; mod script; mod sql; diff --git a/src/datanode/src/instance/flight.rs b/src/datanode/src/instance/flight.rs deleted file mode 100644 index d6599bdebc..0000000000 --- a/src/datanode/src/instance/flight.rs +++ /dev/null @@ -1,470 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -mod stream; - -use std::pin::Pin; - -use api::v1::ddl_request::Expr as DdlExpr; -use api::v1::object_expr::Request as GrpcRequest; -use api::v1::query_request::Query; -use api::v1::{DdlRequest, InsertRequest, ObjectExpr}; -use arrow_flight::flight_service_server::FlightService; -use arrow_flight::{ - Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, -}; -use async_trait::async_trait; -use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_grpc::flight::{FlightEncoder, FlightMessage}; -use common_query::Output; -use futures::Stream; -use prost::Message; -use query::parser::QueryLanguageParser; -use session::context::QueryContext; -use snafu::{OptionExt, ResultExt}; -use tonic::{Request, Response, Streaming}; - -use crate::error::{ - CatalogSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, InvalidFlightTicketSnafu, - MissingRequiredFieldSnafu, Result, TableNotFoundSnafu, -}; -use crate::instance::flight::stream::FlightRecordBatchStream; -use crate::instance::Instance; - -type TonicResult = std::result::Result; -type TonicStream = Pin> + Send + Sync + 'static>>; - -#[async_trait] -impl FlightService for Instance { - type HandshakeStream = TonicStream; - - async fn handshake( - &self, - _request: Request>, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - type ListFlightsStream = TonicStream; - - async fn list_flights( - &self, - _request: Request, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - async fn get_flight_info( - &self, - _request: Request, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - async fn get_schema( - &self, - _request: Request, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - type DoGetStream = TonicStream; - - async fn do_get(&self, request: Request) -> TonicResult> { - let ticket = request.into_inner().ticket; - let request = ObjectExpr::decode(ticket.as_slice()) - .context(InvalidFlightTicketSnafu)? - .request - .context(MissingRequiredFieldSnafu { name: "request" })?; - let output = match request { - GrpcRequest::Insert(request) => self.handle_insert(request).await?, - GrpcRequest::Query(query_request) => { - let query = query_request - .query - .context(MissingRequiredFieldSnafu { name: "query" })?; - self.handle_query(query).await? 
- } - GrpcRequest::Ddl(request) => self.handle_ddl(request).await?, - }; - let stream = to_flight_data_stream(output); - Ok(Response::new(stream)) - } - - type DoPutStream = TonicStream; - - async fn do_put( - &self, - _request: Request>, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - type DoExchangeStream = TonicStream; - - async fn do_exchange( - &self, - _request: Request>, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - type DoActionStream = TonicStream; - - async fn do_action( - &self, - _request: Request, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } - - type ListActionsStream = TonicStream; - - async fn list_actions( - &self, - _request: Request, - ) -> TonicResult> { - Err(tonic::Status::unimplemented("Not yet implemented")) - } -} - -impl Instance { - async fn handle_query(&self, query: Query) -> Result { - Ok(match query { - Query::Sql(sql) => { - let stmt = QueryLanguageParser::parse_sql(&sql).context(ExecuteSqlSnafu)?; - self.execute_stmt(stmt, QueryContext::arc()).await? - } - Query::LogicalPlan(plan) => self.execute_logical(plan).await?, - }) - } - - pub async fn handle_insert(&self, request: InsertRequest) -> Result { - let table_name = &request.table_name.clone(); - // TODO(LFC): InsertRequest should carry catalog name, too. - let table = self - .catalog_manager - .table(DEFAULT_CATALOG_NAME, &request.schema_name, table_name) - .context(CatalogSnafu)? - .context(TableNotFoundSnafu { table_name })?; - - let request = - common_grpc_expr::insert::to_table_insert_request(request).context(InsertDataSnafu)?; - - let affected_rows = table - .insert(request) - .await - .context(InsertSnafu { table_name })?; - Ok(Output::AffectedRows(affected_rows)) - } - - async fn handle_ddl(&self, request: DdlRequest) -> Result { - let expr = request - .expr - .context(MissingRequiredFieldSnafu { name: "expr" })?; - match expr { - DdlExpr::CreateTable(expr) => self.handle_create(expr).await, - DdlExpr::Alter(expr) => self.handle_alter(expr).await, - DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await, - DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await, - } - } -} - -pub fn to_flight_data_stream(output: Output) -> TonicStream { - match output { - Output::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream); - Box::pin(stream) as _ - } - Output::RecordBatches(x) => { - let stream = FlightRecordBatchStream::new(x.as_stream()); - Box::pin(stream) as _ - } - Output::AffectedRows(rows) => { - let stream = tokio_stream::once(Ok( - FlightEncoder::default().encode(FlightMessage::AffectedRows(rows)) - )); - Box::pin(stream) as _ - } - } -} - -#[cfg(test)] -mod test { - use api::v1::column::{SemanticType, Values}; - use api::v1::{ - alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, QueryRequest, - }; - use client::RpcOutput; - use common_grpc::flight; - use common_recordbatch::RecordBatches; - use datatypes::prelude::*; - - use super::*; - use crate::tests::test_util::{self, MockInstance}; - - async fn boarding(instance: &MockInstance, ticket: Request) -> RpcOutput { - let response = instance.inner().do_get(ticket).await.unwrap(); - let result = flight::flight_data_to_object_result(response) - .await - .unwrap(); - result.try_into().unwrap() - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_handle_ddl() { - let instance = 
MockInstance::new("test_handle_ddl").await; - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { - database_name: "my_database".to_string(), - })), - })), - } - .encode_to_vec(), - }); - - let output = boarding(&instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(1))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(CreateTableExpr { - catalog_name: "greptime".to_string(), - schema_name: "my_database".to_string(), - table_name: "my_table".to_string(), - desc: "blabla".to_string(), - column_defs: vec![ - ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: true, - default_constraint: vec![], - }, - ColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, - is_nullable: false, - default_constraint: vec![], - }, - ], - time_index: "ts".to_string(), - ..Default::default() - })), - })), - } - .encode_to_vec(), - }); - - let output = boarding(&instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(0))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(AlterExpr { - catalog_name: "greptime".to_string(), - schema_name: "my_database".to_string(), - table_name: "my_table".to_string(), - kind: Some(alter_expr::Kind::AddColumns(AddColumns { - add_columns: vec![AddColumn { - column_def: Some(ColumnDef { - name: "b".to_string(), - datatype: ColumnDataType::Int32 as i32, - is_nullable: true, - default_constraint: vec![], - }), - is_key: true, - }], - })), - })), - })), - } - .encode_to_vec(), - }); - - let output = boarding(&instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(0))); - - let output = instance - .inner() - .execute_sql( - "INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)", - QueryContext::arc(), - ) - .await - .unwrap(); - assert!(matches!(output, Output::AffectedRows(1))); - - let output = instance - .inner() - .execute_sql( - "SELECT ts, a, b FROM my_database.my_table", - QueryContext::arc(), - ) - .await - .unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2022-12-30T07:09:00 | s | 1 | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_handle_insert() { - let instance = MockInstance::new("test_handle_insert").await; - test_util::create_test_table( - &instance, - ConcreteDataType::timestamp_millisecond_datatype(), - ) - .await - .unwrap(); - - let insert = InsertRequest { - schema_name: "public".to_string(), - table_name: "demo".to_string(), - columns: vec![ - Column { - column_name: "host".to_string(), - values: Some(Values { - string_values: vec![ - "host1".to_string(), - "host2".to_string(), - "host3".to_string(), - ], - ..Default::default() - }), - semantic_type: SemanticType::Tag as i32, - datatype: ColumnDataType::String as i32, - ..Default::default() - }, - Column { - column_name: "cpu".to_string(), - values: Some(Values { - f64_values: vec![1.0, 3.0], - ..Default::default() - }), - null_mask: vec![2], - 
semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Float64 as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![1672384140000, 1672384141000, 1672384142000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 3, - ..Default::default() - }; - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Insert(insert)), - } - .encode_to_vec(), - }); - - let output = boarding(&instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(3))); - - let output = instance - .inner() - .execute_sql("SELECT ts, host, cpu FROM demo", QueryContext::arc()) - .await - .unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+-------+-----+ -| ts | host | cpu | -+---------------------+-------+-----+ -| 2022-12-30T07:09:00 | host1 | 1 | -| 2022-12-30T07:09:01 | host2 | | -| 2022-12-30T07:09:02 | host3 | 3 | -+---------------------+-------+-----+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_handle_query() { - let instance = MockInstance::new("test_handle_query").await; - test_util::create_test_table( - &instance, - ConcreteDataType::timestamp_millisecond_datatype(), - ) - .await - .unwrap(); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql( - "INSERT INTO demo(host, cpu, memory, ts) VALUES \ - ('host1', 66.6, 1024, 1672201025000),\ - ('host2', 88.8, 333.3, 1672201026000)" - .to_string(), - )), - })), - } - .encode_to_vec(), - }); - - let output = boarding(&instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(2))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql( - "SELECT ts, host, cpu, memory FROM demo".to_string(), - )), - })), - } - .encode_to_vec(), - }); - - let response = instance.inner().do_get(ticket).await.unwrap(); - let result = flight::flight_data_to_object_result(response) - .await - .unwrap(); - let raw_data = result.flight_data; - let messages = flight::raw_flight_data_to_message(raw_data).unwrap(); - assert_eq!(messages.len(), 2); - - let recordbatch = flight::flight_messages_to_recordbatches(messages).unwrap(); - let expected = "\ -+---------------------+-------+------+--------+ -| ts | host | cpu | memory | -+---------------------+-------+------+--------+ -| 2022-12-28T04:17:05 | host1 | 66.6 | 1024 | -| 2022-12-28T04:17:06 | host2 | 88.8 | 333.3 | -+---------------------+-------+------+--------+"; - let actual = recordbatch.pretty_print().unwrap(); - assert_eq!(actual, expected); - } -} diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 33bc3c34bf..b6ccb99c23 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -12,60 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::error::Error; - -use api::v1::{CreateDatabaseExpr, ObjectExpr, ObjectResult, ResultHeader}; -use arrow_flight::flight_service_server::FlightService; -use arrow_flight::Ticket; +use api::v1::ddl_request::Expr as DdlExpr; +use api::v1::greptime_request::Request as GrpcRequest; +use api::v1::query_request::Query; +use api::v1::{CreateDatabaseExpr, DdlRequest, GreptimeRequest, InsertRequest}; use async_trait::async_trait; -use common_error::prelude::{BoxedError, ErrorExt, StatusCode}; -use common_grpc::flight; +use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_error::prelude::BoxedError; use common_query::Output; -use prost::Message; +use query::parser::QueryLanguageParser; use query::plan::LogicalPlan; use servers::query_handler::GrpcQueryHandler; +use session::context::QueryContext; use snafu::prelude::*; use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::requests::CreateDatabaseRequest; -use tonic::Request; -use crate::error::{ - DecodeLogicalPlanSnafu, Error as DatanodeError, ExecuteSqlSnafu, InvalidFlightDataSnafu, Result, -}; +use crate::error::{self, DecodeLogicalPlanSnafu, ExecuteSqlSnafu, Result}; use crate::instance::Instance; impl Instance { - async fn boarding(&self, ticket: Request) -> Result { - let response = self.do_get(ticket).await; - let response = match response { - Ok(response) => response, - Err(e) => { - let status_code = e - .source() - .and_then(|s| s.downcast_ref::()) - .map(|s| s.status_code()) - .unwrap_or(StatusCode::Internal); - - let err_msg = e.source().map(|s| s.to_string()).unwrap_or(e.to_string()); - - // TODO(LFC): Further formalize error message when Arrow Flight adoption is done, - // and don't forget to change "test runner"'s error msg accordingly. - return Ok(ObjectResult { - header: Some(ResultHeader { - version: 1, - code: status_code as _, - err_msg, - }), - flight_data: vec![], - }); - } - }; - - flight::flight_data_to_object_result(response) - .await - .context(InvalidFlightDataSnafu) - } - pub(crate) async fn handle_create_database(&self, expr: CreateDatabaseExpr) -> Result { let req = CreateDatabaseRequest { db_name: expr.database_name, @@ -83,20 +49,298 @@ impl Instance { .await .context(ExecuteSqlSnafu) } + + async fn handle_query(&self, query: Query) -> Result { + Ok(match query { + Query::Sql(sql) => { + let stmt = QueryLanguageParser::parse_sql(&sql).context(ExecuteSqlSnafu)?; + self.execute_stmt(stmt, QueryContext::arc()).await? + } + Query::LogicalPlan(plan) => self.execute_logical(plan).await?, + }) + } + + pub async fn handle_insert(&self, request: InsertRequest) -> Result { + let table_name = &request.table_name.clone(); + // TODO(LFC): InsertRequest should carry catalog name, too. + let table = self + .catalog_manager + .table(DEFAULT_CATALOG_NAME, &request.schema_name, table_name) + .context(error::CatalogSnafu)? 
+ .context(error::TableNotFoundSnafu { table_name })?; + + let request = common_grpc_expr::insert::to_table_insert_request(request) + .context(error::InsertDataSnafu)?; + + let affected_rows = table + .insert(request) + .await + .context(error::InsertSnafu { table_name })?; + Ok(Output::AffectedRows(affected_rows)) + } + + async fn handle_ddl(&self, request: DdlRequest) -> Result { + let expr = request.expr.context(error::MissingRequiredFieldSnafu { + name: "DdlRequest.expr", + })?; + match expr { + DdlExpr::CreateTable(expr) => self.handle_create(expr).await, + DdlExpr::Alter(expr) => self.handle_alter(expr).await, + DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await, + DdlExpr::DropTable(expr) => self.handle_drop_table(expr).await, + } + } + + async fn handle_grpc_query(&self, query: GreptimeRequest) -> Result { + let request = query.request.context(error::MissingRequiredFieldSnafu { + name: "GreptimeRequest.request", + })?; + let output = match request { + GrpcRequest::Insert(request) => self.handle_insert(request).await?, + GrpcRequest::Query(query_request) => { + let query = query_request + .query + .context(error::MissingRequiredFieldSnafu { + name: "QueryRequest.query", + })?; + self.handle_query(query).await? + } + GrpcRequest::Ddl(request) => self.handle_ddl(request).await?, + }; + Ok(output) + } } #[async_trait] impl GrpcQueryHandler for Instance { - async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { - let ticket = Request::new(Ticket { - ticket: query.encode_to_vec(), - }); - // TODO(LFC): Temporarily use old GRPC interface here, will get rid of them near the end of Arrow Flight adoption. - self.boarding(ticket) + async fn do_query(&self, query: GreptimeRequest) -> servers::error::Result { + self.handle_grpc_query(query) .await .map_err(BoxedError::new) - .with_context(|_| servers::error::ExecuteQuerySnafu { - query: format!("{query:?}"), - }) + .context(servers::error::ExecuteGrpcQuerySnafu) + } +} + +#[cfg(test)] +mod test { + use api::v1::column::{SemanticType, Values}; + use api::v1::{ + alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, + CreateDatabaseExpr, CreateTableExpr, QueryRequest, + }; + use common_recordbatch::RecordBatches; + use datatypes::prelude::*; + + use super::*; + use crate::tests::test_util::{self, MockInstance}; + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_ddl() { + let instance = MockInstance::new("test_handle_ddl").await; + let instance = instance.inner(); + + let query = GreptimeRequest { + request: Some(GrpcRequest::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "my_database".to_string(), + })), + })), + }; + let output = instance.do_query(query).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = GreptimeRequest { + request: Some(GrpcRequest::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + desc: "blabla".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as i32, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + ..Default::default() + })), + })), + }; + let output = 
instance.do_query(query).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let query = GreptimeRequest { + request: Some(GrpcRequest::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "my_database".to_string(), + table_name: "my_table".to_string(), + kind: Some(alter_expr::Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as i32, + is_nullable: true, + default_constraint: vec![], + }), + is_key: true, + }], + })), + })), + })), + }; + let output = instance.do_query(query).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let output = instance + .execute_sql( + "INSERT INTO my_database.my_table (a, b, ts) VALUES ('s', 1, 1672384140000)", + QueryContext::arc(), + ) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let output = instance + .execute_sql( + "SELECT ts, a, b FROM my_database.my_table", + QueryContext::arc(), + ) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2022-12-30T07:09:00 | s | 1 | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_insert() { + let instance = MockInstance::new("test_handle_insert").await; + let instance = instance.inner(); + test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype()) + .await + .unwrap(); + + let insert = InsertRequest { + schema_name: "public".to_string(), + table_name: "demo".to_string(), + columns: vec![ + Column { + column_name: "host".to_string(), + values: Some(Values { + string_values: vec![ + "host1".to_string(), + "host2".to_string(), + "host3".to_string(), + ], + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::String as i32, + ..Default::default() + }, + Column { + column_name: "cpu".to_string(), + values: Some(Values { + f64_values: vec![1.0, 3.0], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Float64 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672384140000, 1672384141000, 1672384142000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + ..Default::default() + }; + + let query = GreptimeRequest { + request: Some(GrpcRequest::Insert(insert)), + }; + let output = instance.do_query(query).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(3))); + + let output = instance + .execute_sql("SELECT ts, host, cpu FROM demo", QueryContext::arc()) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+-------+-----+ +| ts | host | cpu | ++---------------------+-------+-----+ +| 2022-12-30T07:09:00 | host1 | 1 | +| 2022-12-30T07:09:01 | host2 | | +| 2022-12-30T07:09:02 | host3 | 3 | ++---------------------+-------+-----+"; + 
assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_query() { + let instance = MockInstance::new("test_handle_query").await; + let instance = instance.inner(); + test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype()) + .await + .unwrap(); + + let query = GreptimeRequest { + request: Some(GrpcRequest::Query(QueryRequest { + query: Some(Query::Sql( + "INSERT INTO demo(host, cpu, memory, ts) VALUES \ + ('host1', 66.6, 1024, 1672201025000),\ + ('host2', 88.8, 333.3, 1672201026000)" + .to_string(), + )), + })), + }; + let output = instance.do_query(query).await.unwrap(); + assert!(matches!(output, Output::AffectedRows(2))); + + let query = GreptimeRequest { + request: Some(GrpcRequest::Query(QueryRequest { + query: Some(Query::Sql( + "SELECT ts, host, cpu, memory FROM demo".to_string(), + )), + })), + }; + let output = instance.do_query(query).await.unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatch = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+-------+------+--------+ +| ts | host | cpu | memory | ++---------------------+-------+------+--------+ +| 2022-12-28T04:17:05 | host1 | 66.6 | 1024 | +| 2022-12-28T04:17:06 | host2 | 88.8 | 333.3 | ++---------------------+-------+------+--------+"; + let actual = recordbatch.pretty_print().unwrap(); + assert_eq!(actual, expected); } } diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 0596324b71..a69f21e590 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -162,7 +162,7 @@ async fn setup_test_instance(test_name: &str) -> MockInstance { let instance = MockInstance::new(test_name).await; test_util::create_test_table( - &instance, + instance.inner(), ConcreteDataType::timestamp_millisecond_datatype(), ) .await @@ -189,7 +189,7 @@ async fn test_execute_insert() { async fn test_execute_insert_query_with_i64_timestamp() { let instance = MockInstance::new("insert_query_i64_timestamp").await; - test_util::create_test_table(&instance, ConcreteDataType::int64_datatype()) + test_util::create_test_table(instance.inner(), ConcreteDataType::int64_datatype()) .await .unwrap(); @@ -302,7 +302,7 @@ async fn test_execute_show_databases_tables() { // creat a table test_util::create_test_table( - &instance, + instance.inner(), ConcreteDataType::timestamp_millisecond_datatype(), ) .await diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 90233d43d2..110cd12a46 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -78,7 +78,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) } pub(crate) async fn create_test_table( - instance: &MockInstance, + instance: &Instance, ts_type: ConcreteDataType, ) -> Result<()> { let column_schemas = vec![ @@ -89,7 +89,7 @@ pub(crate) async fn create_test_table( ]; let table_name = "demo"; - let table_engine: TableEngineRef = instance.inner().sql_handler().table_engine(); + let table_engine: TableEngineRef = instance.sql_handler().table_engine(); let table = table_engine .create_table( &EngineContext::default(), @@ -115,7 +115,6 @@ pub(crate) async fn create_test_table( .context(CreateTableSnafu { table_name })?; let schema_provider = instance - .inner() .catalog_manager .schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) .unwrap() 
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 2b3e0b002f..91c49d730a 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -6,7 +6,6 @@ license.workspace = true [dependencies] anymap = "1.0.0-beta.2" -arrow-flight.workspace = true api = { path = "../api" } async-stream.workspace = true async-trait = "0.1" @@ -28,12 +27,12 @@ datafusion-expr.workspace = true datanode = { path = "../datanode" } datatypes = { path = "../datatypes" } futures = "0.3" -futures-util = "0.3" +futures-util.workspace = true itertools = "0.10" meta-client = { path = "../meta-client" } moka = { version = "0.9", features = ["future"] } openmetrics-parser = "0.4" -prost = "0.11" +prost.workspace = true query = { path = "../query" } rustls = "0.20" serde = "1.0" diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 9111f21660..ef66a60163 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -23,12 +23,6 @@ use store_api::storage::RegionId; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Invalid ObjectResult, source: {}", source))] - InvalidObjectResult { - #[snafu(backtrace)] - source: client::Error, - }, - #[snafu(display("Failed to request Datanode, source: {}", source))] RequestDatanode { #[snafu(backtrace)] @@ -105,12 +99,6 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Invalid Flight ticket, source: {}", source))] - InvalidFlightTicket { - source: api::DecodeError, - backtrace: Backtrace, - }, - #[snafu(display( "Failed to convert DataFusion's ScalarValue: {:?}, source: {}", value, @@ -237,12 +225,6 @@ pub enum Error { source: common_grpc_expr::error::Error, }, - #[snafu(display("Failed to find new columns on insertion: {}", source))] - FindNewColumnsOnInsertion { - #[snafu(backtrace)] - source: common_grpc_expr::error::Error, - }, - #[snafu(display( "Failed to convert GRPC InsertRequest to table InsertRequest, source: {}", source @@ -374,24 +356,6 @@ pub enum Error { source: datatypes::error::Error, }, - #[snafu(display("Failed to invoke GRPC server, source: {}", source))] - InvokeGrpcServer { - #[snafu(backtrace)] - source: servers::error::Error, - }, - - #[snafu(display("Failed to do Flight get, source: {}", source))] - FlightGet { - source: tonic::Status, - backtrace: Backtrace, - }, - - #[snafu(display("Invalid FlightData, source: {}", source))] - InvalidFlightData { - #[snafu(backtrace)] - source: common_grpc::Error, - }, - #[snafu(display("Failed to found context value: {}", key))] ContextValueNotFound { key: String, backtrace: Backtrace }, @@ -419,14 +383,11 @@ impl ErrorExt for Error { | Error::InvalidInsertRequest { .. } | Error::FindPartitionColumn { .. } | Error::ColumnValuesNumberMismatch { .. } - | Error::RegionKeysSize { .. } - | Error::InvalidFlightTicket { .. } => StatusCode::InvalidArguments, + | Error::RegionKeysSize { .. } => StatusCode::InvalidArguments, Error::RuntimeResource { source, .. } => source.status_code(), - Error::StartServer { source, .. } | Error::InvokeGrpcServer { source } => { - source.status_code() - } + Error::StartServer { source, .. } => source.status_code(), Error::ParseSql { source } => source.status_code(), @@ -436,9 +397,7 @@ impl ErrorExt for Error { | Error::ConvertScalarValue { source, .. } | Error::ConvertArrowSchema { source } => source.status_code(), - Error::InvalidObjectResult { source, .. 
} | Error::RequestDatanode { source } => {
-                source.status_code()
-            }
+            Error::RequestDatanode { source } => source.status_code(),

             Error::ColumnDataType { source } | Error::InvalidColumnDef { source, .. } => {
                 source.status_code()
@@ -454,7 +413,6 @@ impl ErrorExt for Error {
             | Error::FindRegionPartition { .. }
             | Error::IllegalTableRoutesData { .. }
             | Error::BuildDfLogicalPlan { .. }
-            | Error::FlightGet { .. }
             | Error::BuildTableMeta { .. } => StatusCode::Internal,

             Error::IllegalFrontendState { .. }
@@ -475,7 +433,6 @@ impl ErrorExt for Error {
             Error::CatalogNotFound { .. } => StatusCode::InvalidArguments,
             Error::Insert { source, .. } => source.status_code(),
             Error::BuildCreateExprOnInsertion { source, .. } => source.status_code(),
-            Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
             Error::ToTableInsertRequest { source, .. } => source.status_code(),
             Error::PrimaryKeyNotFound { .. } => StatusCode::InvalidArguments,
             Error::ExecuteStatement { source, .. } => source.status_code(),
@@ -485,7 +442,6 @@ impl ErrorExt for Error {
             Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
             Error::EncodeSubstraitLogicalPlan { source } => source.status_code(),
             Error::BuildVector { source, .. } => source.status_code(),
-            Error::InvalidFlightData { source } => source.status_code(),
         }
     }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 972dc0b771..3f1353d859 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -13,7 +13,6 @@
 // limitations under the License.

 pub(crate) mod distributed;
-mod flight;
 mod grpc;
 mod influxdb;
 mod opentsdb;
@@ -24,14 +23,13 @@ use std::time::Duration;

 use api::v1::alter_expr::Kind;
 use api::v1::ddl_request::Expr as DdlExpr;
-use api::v1::object_expr::Request;
+use api::v1::greptime_request::Request;
 use api::v1::{
-    AddColumns, AlterExpr, Column, DdlRequest, DropTableExpr, InsertRequest, ObjectExpr,
+    AddColumns, AlterExpr, Column, DdlRequest, DropTableExpr, GreptimeRequest, InsertRequest,
 };
 use async_trait::async_trait;
 use catalog::remote::MetaKvBackend;
 use catalog::CatalogManagerRef;
-use client::RpcOutput;
 use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_error::prelude::BoxedError;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -57,10 +55,7 @@ use sql::statements::statement::Statement;

 use crate::catalog::FrontendCatalogManager;
 use crate::datanode::DatanodeClients;
-use crate::error::{
-    self, CatalogSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, InvalidObjectResultSnafu,
-    InvokeGrpcServerSnafu, MissingMetasrvOptsSnafu, Result,
-};
+use crate::error::{self, MissingMetasrvOptsSnafu, Result};
 use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
 use crate::frontend::FrontendOptions;
 use crate::table::route::TableRoutes;
@@ -194,7 +189,10 @@ impl Instance {
     }

     /// Handle batch inserts
-    pub async fn handle_inserts(&self, requests: Vec<InsertRequest>) -> Result<Output> {
+    pub async fn handle_inserts(
+        &self,
+        requests: Vec<InsertRequest>,
+    ) -> server_error::Result<Output> {
         let mut success = 0;
         for request in requests {
             match self.handle_insert(request).await? {
@@ -205,7 +203,7 @@ impl Instance {
         Ok(Output::AffectedRows(success))
     }

-    async fn handle_insert(&self, request: InsertRequest) -> Result<Output> {
+    async fn handle_insert(&self, request: InsertRequest) -> server_error::Result<Output> {
         let schema_name = &request.schema_name;
         let table_name = &request.table_name;
         let catalog_name = DEFAULT_CATALOG_NAME;
@@ -215,14 +213,10 @@ impl Instance {
         self.create_or_alter_table_on_demand(catalog_name, schema_name, table_name, columns)
             .await?;

-        let query = ObjectExpr {
+        let query = GreptimeRequest {
             request: Some(Request::Insert(request)),
         };
-        let result = GrpcQueryHandler::do_query(&*self.grpc_query_handler, query)
-            .await
-            .context(InvokeGrpcServerSnafu)?;
-        let result: RpcOutput = result.try_into().context(InsertSnafu)?;
-        Ok(result.into())
+        GrpcQueryHandler::do_query(&*self.grpc_query_handler, query).await
     }

     // check if table already exist:
@@ -234,11 +228,11 @@ impl Instance {
         schema_name: &str,
         table_name: &str,
         columns: &[Column],
-    ) -> Result<()> {
+    ) -> server_error::Result<()> {
         let table = self
             .catalog_manager
             .table(catalog_name, schema_name, table_name)
-            .context(CatalogSnafu)?;
+            .context(server_error::CatalogSnafu)?;
         match table {
             None => {
                 info!(
@@ -256,7 +250,7 @@ impl Instance {
                 let schema = table.schema();
                 if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns)
-                    .context(FindNewColumnsOnInsertionSnafu)?
+                    .context(server_error::FindNewColumnsOnInsertionSnafu)?
                 {
                     info!(
                         "Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
@@ -286,29 +280,27 @@ impl Instance {
         schema_name: &str,
         table_name: &str,
         columns: &[Column],
-    ) -> Result<Output> {
+    ) -> server_error::Result<Output> {
         // Create table automatically, build schema from data.
         let create_expr = self
             .create_expr_factory
             .create_expr_by_columns(catalog_name, schema_name, table_name, columns)
-            .await?;
+            .await
+            .map_err(BoxedError::new)
+            .context(server_error::ExecuteGrpcQuerySnafu)?;
         info!(
             "Try to create table: {} automatically with request: {:?}",
             table_name, create_expr,
         );
-        let result = self
-            .grpc_query_handler
-            .do_query(ObjectExpr {
+        self.grpc_query_handler
+            .do_query(GreptimeRequest {
                 request: Some(Request::Ddl(DdlRequest {
                     expr: Some(DdlExpr::CreateTable(create_expr)),
                 })),
             })
             .await
-            .context(InvokeGrpcServerSnafu)?;
-        let output: RpcOutput = result.try_into().context(InvalidObjectResultSnafu)?;
-        Ok(output.into())
     }

     async fn add_new_columns_to_table(
@@ -317,7 +309,7 @@ impl Instance {
         schema_name: &str,
         table_name: &str,
         add_columns: AddColumns,
-    ) -> Result<Output> {
+    ) -> server_error::Result<Output> {
         debug!(
             "Adding new columns: {:?} to table: {}",
             add_columns, table_name
@@ -329,17 +321,13 @@ impl Instance {
             kind: Some(Kind::AddColumns(add_columns)),
         };

-        let result = self
-            .grpc_query_handler
-            .do_query(ObjectExpr {
+        self.grpc_query_handler
+            .do_query(GreptimeRequest {
                 request: Some(Request::Ddl(DdlRequest {
                     expr: Some(DdlExpr::Alter(expr)),
                 })),
             })
             .await
-            .context(InvokeGrpcServerSnafu)?;
-        let output: RpcOutput = result.try_into().context(InvalidObjectResultSnafu)?;
-        Ok(output.into())
     }

     fn handle_use(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
@@ -403,19 +391,14 @@ impl Instance {
                 let expr = AlterExpr::try_from(alter_stmt)
                     .map_err(BoxedError::new)
                     .context(server_error::ExecuteAlterSnafu { query })?;
-                let result = self
+                return self
                     .grpc_query_handler
-                    .do_query(ObjectExpr {
+                    .do_query(GreptimeRequest {
                         request: Some(Request::Ddl(DdlRequest {
                             expr: Some(DdlExpr::Alter(expr)),
                         })),
                     })
-                    .await?;
-                let output:
RpcOutput = result - .try_into() - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query })?; - Ok(output.into()) + .await; } Statement::DropTable(drop_stmt) => { let expr = DropTableExpr { @@ -423,19 +406,14 @@ impl Instance { schema_name: drop_stmt.schema_name, table_name: drop_stmt.table_name, }; - let result = self + return self .grpc_query_handler - .do_query(ObjectExpr { + .do_query(GreptimeRequest { request: Some(Request::Ddl(DdlRequest { expr: Some(DdlExpr::DropTable(expr)), })), }) - .await?; - let output: RpcOutput = result - .try_into() - .map_err(BoxedError::new) - .context(server_error::ExecuteQuerySnafu { query })?; - Ok(output.into()) + .await; } Statement::ShowCreateTable(_) => { return server_error::NotSupportedSnafu { feat: query }.fail(); @@ -547,20 +525,8 @@ impl ScriptHandler for Instance { #[cfg(test)] mod tests { use std::borrow::Cow; - use std::iter; use std::sync::atomic::AtomicU32; - use api::v1::column::SemanticType; - use api::v1::{ - column, query_request, Column, ColumnDataType, ColumnDef as GrpcColumnDef, CreateTableExpr, - QueryRequest, - }; - use common_grpc::flight::{raw_flight_data_to_message, FlightMessage}; - use common_recordbatch::RecordBatch; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; - use datatypes::value::Value; - use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; use session::context::QueryContext; use super::*; @@ -662,219 +628,6 @@ mod tests { }; } - #[tokio::test(flavor = "multi_thread")] - async fn test_execute_grpc() { - let standalone = tests::create_standalone_instance("test_execute_grpc").await; - let instance = standalone.instance; - - // testing data: - let expected_host_col = Column { - column_name: "host".to_string(), - values: Some(column::Values { - string_values: vec!["fe.host.a", "fe.host.b", "fe.host.c", "fe.host.d"] - .into_iter() - .map(|s| s.to_string()) - .collect(), - ..Default::default() - }), - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::String as i32, - ..Default::default() - }; - let expected_cpu_col = Column { - column_name: "cpu".to_string(), - values: Some(column::Values { - f64_values: vec![1.0, 3.0, 4.0], - ..Default::default() - }), - null_mask: vec![2], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Float64 as i32, - }; - let expected_mem_col = Column { - column_name: "memory".to_string(), - values: Some(column::Values { - f64_values: vec![100.0, 200.0, 400.0], - ..Default::default() - }), - null_mask: vec![4], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Float64 as i32, - }; - let expected_ts_col = Column { - column_name: "ts".to_string(), - values: Some(column::Values { - ts_millisecond_values: vec![1000, 2000, 3000, 4000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }; - - // create - let result = GrpcQueryHandler::do_query( - &*instance, - ObjectExpr { - request: Some(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(create_expr())), - })), - }, - ) - .await - .unwrap(); - let output: RpcOutput = result.try_into().unwrap(); - assert!(matches!(output, RpcOutput::AffectedRows(0))); - - // insert - let columns = vec![ - expected_host_col.clone(), - expected_cpu_col.clone(), - expected_mem_col.clone(), - expected_ts_col.clone(), - ]; - let row_count = 4; - let request = InsertRequest 
{ - schema_name: "public".to_string(), - table_name: "demo".to_string(), - region_number: 0, - columns, - row_count, - }; - let object_expr = ObjectExpr { - request: Some(Request::Insert(request)), - }; - let result = GrpcQueryHandler::do_query(&*instance, object_expr) - .await - .unwrap(); - let raw_data = result.flight_data; - let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap(); - assert_eq!(flight_messages.len(), 1); - let message = flight_messages.remove(0); - assert!(matches!(message, FlightMessage::AffectedRows(4))); - - // select - let object_expr = ObjectExpr { - request: Some(Request::Query(QueryRequest { - query: Some(query_request::Query::Sql("select * from demo".to_string())), - })), - }; - let result = GrpcQueryHandler::do_query(&*instance, object_expr) - .await - .unwrap(); - let raw_data = result.flight_data; - let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap(); - assert_eq!(flight_messages.len(), 2); - - let expected_schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("disk_util", ConcreteDataType::float64_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(9.9f64)))) - .unwrap(), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true), - ])); - match flight_messages.remove(0) { - FlightMessage::Schema(schema) => { - assert_eq!(schema, expected_schema); - } - _ => unreachable!(), - } - - match flight_messages.remove(0) { - FlightMessage::Recordbatch(recordbatch) => { - let expect_recordbatch = RecordBatch::new( - expected_schema, - vec![ - Arc::new(StringVector::from(vec![ - "fe.host.a", - "fe.host.b", - "fe.host.c", - "fe.host.d", - ])) as _, - Arc::new(Float64Vector::from(vec![ - Some(1.0f64), - None, - Some(3.0f64), - Some(4.0f64), - ])) as _, - Arc::new(Float64Vector::from(vec![ - Some(100f64), - Some(200f64), - None, - Some(400f64), - ])) as _, - Arc::new(Float64Vector::from_vec( - iter::repeat(9.9f64).take(4).collect(), - )) as _, - Arc::new(TimestampMillisecondVector::from_vec(vec![ - 1000i64, 2000, 3000, 4000, - ])) as _, - ], - ) - .unwrap(); - assert_eq!(recordbatch, expect_recordbatch); - } - _ => unreachable!(), - } - } - - fn create_expr() -> CreateTableExpr { - let column_defs = vec![ - GrpcColumnDef { - name: "host".to_string(), - datatype: ColumnDataType::String as i32, - is_nullable: false, - default_constraint: vec![], - }, - GrpcColumnDef { - name: "cpu".to_string(), - datatype: ColumnDataType::Float64 as i32, - is_nullable: true, - default_constraint: vec![], - }, - GrpcColumnDef { - name: "memory".to_string(), - datatype: ColumnDataType::Float64 as i32, - is_nullable: true, - default_constraint: vec![], - }, - GrpcColumnDef { - name: "disk_util".to_string(), - datatype: ColumnDataType::Float64 as i32, - is_nullable: true, - default_constraint: ColumnDefaultConstraint::Value(Value::from(9.9f64)) - .try_into() - .unwrap(), - }, - GrpcColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, - is_nullable: true, - default_constraint: vec![], - }, - ]; - CreateTableExpr { - catalog_name: "".to_string(), - schema_name: "".to_string(), - table_name: "demo".to_string(), - desc: "".to_string(), - column_defs, - time_index: "ts".to_string(), - primary_keys: 
vec!["host".to_string()], - create_if_not_exists: true, - table_options: Default::default(), - table_id: None, - region_ids: vec![0], - } - } - #[tokio::test(flavor = "multi_thread")] async fn test_sql_interceptor_plugin() { #[derive(Default)] diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 6a93f59924..442f98cb2e 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -12,18 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod flight; +mod grpc; use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::{ - AlterExpr, CreateDatabaseExpr, CreateTableExpr, InsertRequest, ObjectExpr, ObjectResult, - TableId, -}; -use arrow_flight::flight_service_server::FlightService; -use arrow_flight::Ticket; +use api::v1::{AlterExpr, CreateDatabaseExpr, CreateTableExpr, InsertRequest, TableId}; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue}; use catalog::{CatalogList, CatalogManager}; @@ -31,7 +26,6 @@ use chrono::DateTime; use client::Database; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::prelude::BoxedError; -use common_grpc::flight::flight_data_to_object_result; use common_query::Output; use common_telemetry::{debug, error, info}; use datatypes::prelude::ConcreteDataType; @@ -41,12 +35,11 @@ use meta_client::rpc::{ CreateRequest as MetaCreateRequest, Partition as MetaPartition, PutRequest, RouteResponse, TableName, TableRoute, }; -use prost::Message; use query::parser::QueryStatement; use query::sql::{describe_table, explain, show_databases, show_tables}; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error as server_error; -use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler}; +use servers::query_handler::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::Value as SqlValue; @@ -55,15 +48,14 @@ use sql::statements::sql_value_to_value; use sql::statements::statement::Statement; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::table::AlterContext; -use tonic::Request; use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ self, AlterExprToRequestSnafu, CatalogEntrySerdeSnafu, CatalogNotFoundSnafu, CatalogSnafu, - ColumnDataTypeSnafu, FlightGetSnafu, InvalidFlightDataSnafu, ParseSqlSnafu, - PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaNotFoundSnafu, - StartMetaClientSnafu, TableNotFoundSnafu, TableSnafu, ToTableInsertRequestSnafu, + ColumnDataTypeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, + RequestMetaSnafu, Result, SchemaNotFoundSnafu, StartMetaClientSnafu, TableNotFoundSnafu, + TableSnafu, ToTableInsertRequestSnafu, }; use crate::expr_factory::{CreateExprFactory, DefaultCreateExprFactory}; use crate::instance::parse_stmt; @@ -371,13 +363,6 @@ impl DistInstance { Ok(Output::AffectedRows(affected_rows)) } - async fn boarding(&self, ticket: Request) -> Result { - let response = self.do_get(ticket).await.context(FlightGetSnafu)?; - flight_data_to_object_result(response) - .await - .context(InvalidFlightDataSnafu) - } - #[cfg(test)] pub(crate) fn catalog_manager(&self) -> Arc { self.catalog_manager.clone() @@ -420,22 +405,6 @@ impl SqlQueryHandler for DistInstance 
{
     }
 }

-#[async_trait]
-impl GrpcQueryHandler for DistInstance {
-    async fn do_query(&self, query: ObjectExpr) -> server_error::Result<ObjectResult> {
-        let ticket = Request::new(Ticket {
-            ticket: query.encode_to_vec(),
-        });
-        // TODO(LFC): Temporarily use old GRPC interface here, will get rid of them near the end of Arrow Flight adoption.
-        self.boarding(ticket)
-            .await
-            .map_err(BoxedError::new)
-            .with_context(|_| servers::error::ExecuteQuerySnafu {
-                query: format!("{query:?}"),
-            })
-    }
-}
-
 fn create_table_global_value(
     create_table: &CreateTableExpr,
     table_route: &TableRoute,
diff --git a/src/frontend/src/instance/distributed/grpc.rs b/src/frontend/src/instance/distributed/grpc.rs
new file mode 100644
index 0000000000..61e9b5eaca
--- /dev/null
+++ b/src/frontend/src/instance/distributed/grpc.rs
@@ -0,0 +1,69 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::ddl_request::Expr as DdlExpr;
+use api::v1::greptime_request::Request;
+use api::v1::GreptimeRequest;
+use async_trait::async_trait;
+use common_error::prelude::BoxedError;
+use common_query::Output;
+use servers::query_handler::GrpcQueryHandler;
+use snafu::{OptionExt, ResultExt};
+
+use crate::error::{self, Result};
+use crate::instance::distributed::DistInstance;
+
+impl DistInstance {
+    async fn handle_grpc_query(&self, query: GreptimeRequest) -> Result<Output> {
+        let request = query.request.context(error::IncompleteGrpcResultSnafu {
+            err_msg: "Missing 'request' in GreptimeRequest",
+        })?;
+        let output = match request {
+            Request::Insert(request) => self.handle_dist_insert(request).await?,
+            Request::Query(_) => {
+                unreachable!("Query should have been handled directly in Frontend Instance!")
+            }
+            Request::Ddl(request) => {
+                let expr = request.expr.context(error::IncompleteGrpcResultSnafu {
+                    err_msg: "Missing 'expr' in DDL request",
+                })?;
+                match expr {
+                    DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await?,
+                    DdlExpr::CreateTable(mut expr) => {
+                        // TODO(LFC): Support creating distributed tables through the GRPC interface.
+                        // Currently only SQL supports it; the fields needed in CreateTableExpr are yet to be designed.
+                        self.create_table(&mut expr, None).await?
+                    }
+                    DdlExpr::Alter(expr) => self.handle_alter_table(expr).await?,
+                    DdlExpr::DropTable(_) => {
+                        // TODO(LFC): Implement distributed drop table.
+                        // It seems the whole "drop table through GRPC interface" feature has not been implemented yet.
+                        unimplemented!()
+                    }
+                }
+            }
+        };
+        Ok(output)
+    }
+}
+
+#[async_trait]
+impl GrpcQueryHandler for DistInstance {
+    async fn do_query(&self, query: GreptimeRequest) -> servers::error::Result<Output> {
+        self.handle_grpc_query(query)
+            .await
+            .map_err(BoxedError::new)
+            .context(servers::error::ExecuteGrpcQuerySnafu)
+    }
+}
diff --git a/src/frontend/src/instance/flight.rs b/src/frontend/src/instance/flight.rs
deleted file mode 100644
index 69e2ea7ab5..0000000000
--- a/src/frontend/src/instance/flight.rs
+++ /dev/null
@@ -1,667 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::pin::Pin;
-
-use api::v1::object_expr::Request as GrpcRequest;
-use api::v1::query_request::Query;
-use api::v1::ObjectExpr;
-use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::{
-    Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
-    HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
-};
-use async_trait::async_trait;
-use client::RpcOutput;
-use datanode::instance::flight::to_flight_data_stream;
-use futures::Stream;
-use prost::Message;
-use servers::query_handler::GrpcQueryHandler;
-use session::context::QueryContext;
-use snafu::{ensure, OptionExt, ResultExt};
-use tonic::{Request, Response, Status, Streaming};
-
-use crate::error::{
-    IncompleteGrpcResultSnafu, InvalidFlightTicketSnafu, InvalidObjectResultSnafu, InvalidSqlSnafu,
-    InvokeGrpcServerSnafu,
-};
-use crate::instance::{parse_stmt, Instance};
-
-type TonicResult<T> = Result<T, Status>;
-type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + Sync + 'static>>;
-
-#[async_trait]
-impl FlightService for Instance {
-    type HandshakeStream = TonicStream<HandshakeResponse>;
-
-    async fn handshake(
-        &self,
-        _: Request<Streaming<HandshakeRequest>>,
-    ) -> TonicResult<Response<Self::HandshakeStream>> {
-        Err(Status::unimplemented("Not yet implemented"))
-    }
-
-    type ListFlightsStream = TonicStream<FlightInfo>;
-
-    async fn list_flights(
-        &self,
-        _: Request<Criteria>,
-    ) -> TonicResult<Response<Self::ListFlightsStream>> {
-        Err(Status::unimplemented("Not yet implemented"))
-    }
-
-    async fn get_flight_info(
-        &self,
-        _: Request<FlightDescriptor>,
-    ) -> TonicResult<Response<FlightInfo>> {
-        Err(Status::unimplemented("Not yet implemented"))
-    }
-
-    async fn get_schema(
-        &self,
-        _: Request<FlightDescriptor>,
-    ) -> TonicResult<Response<SchemaResult>> {
-        Err(Status::unimplemented("Not yet implemented"))
-    }
-
-    type DoGetStream = TonicStream<FlightData>;
-
-    async fn do_get(&self, request: Request<Ticket>) -> TonicResult<Response<Self::DoGetStream>> {
-        let ticket = request.into_inner().ticket;
-        let request = ObjectExpr::decode(ticket.as_slice())
-            .context(InvalidFlightTicketSnafu)?
- .request - .context(IncompleteGrpcResultSnafu { - err_msg: "Missing 'request' in ObjectExpr", - })?; - let output = match request { - GrpcRequest::Insert(request) => self.handle_insert(request).await?, - GrpcRequest::Query(query_request) => { - let query = query_request.query.context(IncompleteGrpcResultSnafu { - err_msg: "Missing 'query' in ObjectExpr::Request", - })?; - match query { - Query::Sql(sql) => { - let mut stmt = parse_stmt(&sql)?; - ensure!( - stmt.len() == 1, - InvalidSqlSnafu { - err_msg: "expect only one statement in SQL query string through GRPC interface" - } - ); - let stmt = stmt.remove(0); - - self.query_statement(stmt, QueryContext::arc()).await? - } - Query::LogicalPlan(_) => { - return Err(Status::unimplemented("Not yet implemented")) - } - } - } - GrpcRequest::Ddl(request) => { - let query = ObjectExpr { - request: Some(GrpcRequest::Ddl(request)), - }; - let result = GrpcQueryHandler::do_query(&*self.grpc_query_handler, query) - .await - .context(InvokeGrpcServerSnafu)?; - let result: RpcOutput = result.try_into().context(InvalidObjectResultSnafu)?; - result.into() - } - }; - let stream = to_flight_data_stream(output); - Ok(Response::new(stream)) - } - - type DoPutStream = TonicStream; - - async fn do_put( - &self, - _: Request>, - ) -> TonicResult> { - Err(Status::unimplemented("Not yet implemented")) - } - - type DoExchangeStream = TonicStream; - - async fn do_exchange( - &self, - _: Request>, - ) -> TonicResult> { - Err(Status::unimplemented("Not yet implemented")) - } - - type DoActionStream = TonicStream; - - async fn do_action(&self, _: Request) -> TonicResult> { - Err(Status::unimplemented("Not yet implemented")) - } - - type ListActionsStream = TonicStream; - - async fn list_actions( - &self, - _: Request, - ) -> TonicResult> { - Err(Status::unimplemented("Not yet implemented")) - } -} - -#[cfg(test)] -mod test { - use std::collections::HashMap; - use std::sync::Arc; - - use api::v1::column::{SemanticType, Values}; - use api::v1::ddl_request::Expr as DdlExpr; - use api::v1::{ - alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, DdlRequest, InsertRequest, QueryRequest, - }; - use catalog::helper::{TableGlobalKey, TableGlobalValue}; - use client::RpcOutput; - use common_grpc::flight; - use common_query::Output; - use common_recordbatch::RecordBatches; - - use super::*; - use crate::table::DistTable; - use crate::tests; - use crate::tests::MockDistributedInstance; - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_handle_ddl_request() { - let instance = - tests::create_distributed_instance("test_distributed_handle_ddl_request").await; - let frontend = &instance.frontend; - - test_handle_ddl_request(frontend).await - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_handle_ddl_request() { - let standalone = - tests::create_standalone_instance("test_standalone_handle_ddl_request").await; - let instance = &standalone.instance; - - test_handle_ddl_request(instance).await - } - - async fn test_handle_ddl_request(instance: &Arc) { - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { - database_name: "database_created_through_grpc".to_string(), - })), - })), - } - .encode_to_vec(), - }); - let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(1))); - - let ticket = Request::new(Ticket { - ticket: 
ObjectExpr { - request: Some(GrpcRequest::Ddl(DdlRequest { - expr: Some(DdlExpr::CreateTable(CreateTableExpr { - catalog_name: "greptime".to_string(), - schema_name: "database_created_through_grpc".to_string(), - table_name: "table_created_through_grpc".to_string(), - column_defs: vec![ - ColumnDef { - name: "a".to_string(), - datatype: ColumnDataType::String as _, - is_nullable: true, - default_constraint: vec![], - }, - ColumnDef { - name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as _, - is_nullable: false, - default_constraint: vec![], - }, - ], - time_index: "ts".to_string(), - ..Default::default() - })), - })), - } - .encode_to_vec(), - }); - let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(0))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Ddl(DdlRequest { - expr: Some(DdlExpr::Alter(AlterExpr { - catalog_name: "greptime".to_string(), - schema_name: "database_created_through_grpc".to_string(), - table_name: "table_created_through_grpc".to_string(), - kind: Some(alter_expr::Kind::AddColumns(AddColumns { - add_columns: vec![AddColumn { - column_def: Some(ColumnDef { - name: "b".to_string(), - datatype: ColumnDataType::Int32 as _, - is_nullable: true, - default_constraint: vec![], - }), - is_key: false, - }], - })), - })), - })), - } - .encode_to_vec(), - }); - let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(0))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string())) - })) - }.encode_to_vec() - }); - let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(1))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql("SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc".to_string())) - })) - }.encode_to_vec() - }); - let output = boarding(instance, ticket).await; - let RpcOutput::RecordBatches(recordbatches) = output else { unreachable!() }; - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-04T07:14:26 | s | 1 | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_distributed_insert_and_query() { - common_telemetry::init_default_ut_logging(); - - let instance = - tests::create_distributed_instance("test_distributed_insert_and_query").await; - let frontend = &instance.frontend; - - let table_name = "my_dist_table"; - let sql = format!( - r" -CREATE TABLE {table_name} ( - a INT, - ts TIMESTAMP, - TIME INDEX (ts) -) PARTITION BY RANGE COLUMNS(a) ( - PARTITION r0 VALUES LESS THAN (10), - PARTITION r1 VALUES LESS THAN (20), - PARTITION r2 VALUES LESS THAN (50), - PARTITION r3 VALUES LESS THAN (MAXVALUE), -)" - ); - create_table(frontend, sql).await; - - test_insert_and_query_on_existing_table(frontend, table_name).await; - - verify_data_distribution( - &instance, - table_name, - HashMap::from([ - ( - 0u32, - "\ -+---------------------+---+ -| ts | a | -+---------------------+---+ -| 2023-01-01T07:26:12 | 1 | -| 2023-01-01T07:26:14 | | -+---------------------+---+", - ), - ( - 1u32, - "\ 
-+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:13 | 11 | -+---------------------+----+", - ), - ( - 2u32, - "\ -+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:15 | 20 | -| 2023-01-01T07:26:16 | 22 | -+---------------------+----+", - ), - ( - 3u32, - "\ -+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:17 | 50 | -| 2023-01-01T07:26:18 | 55 | -| 2023-01-01T07:26:19 | 99 | -+---------------------+----+", - ), - ]), - ) - .await; - - test_insert_and_query_on_auto_created_table(frontend).await; - - // Auto created table has only one region. - verify_data_distribution( - &instance, - "auto_created_table", - HashMap::from([( - 0u32, - "\ -+---------------------+---+ -| ts | a | -+---------------------+---+ -| 2023-01-01T07:26:15 | 4 | -| 2023-01-01T07:26:16 | | -| 2023-01-01T07:26:17 | 6 | -| 2023-01-01T07:26:18 | | -| 2023-01-01T07:26:19 | | -| 2023-01-01T07:26:20 | | -+---------------------+---+", - )]), - ) - .await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_standalone_insert_and_query() { - common_telemetry::init_default_ut_logging(); - - let standalone = - tests::create_standalone_instance("test_standalone_insert_and_query").await; - let instance = &standalone.instance; - - let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))"); - create_table(instance, sql).await; - - test_insert_and_query_on_existing_table(instance, table_name).await; - - test_insert_and_query_on_auto_created_table(instance).await - } - - async fn create_table(frontend: &Arc, sql: String) { - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql(sql)), - })), - } - .encode_to_vec(), - }); - let output = boarding(frontend, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(0))); - } - - async fn test_insert_and_query_on_existing_table(instance: &Arc, table_name: &str) { - let insert = InsertRequest { - schema_name: "public".to_string(), - table_name: table_name.to_string(), - columns: vec![ - Column { - column_name: "a".to_string(), - values: Some(Values { - i32_values: vec![1, 11, 20, 22, 50, 55, 99], - ..Default::default() - }), - null_mask: vec![4], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Int32 as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![ - 1672557972000, - 1672557973000, - 1672557974000, - 1672557975000, - 1672557976000, - 1672557977000, - 1672557978000, - 1672557979000, - ], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 8, - ..Default::default() - }; - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Insert(insert)), - } - .encode_to_vec(), - }); - - let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(8))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql(format!( - "SELECT ts, a FROM {table_name} ORDER BY ts" - ))), - })), - } - .encode_to_vec(), - }); - - let output = boarding(instance, ticket).await; - let RpcOutput::RecordBatches(recordbatches) = output else { unreachable!() }; - let expected = "\ 
-+---------------------+----+ -| ts | a | -+---------------------+----+ -| 2023-01-01T07:26:12 | 1 | -| 2023-01-01T07:26:13 | 11 | -| 2023-01-01T07:26:14 | | -| 2023-01-01T07:26:15 | 20 | -| 2023-01-01T07:26:16 | 22 | -| 2023-01-01T07:26:17 | 50 | -| 2023-01-01T07:26:18 | 55 | -| 2023-01-01T07:26:19 | 99 | -+---------------------+----+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - async fn verify_data_distribution( - instance: &MockDistributedInstance, - table_name: &str, - expected_distribution: HashMap, - ) { - let table = instance - .frontend - .catalog_manager() - .table("greptime", "public", table_name) - .unwrap() - .unwrap(); - let table = table.as_any().downcast_ref::().unwrap(); - - let TableGlobalValue { regions_id_map, .. } = table - .table_global_value(&TableGlobalKey { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - }) - .await - .unwrap() - .unwrap(); - let region_to_dn_map = regions_id_map - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); - assert_eq!(region_to_dn_map.len(), expected_distribution.len()); - - for (region, dn) in region_to_dn_map.iter() { - let dn = instance.datanodes.get(dn).unwrap(); - let output = dn - .execute_sql( - &format!("SELECT ts, a FROM {table_name} ORDER BY ts"), - QueryContext::arc(), - ) - .await - .unwrap(); - let Output::Stream(stream) = output else { unreachable!() }; - let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let actual = recordbatches.pretty_print().unwrap(); - - let expected = expected_distribution.get(region).unwrap(); - assert_eq!(&actual, expected); - } - } - - async fn test_insert_and_query_on_auto_created_table(instance: &Arc) { - let insert = InsertRequest { - schema_name: "public".to_string(), - table_name: "auto_created_table".to_string(), - columns: vec![ - Column { - column_name: "a".to_string(), - values: Some(Values { - i32_values: vec![4, 6], - ..Default::default() - }), - null_mask: vec![2], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::Int32 as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 3, - ..Default::default() - }; - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Insert(insert)), - } - .encode_to_vec(), - }); - - // Test auto create not existed table upon insertion. 
- let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(3))); - - let insert = InsertRequest { - schema_name: "public".to_string(), - table_name: "auto_created_table".to_string(), - columns: vec![ - Column { - column_name: "b".to_string(), - values: Some(Values { - string_values: vec!["x".to_string(), "z".to_string()], - ..Default::default() - }), - null_mask: vec![2], - semantic_type: SemanticType::Field as i32, - datatype: ColumnDataType::String as i32, - }, - Column { - column_name: "ts".to_string(), - values: Some(Values { - ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], - ..Default::default() - }), - semantic_type: SemanticType::Timestamp as i32, - datatype: ColumnDataType::TimestampMillisecond as i32, - ..Default::default() - }, - ], - row_count: 3, - ..Default::default() - }; - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Insert(insert)), - } - .encode_to_vec(), - }); - - // Test auto add not existed column upon insertion. - let output = boarding(instance, ticket).await; - assert!(matches!(output, RpcOutput::AffectedRows(3))); - - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - request: Some(GrpcRequest::Query(QueryRequest { - query: Some(Query::Sql( - "SELECT ts, a, b FROM auto_created_table".to_string(), - )), - })), - } - .encode_to_vec(), - }); - - let output = boarding(instance, ticket).await; - let RpcOutput::RecordBatches(recordbatches) = output else { unreachable!() }; - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-01T07:26:15 | 4 | | -| 2023-01-01T07:26:16 | | | -| 2023-01-01T07:26:17 | 6 | | -| 2023-01-01T07:26:18 | | x | -| 2023-01-01T07:26:19 | | | -| 2023-01-01T07:26:20 | | z | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); - } - - async fn boarding(instance: &Arc, ticket: Request) -> RpcOutput { - let response = instance.do_get(ticket).await.unwrap(); - let result = flight::flight_data_to_object_result(response) - .await - .unwrap(); - result.try_into().unwrap() - } -} diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 40169b173d..22db0bc91a 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -12,42 +12,549 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use api::v1::{ObjectExpr, ObjectResult};
-use arrow_flight::flight_service_server::FlightService;
-use arrow_flight::Ticket;
+use api::v1::greptime_request::Request;
+use api::v1::query_request::Query;
+use api::v1::GreptimeRequest;
 use async_trait::async_trait;
-use common_error::prelude::BoxedError;
-use common_grpc::flight;
-use prost::Message;
-use servers::error as server_error;
-use servers::query_handler::GrpcQueryHandler;
-use snafu::ResultExt;
-use tonic::Request;
+use common_query::Output;
+use servers::error::{self, Result};
+use servers::query_handler::{GrpcQueryHandler, SqlQueryHandler};
+use session::context::QueryContext;
+use snafu::{ensure, OptionExt};

-use crate::error::{FlightGetSnafu, InvalidFlightDataSnafu, Result};
 use crate::instance::Instance;

-impl Instance {
-    async fn boarding(&self, ticket: Request<Ticket>) -> Result<ObjectResult> {
-        let response = self.do_get(ticket).await.context(FlightGetSnafu)?;
-        flight::flight_data_to_object_result(response)
-            .await
-            .context(InvalidFlightDataSnafu)
-    }
-}
-
 #[async_trait]
 impl GrpcQueryHandler for Instance {
-    async fn do_query(&self, query: ObjectExpr) -> server_error::Result<ObjectResult> {
-        let ticket = Request::new(Ticket {
-            ticket: query.encode_to_vec(),
-        });
-        // TODO(LFC): Temporarily use old GRPC interface here, will get rid of them near the end of Arrow Flight adoption.
-        self.boarding(ticket)
-            .await
-            .map_err(BoxedError::new)
-            .with_context(|_| servers::error::ExecuteQuerySnafu {
-                query: format!("{query:?}"),
-            })
+    async fn do_query(&self, query: GreptimeRequest) -> Result<Output> {
+        let request = query.request.context(error::GrpcRequestMissingFieldSnafu {
+            name: "GreptimeRequest.request",
+        })?;
+        let output = match request {
+            Request::Insert(request) => self.handle_insert(request).await?,
+            Request::Query(query_request) => {
+                let query = query_request
+                    .query
+                    .context(error::GrpcRequestMissingFieldSnafu {
+                        name: "QueryRequest.query",
+                    })?;
+                match query {
+                    Query::Sql(sql) => {
+                        let mut result =
+                            SqlQueryHandler::do_query(self, &sql, QueryContext::arc()).await;
+                        ensure!(
+                            result.len() == 1,
+                            error::NotSupportedSnafu {
+                                feat: "execute multiple statements in SQL query string through GRPC interface"
+                            }
+                        );
+                        result.remove(0)?
+                    }
+                    Query::LogicalPlan(_) => {
+                        return error::NotSupportedSnafu {
+                            feat: "Execute LogicalPlan in Frontend",
+                        }
+                        .fail();
+                    }
+                }
+            }
+            Request::Ddl(request) => {
+                let query = GreptimeRequest {
+                    request: Some(Request::Ddl(request)),
+                };
+                GrpcQueryHandler::do_query(&*self.grpc_query_handler, query).await?
+ } + }; + Ok(output) + } +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + use std::sync::Arc; + + use api::v1::column::{SemanticType, Values}; + use api::v1::ddl_request::Expr as DdlExpr; + use api::v1::{ + alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, + CreateDatabaseExpr, CreateTableExpr, DdlRequest, InsertRequest, QueryRequest, + }; + use catalog::helper::{TableGlobalKey, TableGlobalValue}; + use common_query::Output; + use common_recordbatch::RecordBatches; + + use super::*; + use crate::table::DistTable; + use crate::tests; + use crate::tests::MockDistributedInstance; + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_handle_ddl_request() { + let instance = + tests::create_distributed_instance("test_distributed_handle_ddl_request").await; + let frontend = &instance.frontend; + + test_handle_ddl_request(frontend).await + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_handle_ddl_request() { + let standalone = + tests::create_standalone_instance("test_standalone_handle_ddl_request").await; + let instance = &standalone.instance; + + test_handle_ddl_request(instance).await + } + + async fn test_handle_ddl_request(instance: &Arc) { + let query = GreptimeRequest { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateDatabase(CreateDatabaseExpr { + database_name: "database_created_through_grpc".to_string(), + })), + })), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = GreptimeRequest { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::CreateTable(CreateTableExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + column_defs: vec![ + ColumnDef { + name: "a".to_string(), + datatype: ColumnDataType::String as _, + is_nullable: true, + default_constraint: vec![], + }, + ColumnDef { + name: "ts".to_string(), + datatype: ColumnDataType::TimestampMillisecond as _, + is_nullable: false, + default_constraint: vec![], + }, + ], + time_index: "ts".to_string(), + ..Default::default() + })), + })), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let query = GreptimeRequest { + request: Some(Request::Ddl(DdlRequest { + expr: Some(DdlExpr::Alter(AlterExpr { + catalog_name: "greptime".to_string(), + schema_name: "database_created_through_grpc".to_string(), + table_name: "table_created_through_grpc".to_string(), + kind: Some(alter_expr::Kind::AddColumns(AddColumns { + add_columns: vec![AddColumn { + column_def: Some(ColumnDef { + name: "b".to_string(), + datatype: ColumnDataType::Int32 as _, + is_nullable: true, + default_constraint: vec![], + }), + is_key: false, + }], + })), + })), + })), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + + let query = GreptimeRequest { + request: Some(Request::Query(QueryRequest { + query: Some(Query::Sql("INSERT INTO database_created_through_grpc.table_created_through_grpc (a, b, ts) VALUES ('s', 1, 1672816466000)".to_string())) + })) + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(1))); + + let query = GreptimeRequest { + 
request: Some(Request::Query(QueryRequest { + query: Some(Query::Sql( + "SELECT ts, a, b FROM database_created_through_grpc.table_created_through_grpc" + .to_string(), + )), + })), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-04T07:14:26 | s | 1 | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_distributed_insert_and_query() { + common_telemetry::init_default_ut_logging(); + + let instance = + tests::create_distributed_instance("test_distributed_insert_and_query").await; + let frontend = &instance.frontend; + + let table_name = "my_dist_table"; + let sql = format!( + r" +CREATE TABLE {table_name} ( + a INT, + ts TIMESTAMP, + TIME INDEX (ts) +) PARTITION BY RANGE COLUMNS(a) ( + PARTITION r0 VALUES LESS THAN (10), + PARTITION r1 VALUES LESS THAN (20), + PARTITION r2 VALUES LESS THAN (50), + PARTITION r3 VALUES LESS THAN (MAXVALUE), +)" + ); + create_table(frontend, sql).await; + + test_insert_and_query_on_existing_table(frontend, table_name).await; + + verify_data_distribution( + &instance, + table_name, + HashMap::from([ + ( + 0u32, + "\ ++---------------------+---+ +| ts | a | ++---------------------+---+ +| 2023-01-01T07:26:12 | 1 | +| 2023-01-01T07:26:14 | | ++---------------------+---+", + ), + ( + 1u32, + "\ ++---------------------+----+ +| ts | a | ++---------------------+----+ +| 2023-01-01T07:26:13 | 11 | ++---------------------+----+", + ), + ( + 2u32, + "\ ++---------------------+----+ +| ts | a | ++---------------------+----+ +| 2023-01-01T07:26:15 | 20 | +| 2023-01-01T07:26:16 | 22 | ++---------------------+----+", + ), + ( + 3u32, + "\ ++---------------------+----+ +| ts | a | ++---------------------+----+ +| 2023-01-01T07:26:17 | 50 | +| 2023-01-01T07:26:18 | 55 | +| 2023-01-01T07:26:19 | 99 | ++---------------------+----+", + ), + ]), + ) + .await; + + test_insert_and_query_on_auto_created_table(frontend).await; + + // Auto created table has only one region. 
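+        // The expected map below therefore has exactly one entry, keyed by region number 0.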
+ verify_data_distribution( + &instance, + "auto_created_table", + HashMap::from([( + 0u32, + "\ ++---------------------+---+ +| ts | a | ++---------------------+---+ +| 2023-01-01T07:26:15 | 4 | +| 2023-01-01T07:26:16 | | +| 2023-01-01T07:26:17 | 6 | +| 2023-01-01T07:26:18 | | +| 2023-01-01T07:26:19 | | +| 2023-01-01T07:26:20 | | ++---------------------+---+", + )]), + ) + .await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_standalone_insert_and_query() { + common_telemetry::init_default_ut_logging(); + + let standalone = + tests::create_standalone_instance("test_standalone_insert_and_query").await; + let instance = &standalone.instance; + + let table_name = "my_table"; + let sql = format!("CREATE TABLE {table_name} (a INT, ts TIMESTAMP, TIME INDEX (ts))"); + create_table(instance, sql).await; + + test_insert_and_query_on_existing_table(instance, table_name).await; + + test_insert_and_query_on_auto_created_table(instance).await + } + + async fn create_table(frontend: &Arc, sql: String) { + let query = GreptimeRequest { + request: Some(Request::Query(QueryRequest { + query: Some(Query::Sql(sql)), + })), + }; + let output = GrpcQueryHandler::do_query(frontend.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(0))); + } + + async fn test_insert_and_query_on_existing_table(instance: &Arc, table_name: &str) { + let insert = InsertRequest { + schema_name: "public".to_string(), + table_name: table_name.to_string(), + columns: vec![ + Column { + column_name: "a".to_string(), + values: Some(Values { + i32_values: vec![1, 11, 20, 22, 50, 55, 99], + ..Default::default() + }), + null_mask: vec![4], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Int32 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![ + 1672557972000, + 1672557973000, + 1672557974000, + 1672557975000, + 1672557976000, + 1672557977000, + 1672557978000, + 1672557979000, + ], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 8, + ..Default::default() + }; + + let query = GreptimeRequest { + request: Some(Request::Insert(insert)), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(8))); + + let query = GreptimeRequest { + request: Some(Request::Query(QueryRequest { + query: Some(Query::Sql(format!( + "SELECT ts, a FROM {table_name} ORDER BY ts" + ))), + })), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+----+ +| ts | a | ++---------------------+----+ +| 2023-01-01T07:26:12 | 1 | +| 2023-01-01T07:26:13 | 11 | +| 2023-01-01T07:26:14 | | +| 2023-01-01T07:26:15 | 20 | +| 2023-01-01T07:26:16 | 22 | +| 2023-01-01T07:26:17 | 50 | +| 2023-01-01T07:26:18 | 55 | +| 2023-01-01T07:26:19 | 99 | ++---------------------+----+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); + } + + async fn verify_data_distribution( + instance: &MockDistributedInstance, + table_name: &str, + expected_distribution: HashMap, + ) { + let table = instance + .frontend + .catalog_manager() + .table("greptime", "public", table_name) + .unwrap() + .unwrap(); + let table = 
table.as_any().downcast_ref::().unwrap(); + + let TableGlobalValue { regions_id_map, .. } = table + .table_global_value(&TableGlobalKey { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + table_name: table_name.to_string(), + }) + .await + .unwrap() + .unwrap(); + let region_to_dn_map = regions_id_map + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); + assert_eq!(region_to_dn_map.len(), expected_distribution.len()); + + for (region, dn) in region_to_dn_map.iter() { + let dn = instance.datanodes.get(dn).unwrap(); + let output = dn + .execute_sql( + &format!("SELECT ts, a FROM {table_name} ORDER BY ts"), + QueryContext::arc(), + ) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let actual = recordbatches.pretty_print().unwrap(); + + let expected = expected_distribution.get(region).unwrap(); + assert_eq!(&actual, expected); + } + } + + async fn test_insert_and_query_on_auto_created_table(instance: &Arc) { + let insert = InsertRequest { + schema_name: "public".to_string(), + table_name: "auto_created_table".to_string(), + columns: vec![ + Column { + column_name: "a".to_string(), + values: Some(Values { + i32_values: vec![4, 6], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Int32 as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + ..Default::default() + }; + + // Test auto create not existed table upon insertion. + let query = GreptimeRequest { + request: Some(Request::Insert(insert)), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(3))); + + let insert = InsertRequest { + schema_name: "public".to_string(), + table_name: "auto_created_table".to_string(), + columns: vec![ + Column { + column_name: "b".to_string(), + values: Some(Values { + string_values: vec!["x".to_string(), "z".to_string()], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::String as i32, + }, + Column { + column_name: "ts".to_string(), + values: Some(Values { + ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], + ..Default::default() + }), + semantic_type: SemanticType::Timestamp as i32, + datatype: ColumnDataType::TimestampMillisecond as i32, + ..Default::default() + }, + ], + row_count: 3, + ..Default::default() + }; + + // Test auto add not existed column upon insertion. 
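+        // Column "b" does not exist yet: handle_insert is expected to issue an automatic ALTER TABLE to add it before writing the rows.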
+ let query = GreptimeRequest { + request: Some(Request::Insert(insert)), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + assert!(matches!(output, Output::AffectedRows(3))); + + let query = GreptimeRequest { + request: Some(Request::Query(QueryRequest { + query: Some(Query::Sql( + "SELECT ts, a, b FROM auto_created_table".to_string(), + )), + })), + }; + let output = GrpcQueryHandler::do_query(instance.as_ref(), query) + .await + .unwrap(); + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++---------------------+---+---+ +| ts | a | b | ++---------------------+---+---+ +| 2023-01-01T07:26:15 | 4 | | +| 2023-01-01T07:26:16 | | | +| 2023-01-01T07:26:17 | 6 | | +| 2023-01-01T07:26:18 | | x | +| 2023-01-01T07:26:19 | | | +| 2023-01-01T07:26:20 | | z | ++---------------------+---+---+"; + assert_eq!(recordbatches.pretty_print().unwrap(), expected); } } diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 1849ecca0f..a295598562 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -13,11 +13,8 @@ // limitations under the License. use async_trait::async_trait; -use common_error::prelude::BoxedError; -use servers::error as server_error; use servers::influxdb::InfluxdbRequest; use servers::query_handler::InfluxdbLineProtocolHandler; -use snafu::ResultExt; use crate::instance::Instance; @@ -25,12 +22,7 @@ use crate::instance::Instance; impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { let requests = request.try_into()?; - self.handle_inserts(requests) - .await - .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{request:?}"), - })?; + self.handle_inserts(requests).await?; Ok(()) } } diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index ab8ddb9058..98314ff0ca 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -14,11 +14,11 @@ use api::prometheus::remote::read_request::ResponseType; use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; -use api::v1::object_expr::Request; -use api::v1::{query_request, ObjectExpr, QueryRequest}; +use api::v1::greptime_request::Request; +use api::v1::{query_request, GreptimeRequest, QueryRequest}; use async_trait::async_trait; -use client::RpcOutput; -use common_error::prelude::BoxedError; +use common_query::Output; +use common_recordbatch::RecordBatches; use common_telemetry::logging; use prost::Message; use servers::error::{self, Result as ServerResult}; @@ -59,8 +59,11 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult ServerResult { - let RpcOutput::RecordBatches(recordbatches) = object_result else { unreachable!() }; +async fn to_query_result(table_name: &str, output: Output) -> ServerResult { + let Output::Stream(stream) = output else { unreachable!() }; + let recordbatches = RecordBatches::try_collect(stream) + .await + .context(error::CollectRecordbatchSnafu)?; Ok(QueryResult { timeseries: prometheus::recordbatches_to_timeseries(table_name, recordbatches)?, }) @@ -71,7 +74,7 @@ impl Instance { &self, db: &str, queries: &[Query], - ) -> ServerResult> { + ) -> ServerResult> { let mut results = Vec::with_capacity(queries.len()); for query in 
diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs
index ab8ddb9058..98314ff0ca 100644
--- a/src/frontend/src/instance/prometheus.rs
+++ b/src/frontend/src/instance/prometheus.rs
@@ -14,11 +14,11 @@
 use api::prometheus::remote::read_request::ResponseType;
 use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
-use api::v1::object_expr::Request;
-use api::v1::{query_request, ObjectExpr, QueryRequest};
+use api::v1::greptime_request::Request;
+use api::v1::{query_request, GreptimeRequest, QueryRequest};
 use async_trait::async_trait;
-use client::RpcOutput;
-use common_error::prelude::BoxedError;
+use common_query::Output;
+use common_recordbatch::RecordBatches;
 use common_telemetry::logging;
 use prost::Message;
 use servers::error::{self, Result as ServerResult};
@@ -59,8 +59,11 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
-fn to_query_result(table_name: &str, object_result: RpcOutput) -> ServerResult<QueryResult> {
-    let RpcOutput::RecordBatches(recordbatches) = object_result else { unreachable!() };
+async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
+    let Output::Stream(stream) = output else { unreachable!() };
+    let recordbatches = RecordBatches::try_collect(stream)
+        .await
+        .context(error::CollectRecordbatchSnafu)?;
     Ok(QueryResult {
         timeseries: prometheus::recordbatches_to_timeseries(table_name, recordbatches)?,
     })
@@ -71,7 +74,7 @@ impl Instance {
         &self,
         db: &str,
         queries: &[Query],
-    ) -> ServerResult<Vec<(String, RpcOutput)>> {
+    ) -> ServerResult<Vec<(String, Output)>> {
         let mut results = Vec::with_capacity(queries.len());
 
         for query in queries {
@@ -82,19 +85,14 @@ impl Instance {
             sql
         );
 
-        let query = ObjectExpr {
+        let query = GreptimeRequest {
             request: Some(Request::Query(QueryRequest {
                 query: Some(query_request::Query::Sql(sql.to_string())),
             })),
         };
-        let object_result = self
-            .do_query(query)
-            .await?
-            .try_into()
-            .map_err(BoxedError::new)
-            .context(error::ExecuteQuerySnafu { query: &sql })?;
+        let output = self.do_query(query).await?;
 
-        results.push((table_name, object_result));
+        results.push((table_name, output));
     }
     Ok(results)
 }
@@ -104,12 +102,7 @@ impl PrometheusProtocolHandler for Instance {
     async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> {
         let requests = prometheus::to_grpc_insert_requests(database, request.clone())?;
-        self.handle_inserts(requests)
-            .await
-            .map_err(BoxedError::new)
-            .with_context(|_| error::ExecuteInsertSnafu {
-                msg: format!("{request:?}"),
-            })?;
+        self.handle_inserts(requests).await?;
         Ok(())
     }
 
@@ -123,10 +116,10 @@ impl PrometheusProtocolHandler for Instance {
         match response_type {
             ResponseType::Samples => {
-                let query_results = results
-                    .into_iter()
-                    .map(|(table_name, object_result)| to_query_result(&table_name, object_result))
-                    .collect::<ServerResult<Vec<_>>>()?;
+                let mut query_results = Vec::with_capacity(results.len());
+                for (table_name, output) in results {
+                    query_results.push(to_query_result(&table_name, output).await?);
+                }
 
                 let response = ReadResponse {
                     results: query_results,
diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs
index 9cb5773a2c..a02d02752b 100644
--- a/src/frontend/src/table.rs
+++ b/src/frontend/src/table.rs
@@ -22,12 +22,13 @@ use api::v1::AlterExpr;
 use async_trait::async_trait;
 use catalog::helper::{TableGlobalKey, TableGlobalValue};
 use catalog::remote::KvBackendRef;
-use client::{Database, RpcOutput};
+use client::Database;
 use common_catalog::consts::DEFAULT_CATALOG_NAME;
 use common_error::prelude::BoxedError;
 use common_query::error::Result as QueryResult;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
+use common_query::Output;
 use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
 use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use common_telemetry::debug;
@@ -108,7 +109,7 @@ impl Table for DistTable {
             .await
             .map_err(BoxedError::new)
             .context(TableOperationSnafu)?;
-        let RpcOutput::AffectedRows(rows) = output else { unreachable!() };
+        let Output::AffectedRows(rows) = output else { unreachable!() };
         Ok(rows)
     }
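Aside: the to_query_result rewrite above leans on a pattern that recurs throughout this change: a streaming Output must be drained via RecordBatches::try_collect before it can be rendered. A sketch of that pattern (the error type is our assumption, not confirmed by the patch):

    use common_query::Output;
    use common_recordbatch::RecordBatches;

    async fn materialize(
        output: Output,
    ) -> Result<RecordBatches, common_recordbatch::error::Error> {
        match output {
            // Already materialized: nothing to do.
            Output::RecordBatches(batches) => Ok(batches),
            // Streaming: drain the stream into a RecordBatches collection.
            Output::Stream(stream) => RecordBatches::try_collect(stream).await,
            // Write results carry no batches to collect.
            Output::AffectedRows(_) => unreachable!("callers expect a read result"),
        }
    }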
diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs
index bcb4539675..3996edb796 100644
--- a/src/frontend/src/table/insert.rs
+++ b/src/frontend/src/table/insert.rs
@@ -18,7 +18,8 @@ use std::sync::Arc;
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::column::SemanticType;
 use api::v1::{Column, InsertRequest as GrpcInsertRequest};
-use client::{Database, RpcOutput};
+use client::Database;
+use common_query::Output;
 use datatypes::prelude::ConcreteDataType;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionNumber;
@@ -33,7 +34,7 @@ impl DistTable {
     pub async fn dist_insert(
         &self,
         inserts: HashMap<RegionNumber, InsertRequest>,
-    ) -> Result<RpcOutput> {
+    ) -> Result<Output> {
         let route = self.table_routes.get_route(&self.table_name).await?;
 
         let mut joins = Vec::with_capacity(inserts.len());
@@ -68,10 +69,10 @@ impl DistTable {
         let mut success = 0;
         for join in joins {
             let object_result = join.await.context(error::JoinTaskSnafu)??;
-            let RpcOutput::AffectedRows(rows) = object_result else { unreachable!() };
+            let Output::AffectedRows(rows) = object_result else { unreachable!() };
             success += rows;
         }
-        Ok(RpcOutput::AffectedRows(success))
+        Ok(Output::AffectedRows(success))
     }
 }
diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs
index ae69011f4d..5b9da34e71 100644
--- a/src/frontend/src/table/scan.rs
+++ b/src/frontend/src/table/scan.rs
@@ -16,8 +16,9 @@ use std::fmt::Formatter;
 use std::sync::Arc;
 
 use api::v1::InsertRequest;
-use client::{Database, RpcOutput};
+use client::Database;
 use common_query::prelude::Expr;
+use common_query::Output;
 use common_recordbatch::RecordBatches;
 use datafusion::datasource::DefaultTableSource;
 use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
@@ -46,7 +47,7 @@ impl DatanodeInstance {
         Self { table, db }
     }
 
-    pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result<RpcOutput> {
+    pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result<Output> {
         self.db.insert(request).await
     }
 
@@ -62,7 +63,7 @@ impl DatanodeInstance {
             .logical_plan(substrait_plan.to_vec())
             .await
             .context(error::RequestDatanodeSnafu)?;
-        let RpcOutput::RecordBatches(recordbatches) = result else { unreachable!() };
+        let Output::RecordBatches(recordbatches) = result else { unreachable!() };
         Ok(recordbatches)
     }
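Aside: dist_insert above fans an insert out to one task per region and then folds the per-region affected-rows counts back into a single Output. The shape of that fold, as a self-contained sketch (the boxed error type stands in for the crate's real one):

    use common_query::Output;
    use tokio::task::JoinHandle;

    // Stand-in for the crate's real error type.
    type BoxedError = Box<dyn std::error::Error + Send + Sync>;

    async fn sum_affected_rows(
        joins: Vec<JoinHandle<Result<Output, BoxedError>>>,
    ) -> Result<Output, BoxedError> {
        let mut success = 0;
        for join in joins {
            // The first `?` surfaces a JoinError (panicked or cancelled task),
            // the second one the insert error itself.
            let Output::AffectedRows(rows) = join.await?? else { unreachable!() };
            success += rows;
        }
        Ok(Output::AffectedRows(success))
    }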
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index 3fbac5838d..f80fa83510 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -21,8 +21,8 @@ common-error = { path = "../common/error" }
 common-runtime = { path = "../common/runtime" }
 common-telemetry = { path = "../common/telemetry" }
 crc = "3.0"
-futures-util = "0.3"
 futures.workspace = true
+futures-util.workspace = true
 hex = "0.4"
 protobuf = { version = "2", features = ["bytes"] }
 raft-engine = "0.3"
diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml
index 43b26ffbcb..003bec65f6 100644
--- a/src/meta-srv/Cargo.toml
+++ b/src/meta-srv/Cargo.toml
@@ -25,7 +25,7 @@ h2 = "0.3"
 http-body = "0.4"
 lazy_static = "1.4"
 parking_lot = "0.12"
-prost = "0.11"
+prost.workspace = true
 regex = "1.6"
 serde = "1.0"
 serde_json = "1.0"
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 34fbc17453..5890f192b7 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -23,7 +23,7 @@ datafusion-physical-expr.workspace = true
 datafusion-sql.workspace = true
 datatypes = { path = "../datatypes" }
 futures = "0.3"
-futures-util = "0.3"
+futures-util.workspace = true
 metrics = "0.20"
 once_cell = "1.10"
 promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", rev = "d2f6ec4bbbae19b5156cfc977a6e7de9c6684651" }
diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml
index 4494624ac4..433c3b3c5b 100644
--- a/src/script/Cargo.toml
+++ b/src/script/Cargo.toml
@@ -39,8 +39,8 @@ datafusion-common = { workspace = true, optional = true }
 datafusion-expr = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
 datatypes = { path = "../datatypes" }
-futures-util = "0.3"
 futures.workspace = true
+futures-util.workspace = true
 once_cell = "1.17.0"
 paste = { workspace = true, optional = true }
 query = { path = "../query" }
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 8ef8f9cb73..e87b9d2501 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -6,6 +6,7 @@ license.workspace = true
 
 [dependencies]
 aide = { version = "0.9", features = ["axum"] }
+arrow-flight.workspace = true
 api = { path = "../api" }
 async-trait = "0.1"
 axum = "0.6"
@@ -17,6 +18,7 @@ common-base = { path = "../common/base" }
 common-catalog = { path = "../common/catalog" }
 common-error = { path = "../common/error" }
 common-grpc = { path = "../common/grpc" }
+common-grpc-expr = { path = "../common/grpc-expr" }
 common-query = { path = "../common/query" }
 common-recordbatch = { path = "../common/recordbatch" }
 common-runtime = { path = "../common/runtime" }
@@ -36,7 +38,8 @@ once_cell = "1.16"
 openmetrics-parser = "0.4"
 opensrv-mysql = "0.3"
 pgwire = "0.6.3"
-prost = "0.11"
+pin-project = "1.0"
+prost.workspace = true
 query = { path = "../query" }
 rand = "0.8"
 regex = "1.6"
@@ -56,7 +59,6 @@ tokio.workspace = true
 tokio-rustls = "0.23"
 tokio-stream = { version = "0.1", features = ["net"] }
 tonic.workspace = true
-tonic-reflection = "0.5"
 tower = { version = "0.4", features = ["full"] }
 tower-http = { version = "0.3", features = ["full"] }
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 8e110e6b0e..bedfccf749 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -24,6 +24,9 @@ use catalog;
 use common_error::prelude::*;
 use hyper::header::ToStrError;
 use serde_json::json;
+use tonic::codegen::http::{HeaderMap, HeaderValue};
+use tonic::metadata::MetadataMap;
+use tonic::Code;
 
 use crate::auth;
 
@@ -79,6 +82,12 @@ pub enum Error {
         source: BoxedError,
     },
 
+    #[snafu(display("Failed to execute GRPC query, source: {}", source))]
+    ExecuteGrpcQuery {
+        #[snafu(backtrace)]
+        source: BoxedError,
+    },
+
     #[snafu(display("Failed to execute sql statement, source: {}", source))]
     ExecuteStatement {
         #[snafu(backtrace)]
@@ -184,12 +193,9 @@ pub enum Error {
     #[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))]
     InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace },
 
-    #[snafu(display("Failed to decode region id, source: {}", source))]
-    DecodeRegionNumber { source: api::DecodeError },
-
-    #[snafu(display("Failed to build gRPC reflection service, source: {}", source))]
-    GrpcReflectionService {
-        source: tonic_reflection::server::Error,
+    #[snafu(display("Invalid Flight ticket, source: {}", source))]
+    InvalidFlightTicket {
+        source: api::DecodeError,
         backtrace: Backtrace,
     },
 
@@ -252,6 +258,15 @@ pub enum Error {
 
     #[snafu(display("Cannot find requested database: {}-{}", catalog, schema))]
     DatabaseNotFound { catalog: String, schema: String },
+
+    #[snafu(display("Failed to find new columns on insertion: {}", source))]
+    FindNewColumnsOnInsertion {
+        #[snafu(backtrace)]
+        source: common_grpc_expr::error::Error,
+    },
+
+    #[snafu(display("GRPC request missing field: {}", name))]
+    GrpcRequestMissingField { name: String, backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
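Aside: the new GrpcRequestMissingField variant pairs naturally with snafu's OptionExt for the oneof fields of GreptimeRequest. A sketch of how it would typically be raised (the helper is ours; Result is this module's alias as defined above):

    use snafu::OptionExt;

    // Sketch: raise the new variant when the oneof field is absent.
    // The field name mirrors database.proto.
    fn require_request(
        req: api::v1::GreptimeRequest,
    ) -> Result<api::v1::greptime_request::Request> {
        req.request
            .context(GrpcRequestMissingFieldSnafu { name: "request" })
    }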
@@ -270,13 +285,13 @@ impl ErrorExt for Error {
             | AlreadyStarted { .. }
             | InvalidPromRemoteReadQueryResult { .. }
             | TcpBind { .. }
-            | GrpcReflectionService { .. }
             | CatalogError { .. }
             | BuildingContext { .. } => StatusCode::Internal,
 
             InsertScript { source, .. }
             | ExecuteScript { source, .. }
             | ExecuteQuery { source, .. }
+            | ExecuteGrpcQuery { source, .. }
             | ExecuteStatement { source, .. }
             | ExecuteInsert { source, .. }
             | ExecuteAlter { source, .. }
@@ -291,7 +306,8 @@ impl ErrorExt for Error {
             | DecodePromRemoteRequest { .. }
             | DecompressPromRemoteRequest { .. }
             | InvalidPromRemoteRequest { .. }
-            | DecodeRegionNumber { .. }
+            | InvalidFlightTicket { .. }
+            | GrpcRequestMissingField { .. }
             | TimePrecision { .. } => StatusCode::InvalidArguments,
 
             InfluxdbLinesWrite { source, .. } | ConvertFlightMessage { source } => {
@@ -311,6 +327,8 @@ impl ErrorExt for Error {
             | InvalidUtf8Value { .. } => StatusCode::InvalidAuthHeader,
 
             DatabaseNotFound { .. } => StatusCode::DatabaseNotFound,
+
+            FindNewColumnsOnInsertion { source } => source.status_code(),
         }
     }
 
@@ -325,7 +343,20 @@
 impl From<Error> for tonic::Status {
     fn from(err: Error) -> Self {
-        tonic::Status::new(tonic::Code::Internal, err.to_string())
+        let mut headers = HeaderMap::<HeaderValue>::with_capacity(2);
+
+        // If either the status code or the error message cannot be converted into a
+        // valid HTTP header value (a very rare case), just skip it; the client will
+        // fall back to the Tonic status code and message.
+        if let Ok(code) = HeaderValue::from_bytes((err.status_code() as u32).to_string().as_bytes())
+        {
+            headers.insert(INNER_ERROR_CODE, code);
+        }
+        if let Ok(err_msg) = HeaderValue::from_bytes(err.to_string().as_bytes()) {
+            headers.insert(INNER_ERROR_MSG, err_msg);
+        }
+
+        let metadata = MetadataMap::from_headers(headers);
+        tonic::Status::with_metadata(Code::Internal, err.to_string(), metadata)
     }
 }
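Aside: with the inner status code and message now carried in Tonic metadata, a client can recover them without parsing the message string. A sketch of the reading side (INNER_ERROR_CODE and INNER_ERROR_MSG are the header-name constants used above; the parsing is illustrative):

    // Recover the richer error that the From<Error> impl above attached as metadata.
    fn extract_inner_error(status: &tonic::Status) -> Option<(u32, String)> {
        let meta = status.metadata();
        // Missing or non-ASCII entries fall back to None; callers then use
        // the plain Tonic code and message.
        let code = meta.get(INNER_ERROR_CODE)?.to_str().ok()?.parse().ok()?;
        let msg = meta.get(INNER_ERROR_MSG)?.to_str().ok()?.to_string();
        Some((code, msg))
    }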
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs
index 14a1f21ac9..acbbf5fdcd 100644
--- a/src/servers/src/grpc.rs
+++ b/src/servers/src/grpc.rs
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod handler;
+mod flight;
 
 use std::net::SocketAddr;
 use std::sync::Arc;
 
-use api::v1::{greptime_server, BatchRequest, BatchResponse};
+use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
 use async_trait::async_trait;
 use common_runtime::Runtime;
 use common_telemetry::logging::info;
@@ -27,10 +27,9 @@
 use tokio::net::TcpListener;
 use tokio::sync::oneshot::{self, Sender};
 use tokio::sync::Mutex;
 use tokio_stream::wrappers::TcpListenerStream;
-use tonic::{Request, Response, Status};
 
-use crate::error::{self, AlreadyStartedSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
-use crate::grpc::handler::BatchHandler;
+use crate::error::{AlreadyStartedSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
+use crate::grpc::flight::FlightHandler;
 use crate::query_handler::GrpcQueryHandlerRef;
 use crate::server::Server;
 
@@ -49,27 +48,9 @@ impl GrpcServer {
         }
     }
 
-    pub fn create_service(&self) -> greptime_server::GreptimeServer<GrpcService> {
-        let service = GrpcService {
-            handler: BatchHandler::new(self.query_handler.clone(), self.runtime.clone()),
-        };
-        greptime_server::GreptimeServer::new(service)
-    }
-}
-
-pub struct GrpcService {
-    handler: BatchHandler,
-}
-
-#[tonic::async_trait]
-impl greptime_server::Greptime for GrpcService {
-    async fn batch(
-        &self,
-        req: Request<BatchRequest>,
-    ) -> std::result::Result<Response<BatchResponse>, Status> {
-        let req = req.into_inner();
-        let res = self.handler.batch(req).await?;
-        Ok(Response::new(res))
+    pub fn create_service(&self) -> FlightServiceServer<impl FlightService> {
+        let service = FlightHandler::new(self.query_handler.clone(), self.runtime.clone());
+        FlightServiceServer::new(service)
     }
 }
 
@@ -107,16 +88,9 @@ impl Server for GrpcServer {
             (listener, addr)
         };
 
-        let reflection_service = tonic_reflection::server::Builder::configure()
-            .register_encoded_file_descriptor_set(api::v1::GREPTIME_FD_SET)
-            .with_service_name("greptime.v1.Greptime")
-            .build()
-            .context(error::GrpcReflectionServiceSnafu)?;
-
         // Would block to serve requests.
         tonic::transport::Server::builder()
             .add_service(self.create_service())
-            .add_service(reflection_service)
             .serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop))
             .await
             .context(StartGrpcSnafu)?;
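Aside: with the reflection service removed and the Greptime gRPC service replaced by Arrow Flight, a caller now reaches the server through the generated Flight client. A hedged sketch of a do_get round trip (the function name and printing are ours; the ticket encoding mirrors the do_get decoding below):

    use arrow_flight::flight_service_client::FlightServiceClient;
    use arrow_flight::Ticket;
    use prost::Message;

    // `request` is an api::v1::GreptimeRequest; the server's do_get decodes
    // exactly this encoding back out of the ticket.
    async fn do_get_request(
        addr: String,
        request: api::v1::GreptimeRequest,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut client = FlightServiceClient::connect(addr).await?;
        let ticket = Ticket { ticket: request.encode_to_vec() };
        let mut stream = client.do_get(ticket).await?.into_inner();
        while let Some(flight_data) = stream.message().await? {
            // Each FlightData frame carries a schema, a record batch, or the
            // affected-rows extension defined in database.proto.
            println!("frame: {} body bytes", flight_data.data_body.len());
        }
        Ok(())
    }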
diff --git a/src/frontend/src/instance/distributed/flight.rs b/src/servers/src/grpc/flight.rs
similarity index 60%
rename from src/frontend/src/instance/distributed/flight.rs
rename to src/servers/src/grpc/flight.rs
index f1bc2e7f7a..e4ab590bd4 100644
--- a/src/frontend/src/instance/distributed/flight.rs
+++ b/src/servers/src/grpc/flight.rs
@@ -12,31 +12,47 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::pin::Pin;
+mod stream;
 
-use api::v1::ddl_request::Expr as DdlExpr;
-use api::v1::object_expr::Request as GrpcRequest;
-use api::v1::ObjectExpr;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use api::v1::GreptimeRequest;
 use arrow_flight::flight_service_server::FlightService;
 use arrow_flight::{
     Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
     HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
 };
 use async_trait::async_trait;
-use datanode::instance::flight::to_flight_data_stream;
+use common_grpc::flight::{FlightEncoder, FlightMessage};
+use common_query::Output;
+use common_runtime::Runtime;
 use futures::Stream;
 use prost::Message;
-use snafu::{OptionExt, ResultExt};
+use snafu::ResultExt;
+use tokio::sync::oneshot;
 use tonic::{Request, Response, Status, Streaming};
 
-use crate::error::{IncompleteGrpcResultSnafu, InvalidFlightTicketSnafu};
-use crate::instance::distributed::DistInstance;
+use crate::error;
+use crate::grpc::flight::stream::FlightRecordBatchStream;
+use crate::query_handler::GrpcQueryHandlerRef;
 
 type TonicResult<T> = Result<T, Status>;
 type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + Sync + 'static>>;
 
+pub(crate) struct FlightHandler {
+    handler: GrpcQueryHandlerRef,
+    runtime: Arc<Runtime>,
+}
+
+impl FlightHandler {
+    pub(crate) fn new(handler: GrpcQueryHandlerRef, runtime: Arc<Runtime>) -> Self {
+        Self { handler, runtime }
+    }
+}
+
 #[async_trait]
-impl FlightService for DistInstance {
+impl FlightService for FlightHandler {
     type HandshakeStream = TonicStream<HandshakeResponse>;
 
     async fn handshake(
@@ -73,37 +89,27 @@ impl FlightService for DistInstance {
     async fn do_get(&self, request: Request<Ticket>) -> TonicResult<Response<Self::DoGetStream>> {
         let ticket = request.into_inner().ticket;
-        let request = ObjectExpr::decode(ticket.as_slice())
-            .context(InvalidFlightTicketSnafu)?
-            .request
-            .context(IncompleteGrpcResultSnafu {
-                err_msg: "Missing 'request' in ObjectExpr",
-            })?;
-        let output = match request {
-            GrpcRequest::Insert(request) => self.handle_dist_insert(request).await?,
-            GrpcRequest::Query(_) => {
-                unreachable!("Query should have been handled directly in Frontend Instance!")
-            }
-            GrpcRequest::Ddl(request) => {
-                let expr = request.expr.context(IncompleteGrpcResultSnafu {
-                    err_msg: "Missing 'expr' in DDL request",
-                })?;
-                match expr {
-                    DdlExpr::CreateDatabase(expr) => self.handle_create_database(expr).await?,
-                    DdlExpr::CreateTable(mut expr) => {
-                        // TODO(LFC): Support creating distributed table through GRPC interface.
-                        // Currently only SQL supports it; how to design the fields in CreateTableExpr?
-                        self.create_table(&mut expr, None).await?
-                    }
-                    DdlExpr::Alter(expr) => self.handle_alter_table(expr).await?,
-                    DdlExpr::DropTable(_) => {
-                        // TODO(LFC): Implement distributed drop table.
-                        // Seems the whole "drop table through GRPC interface" feature is not implemented?
-                        return Err(Status::unimplemented("Not yet implemented"));
-                    }
-                }
-            }
-        };
+        let request =
+            GreptimeRequest::decode(ticket.as_slice()).context(error::InvalidFlightTicketSnafu)?;
+
+        let (tx, rx) = oneshot::channel();
+        let handler = self.handler.clone();
+
+        // Execute requests in another runtime to:
+        // 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
+        // 2. avoid the handler accidentally blocking the gRPC runtime.
+        self.runtime.spawn(async move {
+            let result = handler.do_query(request).await;
+
+            // Ignore the sending result.
+            // Usually an error indicates that the rx at the Tonic side is dropped
+            // (due to request timeout).
+            let _ = tx.send(result);
+        });
+
+        // Safety: an early-dropped tx usually indicates a serious problem (like a panic).
+        // This unwrap is used to poison the upper layer.
+        let output = rx.await.unwrap()?;
+
         let stream = to_flight_data_stream(output);
         Ok(Response::new(stream))
     }
@@ -141,3 +147,22 @@ impl FlightService for DistInstance {
         Err(Status::unimplemented("Not yet implemented"))
     }
 }
+
+fn to_flight_data_stream(output: Output) -> TonicStream<FlightData> {
+    match output {
+        Output::Stream(stream) => {
+            let stream = FlightRecordBatchStream::new(stream);
+            Box::pin(stream) as _
+        }
+        Output::RecordBatches(x) => {
+            let stream = FlightRecordBatchStream::new(x.as_stream());
+            Box::pin(stream) as _
+        }
+        Output::AffectedRows(rows) => {
+            let stream = tokio_stream::once(Ok(
+                FlightEncoder::default().encode(FlightMessage::AffectedRows(rows))
+            ));
+            Box::pin(stream) as _
+        }
+    }
+}
diff --git a/src/datanode/src/instance/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs
similarity index 97%
rename from src/datanode/src/instance/flight/stream.rs
rename to src/servers/src/grpc/flight/stream.rs
index d0a04a3962..cd39a31fe4 100644
--- a/src/datanode/src/instance/flight/stream.rs
+++ b/src/servers/src/grpc/flight/stream.rs
@@ -26,8 +26,8 @@ use pin_project::{pin_project, pinned_drop};
 use snafu::ResultExt;
 use tokio::task::JoinHandle;
 
+use super::TonicResult;
 use crate::error;
-use crate::instance::flight::TonicResult;
 
 #[pin_project(PinnedDrop)]
 pub(super) struct FlightRecordBatchStream {
@@ -72,7 +72,7 @@ impl FlightRecordBatchStream {
                     }
                 }
                 Err(e) => {
-                    let e = Err(e).context(error::PollRecordbatchStreamSnafu);
+                    let e = Err(e).context(error::CollectRecordbatchSnafu);
                     if let Err(e) = tx.send(e.map_err(|x| x.into())).await {
                         warn!("stop sending Flight data, err: {e}");
                     }
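Aside: both the new do_get above and the deleted BatchHandler below use the same decoupling trick: run the work on a dedicated runtime and bridge the result back over a oneshot channel, so a dropped client connection cancels only the waiting future, not the query itself. Distilled into a sketch (using tokio's Runtime directly; the patch uses common_runtime::Runtime, whose spawn has the same shape):

    use tokio::sync::oneshot;

    async fn run_detached<T: Send + 'static>(
        runtime: &tokio::runtime::Runtime,
        work: impl std::future::Future<Output = T> + Send + 'static,
    ) -> T {
        let (tx, rx) = oneshot::channel();
        runtime.spawn(async move {
            // If the receiver is gone (e.g. the request timed out), the send
            // fails; that is deliberately ignored.
            let _ = tx.send(work.await);
        });
        // An early-dropped tx means the task died (e.g. panicked); unwrap to
        // poison the caller, as the comments above describe.
        rx.await.unwrap()
    }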
diff --git a/src/servers/src/grpc/handler.rs b/src/servers/src/grpc/handler.rs
deleted file mode 100644
index 1697828788..0000000000
--- a/src/servers/src/grpc/handler.rs
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::sync::Arc;
-
-use api::v1::{BatchRequest, BatchResponse, DatabaseResponse};
-use common_runtime::Runtime;
-use tokio::sync::oneshot;
-
-use crate::error::Result;
-use crate::query_handler::GrpcQueryHandlerRef;
-
-#[derive(Clone)]
-pub struct BatchHandler {
-    query_handler: GrpcQueryHandlerRef,
-    runtime: Arc<Runtime>,
-}
-
-impl BatchHandler {
-    pub fn new(query_handler: GrpcQueryHandlerRef, runtime: Arc<Runtime>) -> Self {
-        Self {
-            query_handler,
-            runtime,
-        }
-    }
-
-    pub async fn batch(&self, batch_req: BatchRequest) -> Result<BatchResponse> {
-        let (tx, rx) = oneshot::channel();
-        let query_handler = self.query_handler.clone();
-
-        let future = async move {
-            let mut batch_resp = BatchResponse::default();
-            let mut db_resp = DatabaseResponse::default();
-
-            for db_req in batch_req.databases {
-                db_resp.results.reserve(db_req.exprs.len());
-
-                for obj_expr in db_req.exprs {
-                    let object_resp = query_handler.do_query(obj_expr).await?;
-
-                    db_resp.results.push(object_resp);
-                }
-            }
-            batch_resp.databases.push(db_resp);
-
-            Ok(batch_resp)
-        };
-
-        // Executes requests in another runtime to
-        // 1. prevent the execution from being cancelled unexpected by tonic runtime.
-        // 2. avoid the handler blocks the gRPC runtime
-        self.runtime.spawn(async move {
-            let result = future.await;
-
-            // Ignore send result. Usually an error indicates the rx is dropped (request timeouted).
-            let _ = tx.send(result);
-        });
-        // Safety: An early-dropped tx usually indicates a serious problem (like panic). This unwrap
-        // is used to poison the upper layer.
-        rx.await.unwrap()
-    }
-}
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index 13603b0eb1..aed712ab52 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use api::prometheus::remote::{ReadRequest, WriteRequest};
-use api::v1::{ObjectExpr, ObjectResult};
+use api::v1::GreptimeRequest;
 use async_trait::async_trait;
 use common_query::Output;
 use session::context::QueryContextRef;
@@ -65,7 +65,7 @@ pub trait ScriptHandler {
 
 #[async_trait]
 pub trait GrpcQueryHandler {
-    async fn do_query(&self, query: ObjectExpr) -> Result<ObjectResult>;
+    async fn do_query(&self, query: GreptimeRequest) -> Result<Output>;
 }
 
 #[async_trait]
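Aside: under the new trait signature, a GrpcQueryHandler implementation returns the engine's Output directly instead of a serialized ObjectResult. The minimal shape, as a sketch (the no-op body is a placeholder, not frontend logic; GrpcQueryHandler and Result come from the servers crate as changed above):

    use api::v1::GreptimeRequest;
    use async_trait::async_trait;
    use common_query::Output;

    struct NoopHandler;

    #[async_trait]
    impl GrpcQueryHandler for NoopHandler {
        async fn do_query(&self, _query: GreptimeRequest) -> Result<Output> {
            // A real handler dispatches on the request oneof (insert/query/ddl).
            Ok(Output::AffectedRows(0))
        }
    }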
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 6dce96b73a..6cc5cb10c9 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -19,13 +19,13 @@ common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
 datatypes = { path = "../datatypes" }
 futures.workspace = true
-futures-util = "0.3"
+futures-util.workspace = true
 lazy_static = "1.4"
 object-store = { path = "../object-store" }
 parquet = { workspace = true, features = ["async"] }
 paste.workspace = true
 planus = "0.2"
-prost = "0.11"
+prost.workspace = true
 regex = "1.5"
 serde.workspace = true
 serde_json = "1.0"
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index d79883daf4..5ce976e489 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -13,6 +13,7 @@ client = { path = "../src/client" }
 common-catalog = { path = "../src/common/catalog" }
 common-error = { path = "../src/common/error" }
 common-grpc = { path = "../src/common/grpc" }
+common-query = { path = "../src/common/query" }
 common-runtime = { path = "../src/common/runtime" }
 common-telemetry = { path = "../src/common/telemetry" }
 datanode = { path = "../src/datanode" }
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 7c8f65ec48..88ade995fd 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -17,8 +17,9 @@ use api::v1::{
     column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr,
     InsertRequest, TableId,
 };
-use client::{Client, Database, RpcOutput};
+use client::{Client, Database};
 use common_catalog::consts::MIN_USER_TABLE_ID;
+use common_query::Output;
 use servers::server::Server;
 use tests_integration::test_util::{setup_grpc_server, StorageType};
 
@@ -136,7 +137,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
     // create
     let expr = testing_create_expr();
     let result = db.create(expr).await.unwrap();
-    assert!(matches!(result, RpcOutput::AffectedRows(0)));
+    assert!(matches!(result, Output::AffectedRows(0)));
 
     //alter
     let add_column = ColumnDef {
@@ -158,7 +159,7 @@ pub async fn test_insert_and_select(store_type: StorageType) {
         kind: Some(kind),
     };
     let result = db.alter(expr).await.unwrap();
-    assert!(matches!(result, RpcOutput::AffectedRows(0)));
+    assert!(matches!(result, Output::AffectedRows(0)));
 
     // insert
     insert_and_assert(&db).await;
@@ -194,7 +195,7 @@ async fn insert_and_assert(db: &Database) {
     )
     .await
     .unwrap();
-    assert!(matches!(result, RpcOutput::AffectedRows(2)));
+    assert!(matches!(result, Output::AffectedRows(2)));
 
     // select
     let result = db
@@ -202,7 +203,7 @@ async fn insert_and_assert(db: &Database) {
         .await
         .unwrap();
     match result {
-        RpcOutput::RecordBatches(recordbatches) => {
+        Output::RecordBatches(recordbatches) => {
             let pretty = recordbatches.pretty_print().unwrap();
             let expected = "\
+-------+------+--------+-------------------------+
diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml
index 5a5b58bf70..5a50dc3a68 100644
--- a/tests/runner/Cargo.toml
+++ b/tests/runner/Cargo.toml
@@ -9,5 +9,6 @@ async-trait = "0.1"
 client = { path = "../../src/client" }
 common-base = { path = "../../src/common/base" }
 common-grpc = { path = "../../src/common/grpc" }
+common-query = { path = "../../src/common/query" }
 sqlness = { git = "https://github.com/ceresdb/sqlness.git" }
 tokio.workspace = true
diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs
index 652568840b..a61c1c434d 100644
--- a/tests/runner/src/env.rs
+++ b/tests/runner/src/env.rs
@@ -18,7 +18,8 @@ use std::process::Stdio;
 use std::time::Duration;
 
 use async_trait::async_trait;
-use client::{Client, Database as DB, Error as ClientError, RpcOutput};
+use client::{Client, Database as DB, Error as ClientError};
+use common_query::Output;
 use sqlness::{Database, Environment};
 use tokio::process::{Child, Command};
 
@@ -117,17 +118,17 @@ impl Database for GreptimeDB {
 }
 
 struct ResultDisplayer {
-    result: Result<RpcOutput, ClientError>,
+    result: Result<Output, ClientError>,
 }
 
 impl Display for ResultDisplayer {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match &self.result {
             Ok(result) => match result {
-                RpcOutput::AffectedRows(rows) => {
+                Output::AffectedRows(rows) => {
                     write!(f, "Affected Rows: {rows}")
                 }
-                RpcOutput::RecordBatches(recordbatches) => {
+                Output::RecordBatches(recordbatches) => {
                     let pretty = recordbatches.pretty_print().map_err(|e| e.to_string());
                     match pretty {
                         Ok(s) => write!(f, "{s}"),
@@ -136,6 +137,7 @@ impl Display for ResultDisplayer {
                     }
                 }
+                Output::Stream(_) => unreachable!(),
             },
             Err(e) => write!(f, "Failed to execute, error: {e:?}"),
         }
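Aside: the Display impl above can mark Output::Stream as unreachable because the sqlness client is expected to collect streaming results into RecordBatches before handing them back. An exhaustive match for display purposes would look like this sketch (the function name is ours):

    use common_query::Output;

    fn describe(output: &Output) -> String {
        match output {
            Output::AffectedRows(rows) => format!("Affected Rows: {rows}"),
            Output::RecordBatches(batches) => batches
                .pretty_print()
                .unwrap_or_else(|e| format!("Failed to pretty print, error: {e:?}")),
            // Streaming results should have been collected by the caller.
            Output::Stream(_) => "streaming result; collect it first".to_string(),
        }
    }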