From de6803d2532130ee99538976d4bb384f87c21ec1 Mon Sep 17 00:00:00 2001 From: LFC Date: Fri, 30 Dec 2022 10:24:09 +0800 Subject: [PATCH] feat: handle InsertRequest(formerly InsertExpr) in new Arrow Flight (#800) feat: handle InsertRequest(formerly InsertExpr) in new Arrow Flight interface --- Cargo.lock | 3 +- benchmarks/src/bin/nyc-taxi.rs | 6 +- src/api/Cargo.toml | 1 + src/api/greptime/v1/database.proto | 20 +-- src/api/src/result.rs | 81 ++------- src/client/examples/insert.rs | 106 ----------- src/client/src/database.rs | 117 +++++------- src/client/src/error.rs | 9 +- src/client/src/lib.rs | 2 +- src/common/grpc-expr/src/insert.rs | 101 +++++------ src/common/grpc-expr/src/lib.rs | 5 +- src/common/grpc/Cargo.toml | 1 + src/common/grpc/src/flight.rs | 73 ++++++-- src/datanode/Cargo.toml | 1 - src/datanode/src/error.rs | 4 - src/datanode/src/instance/flight.rs | 130 +++++++------- src/datanode/src/instance/flight/stream.rs | 48 +++-- src/datanode/src/instance/grpc.rs | 131 ++------------ src/frontend/src/error.rs | 7 - src/frontend/src/instance.rs | 198 ++++++++++----------- src/frontend/src/instance/influxdb.rs | 7 +- src/frontend/src/instance/prometheus.rs | 29 ++- src/frontend/src/table.rs | 89 +++++---- src/frontend/src/table/insert.rs | 46 +++-- src/frontend/src/table/scan.rs | 17 +- src/servers/src/influxdb.rs | 24 +-- src/servers/src/opentsdb/codec.rs | 9 +- src/servers/src/prometheus.rs | 15 +- src/servers/tests/http/influxdb_test.rs | 6 +- tests-integration/tests/grpc.rs | 17 +- tests/runner/src/env.rs | 29 ++- 31 files changed, 510 insertions(+), 822 deletions(-) delete mode 100644 src/client/examples/insert.rs diff --git a/Cargo.lock b/Cargo.lock index f314919393..a2eb40eeee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,6 +137,7 @@ checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72" name = "api" version = "0.1.0" dependencies = [ + "arrow-flight", "common-base", "common-error", "common-time", @@ -1477,6 +1478,7 @@ dependencies = [ "dashmap", "datafusion", "datatypes", + "flatbuffers", "futures", "prost 0.11.3", "rand 0.8.5", @@ -2158,7 +2160,6 @@ dependencies = [ "datafusion", "datafusion-common", "datatypes", - "flatbuffers", "futures", "hyper", "log-store", diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index bbc1a4462a..a0d4eee141 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch; use clap::Parser; use client::admin::Admin; use client::api::v1::column::Values; -use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertExpr, TableId}; +use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, TableId}; use client::{Client, Database}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -100,7 +100,7 @@ async fn write_data( for record_batch in record_batch_reader { let record_batch = record_batch.unwrap(); let (columns, row_count) = convert_record_batch(record_batch); - let insert_expr = InsertExpr { + let request = InsertRequest { schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), region_number: 0, @@ -108,7 +108,7 @@ async fn write_data( row_count, }; let now = Instant::now(); - db.insert(insert_expr).await.unwrap(); + db.insert(request).await.unwrap(); let elapsed = now.elapsed(); total_rpc_elapsed_ms += elapsed.as_millis(); progress_bar.inc(row_count as _); diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5e4fbd8e4a..1352705a5d 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +arrow-flight.workspace = true common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 0db3aa7abe..82295a6437 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -15,10 +15,9 @@ message DatabaseResponse { } message ObjectExpr { - ExprHeader header = 1; - oneof expr { - InsertExpr insert = 2; - QueryRequest query = 3; + oneof request { + InsertRequest insert = 1; + QueryRequest query = 2; } } @@ -29,7 +28,7 @@ message QueryRequest { } } -message InsertExpr { +message InsertRequest { string schema_name = 1; string table_name = 2; @@ -38,7 +37,7 @@ message InsertExpr { // The row_count of all columns, which include null and non-null values. // - // Note: the row_count of all columns in a InsertExpr must be same. + // Note: the row_count of all columns in a InsertRequest must be same. uint32 row_count = 4; // The region number of current insert request. @@ -47,14 +46,7 @@ message InsertExpr { message ObjectResult { ResultHeader header = 1; - oneof result { - MutateResult mutate = 2; - FlightDataRaw flight_data = 3; - } -} - -message FlightDataRaw { - repeated bytes raw_data = 1; + repeated bytes flight_data = 2; } message FlightDataExt { diff --git a/src/api/src/result.rs b/src/api/src/result.rs index 6d79a3b775..860d144600 100644 --- a/src/api/src/result.rs +++ b/src/api/src/result.rs @@ -12,29 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_error::prelude::ErrorExt; +use arrow_flight::FlightData; +use prost::Message; -use crate::v1::{ - admin_result, object_result, AdminResult, MutateResult, ObjectResult, ResultHeader, -}; +use crate::v1::{admin_result, AdminResult, MutateResult, ObjectResult, ResultHeader}; pub const PROTOCOL_VERSION: u32 = 1; pub type Success = u32; pub type Failure = u32; -type FlightDataRaw = Vec>; #[derive(Default)] pub struct ObjectResultBuilder { version: u32, code: u32, err_msg: Option, - result: Option, -} - -pub enum Body { - Mutate((Success, Failure)), - FlightDataRaw(FlightDataRaw), + flight_data: Option>, } impl ObjectResultBuilder { @@ -61,13 +54,8 @@ impl ObjectResultBuilder { self } - pub fn mutate_result(mut self, success: u32, failure: u32) -> Self { - self.result = Some(Body::Mutate((success, failure))); - self - } - - pub fn flight_data(mut self, flight_data: FlightDataRaw) -> Self { - self.result = Some(Body::FlightDataRaw(flight_data)); + pub fn flight_data(mut self, flight_data: Vec) -> Self { + self.flight_data = Some(flight_data); self } @@ -78,30 +66,21 @@ impl ObjectResultBuilder { err_msg: self.err_msg.unwrap_or_default(), }); - let result = match self.result { - Some(Body::Mutate((success, failure))) => { - Some(object_result::Result::Mutate(MutateResult { - success, - failure, - })) - } - Some(Body::FlightDataRaw(raw_data)) => Some(object_result::Result::FlightData( - crate::v1::FlightDataRaw { raw_data }, - )), - None => None, + let flight_data = if let Some(flight_data) = self.flight_data { + flight_data + .into_iter() + .map(|x| x.encode_to_vec()) + .collect::>>() + } else { + vec![] }; - - ObjectResult { header, result } + ObjectResult { + header, + flight_data, + } } } -pub fn build_err_result(err: &impl ErrorExt) -> ObjectResult { - ObjectResultBuilder::new() - .status_code(err.status_code() as u32) - .err_msg(err.to_string()) - .build() -} - #[derive(Debug)] pub struct AdminResultBuilder { version: u32, @@ -159,11 +138,7 @@ impl Default for AdminResultBuilder { #[cfg(test)] mod tests { - use common_error::status_code::StatusCode; - use super::*; - use crate::error::UnknownColumnDataTypeSnafu; - use crate::v1::{object_result, MutateResult}; #[test] fn test_object_result_builder() { @@ -171,32 +146,10 @@ mod tests { .version(101) .status_code(500) .err_msg("Failed to read this file!".to_string()) - .mutate_result(100, 20) .build(); let header = obj_result.header.unwrap(); assert_eq!(101, header.version); assert_eq!(500, header.code); assert_eq!("Failed to read this file!", header.err_msg); - - let result = obj_result.result.unwrap(); - assert_eq!( - object_result::Result::Mutate(MutateResult { - success: 100, - failure: 20, - }), - result - ); - } - - #[test] - fn test_build_err_result() { - let err = UnknownColumnDataTypeSnafu { datatype: 1 }.build(); - let err_result = build_err_result(&err); - let header = err_result.header.unwrap(); - let result = err_result.result; - - assert_eq!(PROTOCOL_VERSION, header.version); - assert_eq!(StatusCode::InvalidArguments as u32, header.code); - assert!(result.is_none()); } } diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs deleted file mode 100644 index 66f38eded3..0000000000 --- a/src/client/examples/insert.rs +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::*; -use client::{Client, Database}; - -fn main() { - tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) - .unwrap(); - - run(); -} - -#[tokio::main] -async fn run() { - let client = Client::with_urls(vec!["127.0.0.1:3001"]); - let db = Database::new("greptime", client); - - let (columns, row_count) = insert_data(); - - let expr = InsertExpr { - schema_name: "public".to_string(), - table_name: "demo".to_string(), - region_number: 0, - columns, - row_count, - }; - db.insert(expr).await.unwrap(); -} - -fn insert_data() -> (Vec, u32) { - const SEMANTIC_TAG: i32 = 0; - const SEMANTIC_FIELD: i32 = 1; - const SEMANTIC_TS: i32 = 2; - - let row_count = 4; - - let host_vals = column::Values { - string_values: vec![ - "host1".to_string(), - "host2".to_string(), - "host3".to_string(), - "host4".to_string(), - ], - ..Default::default() - }; - let host_column = Column { - column_name: "host".to_string(), - semantic_type: SEMANTIC_TAG, - values: Some(host_vals), - null_mask: vec![0], - ..Default::default() - }; - - let cpu_vals = column::Values { - f64_values: vec![0.31, 0.41, 0.2], - ..Default::default() - }; - let cpu_column = Column { - column_name: "cpu".to_string(), - semantic_type: SEMANTIC_FIELD, - values: Some(cpu_vals), - null_mask: vec![2], - ..Default::default() - }; - - let mem_vals = column::Values { - f64_values: vec![0.1, 0.2, 0.3], - ..Default::default() - }; - let mem_column = Column { - column_name: "memory".to_string(), - semantic_type: SEMANTIC_FIELD, - values: Some(mem_vals), - null_mask: vec![4], - ..Default::default() - }; - - let ts_vals = column::Values { - i64_values: vec![100, 101, 102, 103], - ..Default::default() - }; - let ts_column = Column { - column_name: "ts".to_string(), - semantic_type: SEMANTIC_TS, - values: Some(ts_vals), - null_mask: vec![0], - ..Default::default() - }; - - ( - vec![host_column, cpu_column, mem_column, ts_column], - row_count, - ) -} diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 29ecc07b56..78b27a208f 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -13,15 +13,18 @@ // limitations under the License. use api::v1::{ - object_expr, object_result, query_request, DatabaseRequest, ExprHeader, InsertExpr, - MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest, + object_expr, query_request, DatabaseRequest, InsertRequest, ObjectExpr, + ObjectResult as GrpcObjectResult, QueryRequest, }; use common_error::status_code::StatusCode; -use common_grpc::flight::{raw_flight_data_to_message, FlightMessage}; +use common_grpc::flight::{ + flight_messages_to_recordbatches, raw_flight_data_to_message, FlightMessage, +}; use common_query::Output; +use common_recordbatch::RecordBatches; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::DatanodeSnafu; +use crate::error::{ConvertFlightDataSnafu, DatanodeSnafu, IllegalFlightMessagesSnafu}; use crate::{error, Client, Result}; pub const PROTOCOL_VERSION: u32 = 1; @@ -44,57 +47,30 @@ impl Database { &self.name } - pub async fn insert(&self, insert: InsertExpr) -> Result { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; + pub async fn insert(&self, request: InsertRequest) -> Result { let expr = ObjectExpr { - header: Some(header), - expr: Some(object_expr::Expr::Insert(insert)), + request: Some(object_expr::Request::Insert(request)), }; self.object(expr).await?.try_into() } - pub async fn batch_insert(&self, insert_exprs: Vec) -> Result> { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; - let obj_exprs = insert_exprs - .into_iter() - .map(|expr| ObjectExpr { - header: Some(header.clone()), - expr: Some(object_expr::Expr::Insert(expr)), - }) - .collect(); - self.objects(obj_exprs) - .await? - .into_iter() - .map(|result| result.try_into()) - .collect() - } - - pub async fn sql(&self, sql: &str) -> Result { + pub async fn sql(&self, sql: &str) -> Result { let query = QueryRequest { query: Some(query_request::Query::Sql(sql.to_string())), }; - self.do_select(query).await + self.do_query(query).await } - pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - let select_expr = QueryRequest { + pub async fn logical_plan(&self, logical_plan: Vec) -> Result { + let query = QueryRequest { query: Some(query_request::Query::LogicalPlan(logical_plan)), }; - self.do_select(select_expr).await + self.do_query(query).await } - async fn do_select(&self, select_expr: QueryRequest) -> Result { - let header = ExprHeader { - version: PROTOCOL_VERSION, - }; - + async fn do_query(&self, request: QueryRequest) -> Result { let expr = ObjectExpr { - header: Some(header), - expr: Some(object_expr::Expr::Query(select_expr)), + request: Some(object_expr::Request::Query(request)), }; let obj_result = self.object(expr).await?; @@ -130,12 +106,12 @@ impl Database { } #[derive(Debug)] -pub enum ObjectResult { - FlightData(Vec), - Mutate(GrpcMutateResult), +pub enum RpcOutput { + RecordBatches(RecordBatches), + AffectedRows(usize), } -impl TryFrom for ObjectResult { +impl TryFrom for RpcOutput { type Error = error::Error; fn try_from(object_result: api::v1::ObjectResult) -> std::result::Result { @@ -148,39 +124,32 @@ impl TryFrom for ObjectResult { .fail(); } - let obj_result = object_result.result.context(error::MissingResultSnafu { - name: "result".to_string(), - expected: 1_usize, - actual: 0_usize, - })?; - Ok(match obj_result { - object_result::Result::Mutate(mutate) => ObjectResult::Mutate(mutate), - object_result::Result::FlightData(flight_data) => { - let flight_messages = raw_flight_data_to_message(flight_data.raw_data) - .context(error::ConvertFlightDataSnafu)?; - ObjectResult::FlightData(flight_messages) - } - }) + let flight_messages = raw_flight_data_to_message(object_result.flight_data) + .context(ConvertFlightDataSnafu)?; + + let output = if let Some(FlightMessage::AffectedRows(rows)) = flight_messages.get(0) { + ensure!( + flight_messages.len() == 1, + IllegalFlightMessagesSnafu { + reason: "Expect 'AffectedRows' Flight messages to be one and only!" + } + ); + RpcOutput::AffectedRows(*rows) + } else { + let recordbatches = flight_messages_to_recordbatches(flight_messages) + .context(ConvertFlightDataSnafu)?; + RpcOutput::RecordBatches(recordbatches) + }; + Ok(output) } } -impl TryFrom for Output { - type Error = error::Error; - - fn try_from(value: ObjectResult) -> Result { - let output = match value { - ObjectResult::Mutate(mutate) => { - if mutate.failure != 0 { - return error::MutateFailureSnafu { - failure: mutate.failure, - } - .fail(); - } - Output::AffectedRows(mutate.success as usize) - } - ObjectResult::FlightData(_) => unreachable!(), - }; - Ok(output) +impl From for Output { + fn from(value: RpcOutput) -> Self { + match value { + RpcOutput::AffectedRows(x) => Output::AffectedRows(x), + RpcOutput::RecordBatches(x) => Output::RecordBatches(x), + } } } diff --git a/src/client/src/error.rs b/src/client/src/error.rs index e7e6147c6b..9564761fc6 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -19,10 +19,9 @@ use common_error::prelude::*; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Connect failed to {}, source: {}", url, source))] - ConnectFailed { - url: String, - source: tonic::transport::Error, + #[snafu(display("Illegal Flight messages, reason: {}", reason))] + IllegalFlightMessages { + reason: String, backtrace: Backtrace, }, @@ -87,7 +86,7 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::ConnectFailed { .. } + Error::IllegalFlightMessages { .. } | Error::MissingResult { .. } | Error::MissingHeader { .. } | Error::TonicStatus { .. } diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 3a64c6b962..bc93954d22 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -21,5 +21,5 @@ pub mod load_balance; pub use api; pub use self::client::Client; -pub use self::database::{Database, ObjectResult}; +pub use self::database::{Database, RpcOutput}; pub use self::error::{Error, Result}; diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index 169574b444..23755fec26 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::column::{SemanticType, Values}; -use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr}; +use api::v1::{ + AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateTableExpr, + InsertRequest as GrpcInsertRequest, +}; use common_base::BitVec; +use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_time::timestamp::Timestamp; use common_time::{Date, DateTime}; use datatypes::data_type::{ConcreteDataType, DataType}; @@ -30,7 +32,6 @@ use datatypes::vectors::MutableVector; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; -use table::Table; use crate::error::{ ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu, @@ -280,50 +281,43 @@ pub fn build_create_expr_from_insertion( Ok(expr) } -pub fn insertion_expr_to_request( - catalog_name: &str, - schema_name: &str, - table_name: &str, - insert_batches: Vec<(Vec, u32)>, - table: Arc, +pub fn to_table_insert_request( + request: GrpcInsertRequest, + schema: SchemaRef, ) -> Result { - let schema = table.schema(); - let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len()); + let catalog_name = DEFAULT_CATALOG_NAME; + let schema_name = &request.schema_name; + let table_name = &request.table_name; + let row_count = request.row_count as usize; - for (columns, row_count) in insert_batches { - for Column { - column_name, - values, - null_mask, - .. - } in columns - { - let values = match values { - Some(vals) => vals, - None => continue, - }; + let mut columns_values = HashMap::with_capacity(request.columns.len()); + for Column { + column_name, + values, + null_mask, + .. + } in request.columns + { + let Some(values) = values else { continue }; - let column = column_name.clone(); - let vector_builder = match columns_builders.entry(column) { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(entry) => { - let column_schema = schema.column_schema_by_name(&column_name).context( - ColumnNotFoundSnafu { - column_name: &column_name, - table_name, - }, - )?; - let data_type = &column_schema.data_type; - entry.insert(data_type.create_mutable_vector(row_count as usize)) - } - }; - add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?; - } + let vector_builder = &mut schema + .column_schema_by_name(&column_name) + .context(ColumnNotFoundSnafu { + column_name: &column_name, + table_name, + })? + .data_type + .create_mutable_vector(row_count); + + add_values_to_builder(vector_builder, values, row_count, null_mask)?; + + ensure!( + columns_values + .insert(column_name, vector_builder.to_vector()) + .is_none(), + IllegalInsertDataSnafu + ); } - let columns_values = columns_builders - .into_iter() - .map(|(column_name, mut vector_builder)| (column_name, vector_builder.to_vector())) - .collect(); Ok(InsertRequest { catalog_name: catalog_name.to_string(), @@ -479,10 +473,7 @@ mod tests { use table::metadata::TableInfoRef; use table::Table; - use super::{ - build_create_expr_from_insertion, convert_values, insertion_expr_to_request, is_null, - TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE, - }; + use super::*; use crate::error; use crate::error::ColumnDataTypeSnafu; use crate::insert::find_new_columns; @@ -628,12 +619,18 @@ mod tests { } #[test] - fn test_insertion_expr_to_request() { + fn test_to_table_insert_request() { let table: Arc = Arc::new(DemoTable {}); - let insert_batches = vec![mock_insert_batch()]; - let insert_req = - insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap(); + let (columns, row_count) = mock_insert_batch(); + let request = GrpcInsertRequest { + schema_name: "public".to_string(), + table_name: "demo".to_string(), + columns, + row_count, + region_number: 0, + }; + let insert_req = to_table_insert_request(request, table.schema()).unwrap(); assert_eq!("greptime", insert_req.catalog_name); assert_eq!("public", insert_req.schema_name); diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs index f296966bb3..ab03627b25 100644 --- a/src/common/grpc-expr/src/lib.rs +++ b/src/common/grpc-expr/src/lib.rs @@ -14,10 +14,9 @@ mod alter; pub mod error; -mod insert; +pub mod insert; pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema}; pub use insert::{ - build_alter_table_request, build_create_expr_from_insertion, column_to_vector, - find_new_columns, insertion_expr_to_request, + build_alter_table_request, build_create_expr_from_insertion, column_to_vector, find_new_columns, }; diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index 09fb4b27bc..a9434ed32e 100644 --- a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -16,6 +16,7 @@ common-runtime = { path = "../runtime" } dashmap = "5.4" datafusion.workspace = true datatypes = { path = "../../datatypes" } +flatbuffers = "22" futures = "0.3" prost = "0.11" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index d08e53ba9e..76b274dc04 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -18,13 +18,15 @@ use std::sync::Arc; use api::result::ObjectResultBuilder; use api::v1::{FlightDataExt, ObjectResult}; -use arrow_flight::utils::flight_data_to_arrow_batch; -use arrow_flight::FlightData; +use arrow_flight::utils::{flight_data_from_arrow_batch, flight_data_to_arrow_batch}; +use arrow_flight::{FlightData, IpcMessage, SchemaAsIpc}; use common_error::prelude::StatusCode; use common_recordbatch::{RecordBatch, RecordBatches}; +use datatypes::arrow; use datatypes::arrow::datatypes::Schema as ArrowSchema; -use datatypes::arrow::ipc::{root_as_message, MessageHeader}; +use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader}; use datatypes::schema::{Schema, SchemaRef}; +use flatbuffers::FlatBufferBuilder; use futures::TryStreamExt; use prost::Message; use snafu::{OptionExt, ResultExt}; @@ -46,6 +48,43 @@ pub enum FlightMessage { AffectedRows(usize), } +#[derive(Default)] +pub struct FlightEncoder { + write_options: writer::IpcWriteOptions, +} + +impl FlightEncoder { + pub fn encode(&self, flight_message: FlightMessage) -> FlightData { + match flight_message { + FlightMessage::Schema(schema) => { + SchemaAsIpc::new(schema.arrow_schema(), &self.write_options).into() + } + FlightMessage::Recordbatch(recordbatch) => { + let (flight_dictionaries, flight_batch) = flight_data_from_arrow_batch( + recordbatch.df_record_batch(), + &self.write_options, + ); + + // TODO(LFC): Handle dictionary as FlightData here, when we supported Arrow's Dictionary DataType. + // Currently we don't have a datatype corresponding to Arrow's Dictionary DataType, + // so there won't be any "dictionaries" here. Assert to be sure about it, and + // perform a "testing guard" in case we forgot to handle the possible "dictionaries" + // here in the future. + debug_assert_eq!(flight_dictionaries.len(), 0); + + flight_batch + } + FlightMessage::AffectedRows(rows) => { + let ext_data = FlightDataExt { + affected_rows: rows as _, + } + .encode_to_vec(); + FlightData::new(None, IpcMessage(build_none_flight_msg()), vec![], ext_data) + } + } + } +} + #[derive(Default)] pub struct FlightDecoder { schema: Option, @@ -115,16 +154,10 @@ pub async fn flight_data_to_object_result( let stream = response.into_inner(); let result: TonicResult> = stream.try_collect().await; match result { - Ok(flight_data) => { - let flight_data = flight_data - .into_iter() - .map(|x| x.encode_to_vec()) - .collect::>>(); - Ok(ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .flight_data(flight_data) - .build()) - } + Ok(flight_data) => Ok(ObjectResultBuilder::new() + .status_code(StatusCode::Success as u32) + .flight_data(flight_data) + .build()), Err(e) => Ok(ObjectResultBuilder::new() .status_code(StatusCode::Internal as _) .err_msg(e.to_string()) @@ -177,6 +210,20 @@ pub fn flight_messages_to_recordbatches(messages: Vec) -> Result< } } +fn build_none_flight_msg() -> Vec { + let mut builder = FlatBufferBuilder::new(); + + let mut message = arrow::ipc::MessageBuilder::new(&mut builder); + message.add_version(arrow::ipc::MetadataVersion::V5); + message.add_header_type(MessageHeader::NONE); + message.add_bodyLength(0); + + let data = message.finish(); + builder.finish(data, None); + + builder.finished_data().to_vec() +} + #[cfg(test)] mod test { use arrow_flight::utils::batches_to_flight_data; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 9d4ae86113..52790a1df5 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -29,7 +29,6 @@ common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datafusion.workspace = true datatypes = { path = "../datatypes" } -flatbuffers = "22" futures = "0.3" hyper = { version = "0.14", features = ["full"] } log-store = { path = "../log-store" } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index afe83c782d..148c5913c6 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -267,9 +267,6 @@ pub enum Error { source: common_grpc_expr::error::Error, }, - #[snafu(display("Insert batch is empty"))] - EmptyInsertBatch, - #[snafu(display( "Table id provider not found, cannot execute SQL directly on datanode in distributed mode" ))] @@ -384,7 +381,6 @@ impl ErrorExt for Error { Error::OpenStorageEngine { source } => source.status_code(), Error::RuntimeResource { .. } => StatusCode::RuntimeResourcesExhausted, Error::MetaClientInit { source, .. } => source.status_code(), - Error::EmptyInsertBatch => StatusCode::InvalidArguments, Error::TableIdProviderNotFound { .. } => StatusCode::Unsupported, Error::BumpTableId { source, .. } => source.status_code(), Error::MissingNodeId { .. } => StatusCode::InvalidArguments, diff --git a/src/datanode/src/instance/flight.rs b/src/datanode/src/instance/flight.rs index 27eb13d7d9..c1703b06b4 100644 --- a/src/datanode/src/instance/flight.rs +++ b/src/datanode/src/instance/flight.rs @@ -16,25 +16,28 @@ mod stream; use std::pin::Pin; -use api::v1::object_expr::Expr; +use api::v1::object_expr::Request as GrpcRequest; use api::v1::query_request::Query; -use api::v1::{FlightDataExt, ObjectExpr}; +use api::v1::{InsertRequest, ObjectExpr}; use arrow_flight::flight_service_server::FlightService; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, - HandshakeRequest, HandshakeResponse, IpcMessage, PutResult, SchemaResult, Ticket, + HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, }; use async_trait::async_trait; +use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::Output; -use datatypes::arrow; -use flatbuffers::FlatBufferBuilder; use futures::Stream; use prost::Message; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response, Streaming}; -use crate::error::{self, Result}; +use crate::error::{ + CatalogSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, InvalidFlightTicketSnafu, + MissingRequiredFieldSnafu, Result, TableNotFoundSnafu, +}; use crate::instance::flight::stream::FlightRecordBatchStream; use crate::instance::Instance; @@ -77,21 +80,21 @@ impl FlightService for Instance { async fn do_get(&self, request: Request) -> TonicResult> { let ticket = request.into_inner().ticket; - let expr = ObjectExpr::decode(ticket.as_slice()) - .context(error::InvalidFlightTicketSnafu)? - .expr - .context(error::MissingRequiredFieldSnafu { name: "expr" })?; - match expr { - Expr::Query(query_request) => { + let request = ObjectExpr::decode(ticket.as_slice()) + .context(InvalidFlightTicketSnafu)? + .request + .context(MissingRequiredFieldSnafu { name: "request" })?; + let output = match request { + GrpcRequest::Query(query_request) => { let query = query_request .query - .context(error::MissingRequiredFieldSnafu { name: "expr" })?; - let stream = self.handle_query(query).await?; - Ok(Response::new(stream)) + .context(MissingRequiredFieldSnafu { name: "query" })?; + self.handle_query(query).await? } - // TODO(LFC): Implement Insertion Flight interface. - Expr::Insert(_) => Err(tonic::Status::unimplemented("Not yet implemented")), - } + GrpcRequest::Insert(request) => self.handle_insert(request).await?, + }; + let stream = to_flight_data_stream(output); + Ok(Response::new(stream)) } type DoPutStream = TonicStream; @@ -132,56 +135,61 @@ impl FlightService for Instance { } impl Instance { - async fn handle_query(&self, query: Query) -> Result> { - let output = match query { + async fn handle_query(&self, query: Query) -> Result { + Ok(match query { Query::Sql(sql) => { let stmt = self .query_engine .sql_to_statement(&sql) - .context(error::ExecuteSqlSnafu)?; + .context(ExecuteSqlSnafu)?; self.execute_stmt(stmt, QueryContext::arc()).await? } Query::LogicalPlan(plan) => self.execute_logical(plan).await?, - }; - Ok(match output { - Output::Stream(stream) => { - let stream = FlightRecordBatchStream::new(stream); - Box::pin(stream) as _ - } - Output::RecordBatches(x) => { - let stream = FlightRecordBatchStream::new(x.as_stream()); - Box::pin(stream) as _ - } - Output::AffectedRows(rows) => { - let stream = async_stream::stream! { - let ext_data = FlightDataExt { - affected_rows: rows as _, - }.encode_to_vec(); - yield Ok(FlightData::new(None, IpcMessage(build_none_flight_msg()), vec![], ext_data)) - }; - Box::pin(stream) as _ - } }) } + + pub async fn handle_insert(&self, request: InsertRequest) -> Result { + let table_name = &request.table_name.clone(); + // TODO(LFC): InsertRequest should carry catalog name, too. + let table = self + .catalog_manager + .table(DEFAULT_CATALOG_NAME, &request.schema_name, table_name) + .context(CatalogSnafu)? + .context(TableNotFoundSnafu { table_name })?; + + let request = common_grpc_expr::insert::to_table_insert_request(request, table.schema()) + .context(InsertDataSnafu)?; + + let affected_rows = table + .insert(request) + .await + .context(InsertSnafu { table_name })?; + Ok(Output::AffectedRows(affected_rows)) + } } -fn build_none_flight_msg() -> Vec { - let mut builder = FlatBufferBuilder::new(); - - let mut message = arrow::ipc::MessageBuilder::new(&mut builder); - message.add_version(arrow::ipc::MetadataVersion::V5); - message.add_header_type(arrow::ipc::MessageHeader::NONE); - message.add_bodyLength(0); - - let data = message.finish(); - builder.finish(data, None); - - builder.finished_data().to_vec() +fn to_flight_data_stream(output: Output) -> TonicStream { + match output { + Output::Stream(stream) => { + let stream = FlightRecordBatchStream::new(stream); + Box::pin(stream) as _ + } + Output::RecordBatches(x) => { + let stream = FlightRecordBatchStream::new(x.as_stream()); + Box::pin(stream) as _ + } + Output::AffectedRows(rows) => { + let stream = tokio_stream::once(Ok( + FlightEncoder::default().encode(FlightMessage::AffectedRows(rows)) + )); + Box::pin(stream) as _ + } + } } #[cfg(test)] mod test { - use api::v1::{object_result, FlightDataRaw, QueryRequest}; + use api::v1::QueryRequest; use common_grpc::flight; use common_grpc::flight::FlightMessage; use datatypes::prelude::*; @@ -201,8 +209,7 @@ mod test { let ticket = Request::new(Ticket { ticket: ObjectExpr { - header: None, - expr: Some(Expr::Query(QueryRequest { + request: Some(GrpcRequest::Query(QueryRequest { query: Some(Query::Sql( "INSERT INTO demo(host, cpu, memory, ts) VALUES \ ('host1', 66.6, 1024, 1672201025000),\ @@ -218,10 +225,7 @@ mod test { let result = flight::flight_data_to_object_result(response) .await .unwrap(); - let result = result.result.unwrap(); - assert!(matches!(result, object_result::Result::FlightData(_))); - - let object_result::Result::FlightData(FlightDataRaw { raw_data }) = result else { unreachable!() }; + let raw_data = result.flight_data; let mut messages = flight::raw_flight_data_to_message(raw_data).unwrap(); assert_eq!(messages.len(), 1); @@ -232,8 +236,7 @@ mod test { let ticket = Request::new(Ticket { ticket: ObjectExpr { - header: None, - expr: Some(Expr::Query(QueryRequest { + request: Some(GrpcRequest::Query(QueryRequest { query: Some(Query::Sql( "SELECT ts, host, cpu, memory FROM demo".to_string(), )), @@ -246,10 +249,7 @@ mod test { let result = flight::flight_data_to_object_result(response) .await .unwrap(); - let result = result.result.unwrap(); - assert!(matches!(result, object_result::Result::FlightData(_))); - - let object_result::Result::FlightData(FlightDataRaw { raw_data }) = result else { unreachable!() }; + let raw_data = result.flight_data; let messages = flight::raw_flight_data_to_message(raw_data).unwrap(); assert_eq!(messages.len(), 2); diff --git a/src/datanode/src/instance/flight/stream.rs b/src/datanode/src/instance/flight/stream.rs index 0cb3b35da4..5e86ca1d2c 100644 --- a/src/datanode/src/instance/flight/stream.rs +++ b/src/datanode/src/instance/flight/stream.rs @@ -15,11 +15,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use arrow_flight::utils::flight_data_from_arrow_batch; -use arrow_flight::{FlightData, SchemaAsIpc}; +use arrow_flight::FlightData; +use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::warn; -use datatypes::arrow; use futures::channel::mpsc; use futures::channel::mpsc::Sender; use futures::{SinkExt, Stream, StreamExt}; @@ -33,14 +32,15 @@ use crate::instance::flight::TonicResult; #[pin_project(PinnedDrop)] pub(super) struct FlightRecordBatchStream { #[pin] - rx: mpsc::Receiver>, + rx: mpsc::Receiver>, join_handle: JoinHandle<()>, done: bool, + encoder: FlightEncoder, } impl FlightRecordBatchStream { pub(super) fn new(recordbatches: SendableRecordBatchStream) -> Self { - let (tx, rx) = mpsc::channel::>(1); + let (tx, rx) = mpsc::channel::>(1); let join_handle = common_runtime::spawn_read( async move { Self::flight_data_stream(recordbatches, tx).await }, @@ -49,36 +49,24 @@ impl FlightRecordBatchStream { rx, join_handle, done: false, + encoder: FlightEncoder::default(), } } async fn flight_data_stream( mut recordbatches: SendableRecordBatchStream, - mut tx: Sender>, + mut tx: Sender>, ) { let schema = recordbatches.schema(); - let options = arrow::ipc::writer::IpcWriteOptions::default(); - let schema_flight_data: FlightData = - SchemaAsIpc::new(schema.arrow_schema(), &options).into(); - if let Err(e) = tx.send(Ok(schema_flight_data)).await { + if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await { warn!("stop sending Flight data, err: {e}"); return; } while let Some(batch_or_err) = recordbatches.next().await { match batch_or_err { - Ok(batch) => { - let (flight_dictionaries, flight_batch) = - flight_data_from_arrow_batch(batch.df_record_batch(), &options); - - // TODO(LFC): Handle dictionary as FlightData here, when we supported Arrow's Dictionary DataType. - // Currently we don't have a datatype corresponding to Arrow's Dictionary DataType, - // so there won't be any "dictionaries" here. Assert to be sure about it, and - // perform a "testing guard" in case we forgot to handle the possible "dictionaries" - // here in the future. - debug_assert_eq!(flight_dictionaries.len(), 0); - - if let Err(e) = tx.send(Ok(flight_batch)).await { + Ok(recordbatch) => { + if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await { warn!("stop sending Flight data, err: {e}"); return; } @@ -115,11 +103,17 @@ impl Stream for FlightRecordBatchStream { *this.done = true; Poll::Ready(None) } - e @ Poll::Ready(Some(Err(_))) => { - *this.done = true; - e - } - other => other, + Poll::Ready(Some(result)) => match result { + Ok(flight_message) => { + let flight_data = this.encoder.encode(flight_message); + Poll::Ready(Some(Ok(flight_data))) + } + Err(e) => { + *this.done = true; + Poll::Ready(Some(Err(e))) + } + }, + Poll::Pending => Poll::Pending, } } } diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 2a52a07e85..8932b8776f 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -12,20 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder}; -use api::v1::{ - admin_expr, object_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr, ObjectExpr, - ObjectResult, QueryRequest, -}; +use api::result::AdminResultBuilder; +use api::v1::{admin_expr, AdminExpr, AdminResult, CreateDatabaseExpr, ObjectExpr, ObjectResult}; use arrow_flight::flight_service_server::FlightService; use arrow_flight::Ticket; use async_trait::async_trait; -use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::ext::ErrorExt; use common_error::prelude::BoxedError; use common_error::status_code::StatusCode; -use common_grpc::flight::flight_data_to_object_result; -use common_grpc_expr::insertion_expr_to_request; +use common_grpc::flight; use common_query::Output; use prost::Message; use query::plan::LogicalPlan; @@ -36,88 +31,16 @@ use table::requests::CreateDatabaseRequest; use tonic::Request; use crate::error::{ - self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, EmptyInsertBatchSnafu, - ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, + DecodeLogicalPlanSnafu, ExecuteSqlSnafu, FlightGetSnafu, InvalidFlightDataSnafu, Result, }; use crate::instance::Instance; impl Instance { - pub async fn execute_grpc_insert( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - insert_batches: Vec<(Vec, u32)>, - ) -> Result { - let schema_provider = self - .catalog_manager - .catalog(catalog_name) - .context(CatalogSnafu)? - .context(CatalogNotFoundSnafu { name: catalog_name })? - .schema(schema_name) - .context(CatalogSnafu)? - .context(SchemaNotFoundSnafu { name: schema_name })?; - - ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu); - let table = schema_provider - .table(table_name) - .context(CatalogSnafu)? - .context(TableNotFoundSnafu { table_name })?; - - let insert = insertion_expr_to_request( - catalog_name, - schema_name, - table_name, - insert_batches, - table.clone(), - ) - .context(InsertDataSnafu)?; - - let affected_rows = table - .insert(insert) + async fn boarding(&self, ticket: Request) -> Result { + let response = self.do_get(ticket).await.context(FlightGetSnafu)?; + flight::flight_data_to_object_result(response) .await - .context(InsertSnafu { table_name })?; - - Ok(Output::AffectedRows(affected_rows)) - } - - async fn handle_insert( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - insert_batches: Vec<(Vec, u32)>, - ) -> ObjectResult { - match self - .execute_grpc_insert(catalog_name, schema_name, table_name, insert_batches) - .await - { - Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() - .status_code(StatusCode::Success as u32) - .mutate_result(rows as u32, 0) - .build(), - Err(err) => { - common_telemetry::error!(err; "Failed to handle insert, catalog name: {}, schema name: {}, table name: {}", catalog_name, schema_name, table_name); - // TODO(fys): failure count - build_err_result(&err) - } - _ => unreachable!(), - } - } - - async fn handle_query_request(&self, query_request: QueryRequest) -> Result { - let ticket = Request::new(Ticket { - ticket: ObjectExpr { - header: None, - expr: Some(object_expr::Expr::Query(query_request)), - } - .encode_to_vec(), - }); - // TODO(LFC): Temporarily use old GRPC interface here, will make it been replaced. - let response = self.do_get(ticket).await.context(error::FlightGetSnafu)?; - flight_data_to_object_result(response) - .await - .context(error::InvalidFlightDataSnafu) + .context(InvalidFlightDataSnafu) } async fn execute_create_database( @@ -156,34 +79,16 @@ impl Instance { #[async_trait] impl GrpcQueryHandler for Instance { async fn do_query(&self, query: ObjectExpr) -> servers::error::Result { - let object_resp = match query.expr { - Some(object_expr::Expr::Insert(insert_expr)) => { - let catalog_name = DEFAULT_CATALOG_NAME; - let schema_name = &insert_expr.schema_name; - let table_name = &insert_expr.table_name; - - // TODO(fys): _region_number is for later use. - let _region_number: u32 = insert_expr.region_number; - - let insert_batches = vec![(insert_expr.columns, insert_expr.row_count)]; - self.handle_insert(catalog_name, schema_name, table_name, insert_batches) - .await - } - Some(object_expr::Expr::Query(query_request)) => self - .handle_query_request(query_request.clone()) - .await - .map_err(BoxedError::new) - .context(servers::error::ExecuteQuerySnafu { - query: format!("{query_request:?}"), - })?, - other => { - return servers::error::NotSupportedSnafu { - feat: format!("{other:?}"), - } - .fail(); - } - }; - Ok(object_resp) + let ticket = Request::new(Ticket { + ticket: query.encode_to_vec(), + }); + // TODO(LFC): Temporarily use old GRPC interface here, will get rid of them near the end of Arrow Flight adoption. + self.boarding(ticket) + .await + .map_err(BoxedError::new) + .with_context(|_| servers::error::ExecuteQuerySnafu { + query: format!("{query:?}"), + }) } } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index e970a07058..73d586bb27 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -442,12 +442,6 @@ pub enum Error { #[snafu(backtrace)] source: servers::error::Error, }, - - #[snafu(display("Failed to convert Flight Message, source: {}", source))] - ConvertFlightMessage { - #[snafu(backtrace)] - source: common_grpc::error::Error, - }, } pub type Result = std::result::Result; @@ -537,7 +531,6 @@ impl ErrorExt for Error { Error::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists, Error::EncodeSubstraitLogicalPlan { source } => source.status_code(), Error::BuildVector { source, .. } => source.status_code(), - Error::ConvertFlightMessage { source } => source.status_code(), } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 7b01890b76..2b4bff55c9 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -22,20 +22,21 @@ use std::time::Duration; use api::result::{ObjectResultBuilder, PROTOCOL_VERSION}; use api::v1::alter_expr::Kind; -use api::v1::object_expr::Expr; +use api::v1::object_expr::Request; use api::v1::{ admin_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr, - CreateTableExpr, DropTableExpr, ExprHeader, InsertExpr, ObjectExpr, + CreateTableExpr, DropTableExpr, ExprHeader, InsertRequest, ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; use catalog::remote::MetaKvBackend; use catalog::{CatalogManagerRef, CatalogProviderRef, SchemaProviderRef}; use client::admin::admin_result_to_output; -use client::ObjectResult; +use client::RpcOutput; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::{debug, info}; @@ -245,10 +246,10 @@ impl Instance { } /// Handle batch inserts - pub async fn handle_inserts(&self, insert_expr: Vec) -> Result { + pub async fn handle_inserts(&self, requests: Vec) -> Result { let mut success = 0; - for expr in insert_expr { - match self.handle_insert(expr).await? { + for request in requests { + match self.handle_insert(request).await? { Output::AffectedRows(rows) => success += rows, _ => unreachable!("Insert should not yield output other than AffectedRows"), } @@ -256,30 +257,26 @@ impl Instance { Ok(Output::AffectedRows(success)) } - /// Handle insert. for 'values' insertion, create/alter the destination table on demand. - async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result { - let table_name = &insert_expr.table_name; + // TODO(LFC): Revisit GRPC insertion feature, check if the "create/alter table on demand" functionality is broken. + // Should be supplied with enough tests. + async fn handle_insert(&self, request: InsertRequest) -> Result { + let schema_name = &request.schema_name; + let table_name = &request.table_name; let catalog_name = DEFAULT_CATALOG_NAME; - let schema_name = &insert_expr.schema_name; - let columns = &insert_expr.columns; + let columns = &request.columns; self.create_or_alter_table_on_demand(catalog_name, schema_name, table_name, columns) .await?; - insert_expr.region_number = 0; - let query = ObjectExpr { - header: Some(ExprHeader { - version: PROTOCOL_VERSION, - }), - expr: Some(Expr::Insert(insert_expr)), + request: Some(Request::Insert(request)), }; let result = GrpcQueryHandler::do_query(&*self.grpc_query_handler, query) .await .context(error::InvokeGrpcServerSnafu)?; - let result: ObjectResult = result.try_into().context(InsertSnafu)?; - result.try_into().context(InsertSnafu) + let result: RpcOutput = result.try_into().context(InsertSnafu)?; + Ok(result.into()) } // check if table already exist: @@ -514,14 +511,14 @@ impl Instance { .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query })?; - let expr = InsertExpr { + let request = InsertRequest { schema_name, table_name, region_number: 0, columns, row_count, }; - self.handle_insert(expr).await + self.handle_insert(request).await } Mode::Distributed => { let affected = self @@ -672,24 +669,26 @@ impl ScriptHandler for Instance { #[async_trait] impl GrpcQueryHandler for Instance { async fn do_query(&self, query: ObjectExpr) -> server_error::Result { - let expr = query + let request = query .clone() - .expr + .request .context(server_error::InvalidQuerySnafu { reason: "empty expr", })?; - match expr { - Expr::Insert(insert_expr) => { + match request { + Request::Insert(request) => { let output = self - .handle_insert(insert_expr.clone()) + .handle_insert(request.clone()) .await .map_err(BoxedError::new) .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{insert_expr:?}"), + query: format!("{request:?}"), })?; let object_result = match output { Output::AffectedRows(rows) => ObjectResultBuilder::default() - .mutate_result(rows as _, 0) + .flight_data(vec![ + FlightEncoder::default().encode(FlightMessage::AffectedRows(rows)) + ]) .build(), _ => unreachable!(), }; @@ -721,9 +720,8 @@ mod tests { use api::v1::column::SemanticType; use api::v1::{ - admin_expr, admin_result, column, object_expr, object_result, query_request, Column, - ColumnDataType, ColumnDef as GrpcColumnDef, ExprHeader, FlightDataRaw, MutateResult, - QueryRequest, + admin_expr, admin_result, column, query_request, Column, ColumnDataType, + ColumnDef as GrpcColumnDef, ExprHeader, MutateResult, QueryRequest, }; use common_grpc::flight::{raw_flight_data_to_message, FlightMessage}; use common_recordbatch::RecordBatch; @@ -905,7 +903,7 @@ mod tests { expected_ts_col.clone(), ]; let row_count = 4; - let insert_expr = InsertExpr { + let request = InsertRequest { schema_name: "public".to_string(), table_name: "demo".to_string(), region_number: 0, @@ -913,94 +911,84 @@ mod tests { row_count, }; let object_expr = ObjectExpr { - header: Some(ExprHeader::default()), - expr: Some(object_expr::Expr::Insert(insert_expr)), + request: Some(Request::Insert(request)), }; let result = GrpcQueryHandler::do_query(&*instance, object_expr) .await .unwrap(); - assert_matches!( - result.result, - Some(object_result::Result::Mutate(MutateResult { - success: 4, - failure: 0 - })) - ); + let raw_data = result.flight_data; + let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap(); + assert_eq!(flight_messages.len(), 1); + let message = flight_messages.remove(0); + assert!(matches!(message, FlightMessage::AffectedRows(4))); // select let object_expr = ObjectExpr { - header: Some(ExprHeader::default()), - expr: Some(Expr::Query(QueryRequest { + request: Some(Request::Query(QueryRequest { query: Some(query_request::Query::Sql("select * from demo".to_string())), })), }; let result = GrpcQueryHandler::do_query(&*instance, object_expr) .await .unwrap(); - match result.result { - Some(object_result::Result::FlightData(FlightDataRaw { raw_data })) => { - let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap(); - assert_eq!(flight_messages.len(), 2); + let raw_data = result.flight_data; + let mut flight_messages = raw_flight_data_to_message(raw_data).unwrap(); + assert_eq!(flight_messages.len(), 2); - let expected_schema = Arc::new(Schema::new(vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("disk_util", ConcreteDataType::float64_datatype(), true) - .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from( - 9.9f64, - )))) - .unwrap(), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true), - ])); - match flight_messages.remove(0) { - FlightMessage::Schema(schema) => { - assert_eq!(schema, expected_schema); - } - _ => unreachable!(), - } + let expected_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("disk_util", ConcreteDataType::float64_datatype(), true) + .with_default_constraint(Some(ColumnDefaultConstraint::Value(Value::from(9.9f64)))) + .unwrap(), + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true), + ])); + match flight_messages.remove(0) { + FlightMessage::Schema(schema) => { + assert_eq!(schema, expected_schema); + } + _ => unreachable!(), + } - match flight_messages.remove(0) { - FlightMessage::Recordbatch(recordbatch) => { - let expect_recordbatch = RecordBatch::new( - expected_schema, - vec![ - Arc::new(StringVector::from(vec![ - "fe.host.a", - "fe.host.b", - "fe.host.c", - "fe.host.d", - ])) as _, - Arc::new(Float64Vector::from(vec![ - Some(1.0f64), - None, - Some(3.0f64), - Some(4.0f64), - ])) as _, - Arc::new(Float64Vector::from(vec![ - Some(100f64), - Some(200f64), - None, - Some(400f64), - ])) as _, - Arc::new(Float64Vector::from_vec( - iter::repeat(9.9f64).take(4).collect(), - )) as _, - Arc::new(TimestampMillisecondVector::from_vec(vec![ - 1000i64, 2000, 3000, 4000, - ])) as _, - ], - ) - .unwrap(); - assert_eq!(recordbatch, expect_recordbatch); - } - _ => unreachable!(), - } + match flight_messages.remove(0) { + FlightMessage::Recordbatch(recordbatch) => { + let expect_recordbatch = RecordBatch::new( + expected_schema, + vec![ + Arc::new(StringVector::from(vec![ + "fe.host.a", + "fe.host.b", + "fe.host.c", + "fe.host.d", + ])) as _, + Arc::new(Float64Vector::from(vec![ + Some(1.0f64), + None, + Some(3.0f64), + Some(4.0f64), + ])) as _, + Arc::new(Float64Vector::from(vec![ + Some(100f64), + Some(200f64), + None, + Some(400f64), + ])) as _, + Arc::new(Float64Vector::from_vec( + iter::repeat(9.9f64).take(4).collect(), + )) as _, + Arc::new(TimestampMillisecondVector::from_vec(vec![ + 1000i64, 2000, 3000, 4000, + ])) as _, + ], + ) + .unwrap(); + assert_eq!(recordbatch, expect_recordbatch); } _ => unreachable!(), } diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index aa25cffb8f..014b131ffc 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::{Column, InsertExpr}; +use api::v1::{Column, InsertRequest as GrpcInsertRequest}; use async_trait::async_trait; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; @@ -34,8 +34,7 @@ impl InfluxdbLineProtocolHandler for Instance { async fn exec(&self, request: &InfluxdbRequest) -> servers::error::Result<()> { match self.mode { Mode::Standalone => { - let exprs: Vec = request.try_into()?; - self.handle_inserts(exprs) + self.handle_inserts(request.try_into()?) .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { @@ -57,7 +56,7 @@ impl InfluxdbLineProtocolHandler for Instance { } impl Instance { - pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { + pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { let mut joins = Vec::with_capacity(inserts.len()); let catalog_name = DEFAULT_CATALOG_NAME; diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index e2fa79a59c..52d72d1b3d 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -14,12 +14,11 @@ use api::prometheus::remote::read_request::ResponseType; use api::prometheus::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; -use api::v1::object_expr::Expr; +use api::v1::object_expr::Request; use api::v1::{query_request, ObjectExpr, QueryRequest}; use async_trait::async_trait; -use client::ObjectResult; +use client::RpcOutput; use common_error::prelude::BoxedError; -use common_grpc::flight; use common_telemetry::logging; use prost::Message; use servers::error::{self, Result as ServerResult}; @@ -61,13 +60,8 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult ServerResult { - let ObjectResult::FlightData(flight_messages) = object_result else { unreachable!() }; - let recordbatches = flight::flight_messages_to_recordbatches(flight_messages) - .context(error::ConvertFlightMessageSnafu)?; +fn to_query_result(table_name: &str, object_result: RpcOutput) -> ServerResult { + let RpcOutput::RecordBatches(recordbatches) = object_result else { unreachable!() }; Ok(QueryResult { timeseries: prometheus::recordbatches_to_timeseries(table_name, recordbatches)?, }) @@ -78,7 +72,7 @@ impl Instance { &self, db: &str, queries: &[Query], - ) -> ServerResult> { + ) -> ServerResult> { let mut results = Vec::with_capacity(queries.len()); for query in queries { @@ -90,8 +84,7 @@ impl Instance { ); let query = ObjectExpr { - header: None, - expr: Some(Expr::Query(QueryRequest { + request: Some(Request::Query(QueryRequest { query: Some(query_request::Query::Sql(sql.to_string())), })), }; @@ -112,10 +105,10 @@ impl Instance { #[async_trait] impl PrometheusProtocolHandler for Instance { async fn write(&self, database: &str, request: WriteRequest) -> ServerResult<()> { - let exprs = prometheus::write_request_to_insert_exprs(database, request.clone())?; + let requests = prometheus::to_grpc_insert_requests(database, request.clone())?; match self.mode { Mode::Standalone => { - self.handle_inserts(exprs) + self.handle_inserts(requests) .await .map_err(BoxedError::new) .with_context(|_| error::ExecuteInsertSnafu { @@ -123,7 +116,7 @@ impl PrometheusProtocolHandler for Instance { })?; } Mode::Distributed => { - self.dist_insert(exprs) + self.dist_insert(requests) .await .map_err(BoxedError::new) .with_context(|_| error::ExecuteInsertSnafu { @@ -146,9 +139,7 @@ impl PrometheusProtocolHandler for Instance { ResponseType::Samples => { let query_results = results .into_iter() - .map(|(table_name, object_result)| { - object_result_to_query_result(&table_name, object_result) - }) + .map(|(table_name, object_result)| to_query_result(&table_name, object_result)) .collect::>>()?; let response = ReadResponse { diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index ac4eb2a42f..7d63d7e47b 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use api::v1::AlterExpr; use async_trait::async_trait; use client::admin::Admin; -use client::Database; +use client::{Database, RpcOutput}; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; @@ -88,11 +88,10 @@ impl Table for DistTable { let spliter = WriteSpliter::with_partition_rule(partition_rule); let inserts = spliter.split(request).map_err(TableError::new)?; - let result = match self.dist_insert(inserts).await.map_err(TableError::new)? { - client::ObjectResult::Mutate(result) => result, - _ => unreachable!(), - }; - Ok(result.success as usize) + + let output = self.dist_insert(inserts).await.map_err(TableError::new)?; + let RpcOutput::AffectedRows(rows) = output else { unreachable!() }; + Ok(rows) } async fn scan( @@ -508,7 +507,7 @@ impl PartitionExec { #[cfg(test)] mod test { use api::v1::column::SemanticType; - use api::v1::{column, Column, ColumnDataType}; + use api::v1::{column, Column, ColumnDataType, InsertRequest}; use common_query::physical_plan::DfPhysicalPlanAdapter; use common_recordbatch::adapter::RecordBatchStreamAdapter; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -993,49 +992,45 @@ mod test { data: Vec, start_ts: i64, ) { - let rows = data.len() as u32; - let values = vec![( - vec![ - Column { - column_name: "ts".to_string(), - values: Some(column::Values { - i64_values: (start_ts..start_ts + rows as i64).collect::>(), - ..Default::default() - }), - datatype: ColumnDataType::Int64 as i32, - semantic_type: SemanticType::Timestamp as i32, + let row_count = data.len() as u32; + let columns = vec![ + Column { + column_name: "ts".to_string(), + values: Some(column::Values { + i64_values: (start_ts..start_ts + row_count as i64).collect::>(), ..Default::default() - }, - Column { - column_name: "a".to_string(), - values: Some(column::Values { - i32_values: data, - ..Default::default() - }), - datatype: ColumnDataType::Int32 as i32, + }), + datatype: ColumnDataType::Int64 as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + Column { + column_name: "a".to_string(), + values: Some(column::Values { + i32_values: data, ..Default::default() - }, - Column { - column_name: "row_id".to_string(), - values: Some(column::Values { - i32_values: (1..=rows as i32).collect::>(), - ..Default::default() - }), - datatype: ColumnDataType::Int32 as i32, + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + Column { + column_name: "row_id".to_string(), + values: Some(column::Values { + i32_values: (1..=row_count as i32).collect::>(), ..Default::default() - }, - ], - rows, - )]; - dn_instance - .execute_grpc_insert( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - values, - ) - .await - .unwrap(); + }), + datatype: ColumnDataType::Int32 as i32, + ..Default::default() + }, + ]; + let request = InsertRequest { + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + columns, + row_count, + region_number: 0, + }; + dn_instance.handle_insert(request).await.unwrap(); } #[tokio::test(flavor = "multi_thread")] diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 6473bec4fe..e5c02a8696 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::column::SemanticType; -use api::v1::{Column, InsertExpr, MutateResult}; -use client::{Database, ObjectResult}; +use api::v1::{Column, InsertRequest as GrpcInsertRequest}; +use client::{Database, RpcOutput}; use datatypes::prelude::ConcreteDataType; use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::RegionNumber; @@ -33,7 +33,7 @@ impl DistTable { pub async fn dist_insert( &self, inserts: HashMap, - ) -> Result { + ) -> Result { let route = self.table_routes.get_route(&self.table_name).await?; let mut joins = Vec::with_capacity(inserts.len()); @@ -57,7 +57,7 @@ impl DistTable { // TODO(fys): a separate runtime should be used here. let join = tokio::spawn(async move { instance - .grpc_insert(to_insert_expr(region_id, insert)?) + .grpc_insert(to_grpc_insert_request(region_id, insert)?) .await .context(error::RequestDatanodeSnafu) }); @@ -66,19 +66,12 @@ impl DistTable { } let mut success = 0; - let mut failure = 0; - for join in joins { let object_result = join.await.context(error::JoinTaskSnafu)??; - let result = match object_result { - ObjectResult::Mutate(result) => result, - ObjectResult::FlightData(_) => unreachable!(), - }; - success += result.success; - failure += result.failure; + let RpcOutput::AffectedRows(rows) = object_result else { unreachable!() }; + success += rows; } - - Ok(ObjectResult::Mutate(MutateResult { success, failure })) + Ok(RpcOutput::AffectedRows(success)) } } @@ -130,10 +123,13 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec Result { +fn to_grpc_insert_request( + region_number: RegionNumber, + insert: InsertRequest, +) -> Result { let table_name = insert.table_name.clone(); let (columns, row_count) = insert_request_to_insert_batch(&insert)?; - Ok(InsertExpr { + Ok(GrpcInsertRequest { schema_name: insert.schema_name, table_name, region_number, @@ -146,21 +142,21 @@ fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result< mod tests { use std::collections::HashMap; - use api::v1::{ColumnDataType, InsertExpr}; + use api::v1::ColumnDataType; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ScalarVectorBuilder; use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder}; use table::requests::InsertRequest; - use super::to_insert_expr; + use super::*; #[test] - fn test_to_insert_expr() { + fn test_to_grpc_insert_request() { let insert_request = mock_insert_request(); - let insert_expr = to_insert_expr(12, insert_request).unwrap(); + let request = to_grpc_insert_request(12, insert_request).unwrap(); - verify_insert_expr(insert_expr); + verify_grpc_insert_request(request); } fn mock_insert_request() -> InsertRequest { @@ -186,11 +182,11 @@ mod tests { } } - fn verify_insert_expr(insert_expr: InsertExpr) { - let table_name = insert_expr.table_name; + fn verify_grpc_insert_request(request: GrpcInsertRequest) { + let table_name = request.table_name; assert_eq!("demo", table_name); - for column in insert_expr.columns { + for column in request.columns { let name = column.column_name; if name == "id" { assert_eq!(0, column.null_mask[0]); @@ -207,7 +203,7 @@ mod tests { } } - let region_number = insert_expr.region_number; + let region_number = request.region_number; assert_eq!(12, region_number); } } diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index 04cd05f831..c69478a8b9 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -15,9 +15,8 @@ use std::fmt::Formatter; use std::sync::Arc; -use api::v1::InsertExpr; -use client::{Database, ObjectResult}; -use common_grpc::flight::flight_messages_to_recordbatches; +use api::v1::InsertRequest; +use client::{Database, RpcOutput}; use common_query::prelude::Expr; use common_recordbatch::RecordBatches; use datafusion::datasource::DefaultTableSource; @@ -28,7 +27,7 @@ use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::table::adapter::DfTableProviderAdapter; use table::TableRef; -use crate::error::{self, ConvertFlightMessageSnafu, Result}; +use crate::error::{self, Result}; #[derive(Clone)] pub struct DatanodeInstance { @@ -47,7 +46,7 @@ impl DatanodeInstance { Self { table, db } } - pub(crate) async fn grpc_insert(&self, request: InsertExpr) -> client::Result { + pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result { self.db.insert(request).await } @@ -63,13 +62,7 @@ impl DatanodeInstance { .logical_plan(substrait_plan.to_vec()) .await .context(error::RequestDatanodeSnafu)?; - let recordbatches = match result { - ObjectResult::FlightData(flight_message) => { - flight_messages_to_recordbatches(flight_message) - .context(ConvertFlightMessageSnafu)? - } - _ => unreachable!(), - }; + let RpcOutput::RecordBatches(recordbatches) = result else { unreachable!() }; Ok(recordbatches) } diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 93da4c2c07..722bf65a94 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::InsertExpr; +use api::v1::InsertRequest as GrpcInsertRequest; use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; @@ -86,7 +86,7 @@ impl TryFrom<&InfluxdbRequest> for Vec { } // TODO(fys): will remove in the future. -impl TryFrom<&InfluxdbRequest> for Vec { +impl TryFrom<&InfluxdbRequest> for Vec { type Error = Error; fn try_from(value: &InfluxdbRequest) -> Result { @@ -163,7 +163,7 @@ impl TryFrom<&InfluxdbRequest> for Vec { .into_iter() .map(|(table_name, writer)| { let (columns, row_count) = writer.finish(); - InsertExpr { + GrpcInsertRequest { schema_name: schema_name.clone(), table_name, region_number: 0, @@ -180,7 +180,7 @@ mod tests { use std::sync::Arc; use api::v1::column::{SemanticType, Values}; - use api::v1::{Column, ColumnDataType, InsertExpr}; + use api::v1::{Column, ColumnDataType}; use common_base::BitVec; use common_time::timestamp::TimeUnit; use common_time::Timestamp; @@ -188,6 +188,7 @@ mod tests { use datatypes::vectors::Vector; use table::requests::InsertRequest; + use super::*; use crate::influxdb::InfluxdbRequest; #[test] @@ -230,15 +231,14 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; lines: lines.to_string(), }; - let insert_exprs: Vec = influxdb_req.try_into().unwrap(); + let requests: Vec = influxdb_req.try_into().unwrap(); + assert_eq!(2, requests.len()); - assert_eq!(2, insert_exprs.len()); - - for expr in insert_exprs { - assert_eq!("public", expr.schema_name); - match &expr.table_name[..] { - "monitor1" => assert_monitor_1(&expr.columns), - "monitor2" => assert_monitor_2(&expr.columns), + for request in requests { + assert_eq!("public", request.schema_name); + match &request.table_name[..] { + "monitor1" => assert_monitor_1(&request.columns), + "monitor2" => assert_monitor_2(&request.columns), _ => panic!(), } } diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 6dc61f6784..a0b7a38b0d 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -13,7 +13,7 @@ // limitations under the License. use api::v1::column::SemanticType; -use api::v1::{column, Column, ColumnDataType, InsertExpr}; +use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision; use table::requests::InsertRequest; @@ -144,8 +144,9 @@ impl DataPoint { line_writer.finish() } - // TODO(fys): will remove in the future. - pub fn as_grpc_insert(&self) -> InsertExpr { + // TODO(LFC): opentsdb and influxdb insertions should go through the Table trait directly. + // Currently: line protocol -> grpc request -> grpc interface -> table trait + pub fn as_grpc_insert(&self) -> GrpcInsertRequest { let schema_name = DEFAULT_SCHEMA_NAME.to_string(); let mut columns = Vec::with_capacity(2 + self.tags.len()); @@ -186,7 +187,7 @@ impl DataPoint { }); } - InsertExpr { + GrpcInsertRequest { schema_name, table_name: self.metric.clone(), region_number: 0, diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 2b46e581bd..c82019e1b6 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -20,7 +20,7 @@ use std::hash::{Hash, Hasher}; use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; use api::v1::column::SemanticType; -use api::v1::{column, Column, ColumnDataType, InsertExpr}; +use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; use common_grpc::writer::Precision::Millisecond; use common_recordbatch::{RecordBatch, RecordBatches}; use common_time::timestamp::TimeUnit; @@ -339,21 +339,20 @@ fn timeseries_to_insert_request(db: &str, mut timeseries: TimeSeries) -> Result< } // TODO(fys): it will remove in the future. -/// Cast a remote write request into gRPC's InsertExpr. -pub fn write_request_to_insert_exprs( +pub fn to_grpc_insert_requests( database: &str, mut request: WriteRequest, -) -> Result> { +) -> Result> { let timeseries = std::mem::take(&mut request.timeseries); timeseries .into_iter() - .map(|timeseries| timeseries_to_insert_expr(database, timeseries)) + .map(|timeseries| to_grpc_insert_request(database, timeseries)) .collect() } // TODO(fys): it will remove in the future. -fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Result { +fn to_grpc_insert_request(database: &str, mut timeseries: TimeSeries) -> Result { let schema_name = database.to_string(); // TODO(dennis): save exemplars into a column @@ -411,7 +410,7 @@ fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Resu }); } - Ok(InsertExpr { + Ok(GrpcInsertRequest { schema_name, table_name: table_name.context(error::InvalidPromRemoteRequestSnafu { msg: "missing '__name__' label in timeseries", @@ -666,7 +665,7 @@ mod tests { ..Default::default() }; - let exprs = write_request_to_insert_exprs("prometheus", write_request).unwrap(); + let exprs = to_grpc_insert_requests("prometheus", write_request).unwrap(); assert_eq!(3, exprs.len()); assert_eq!("prometheus", exprs[0].schema_name); assert_eq!("prometheus", exprs[1].schema_name); diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 3fffd76e6f..aaac972a09 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::InsertExpr; +use api::v1::InsertRequest; use async_trait::async_trait; use axum::{http, Router}; use axum_test_helper::TestClient; @@ -34,9 +34,9 @@ struct DummyInstance { #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { async fn exec(&self, request: &InfluxdbRequest) -> Result<()> { - let exprs: Vec = request.try_into()?; + let requests: Vec = request.try_into()?; - for expr in exprs { + for expr in requests { let _ = self.tx.send((expr.schema_name, expr.table_name)).await; } diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 7982706395..724d1b53ec 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -15,12 +15,11 @@ use api::v1::alter_expr::Kind; use api::v1::column::SemanticType; use api::v1::{ admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateTableExpr, InsertExpr, MutateResult, TableId, + CreateTableExpr, InsertRequest, MutateResult, TableId, }; use client::admin::Admin; -use client::{Client, Database, ObjectResult}; +use client::{Client, Database, RpcOutput}; use common_catalog::consts::MIN_USER_TABLE_ID; -use common_grpc::flight::{flight_messages_to_recordbatches, FlightMessage}; use servers::server::Server; use tests_integration::test_util::{setup_grpc_server, StorageType}; @@ -180,7 +179,7 @@ async fn insert_and_assert(db: &Database) { // testing data: let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); - let expr = InsertExpr { + let request = InsertRequest { schema_name: "public".to_string(), table_name: "demo".to_string(), region_number: 0, @@ -192,7 +191,7 @@ async fn insert_and_assert(db: &Database) { ], row_count: 4, }; - let result = db.insert(expr).await; + let result = db.insert(request).await; result.unwrap(); let result = db @@ -203,16 +202,12 @@ async fn insert_and_assert(db: &Database) { ) .await .unwrap(); - assert!(matches!(result, ObjectResult::FlightData(_))); - let ObjectResult::FlightData(mut messages) = result else { unreachable!() }; - assert_eq!(messages.len(), 1); - assert!(matches!(messages.remove(0), FlightMessage::AffectedRows(2))); + assert!(matches!(result, RpcOutput::AffectedRows(2))); // select let result = db.sql("SELECT * FROM demo").await.unwrap(); match result { - ObjectResult::FlightData(flight_messages) => { - let recordbatches = flight_messages_to_recordbatches(flight_messages).unwrap(); + RpcOutput::RecordBatches(recordbatches) => { let pretty = recordbatches.pretty_print().unwrap(); let expected = "\ +-------+------+--------+-------------------------+ diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 5fb0073a02..38bb23ed52 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -18,9 +18,7 @@ use std::process::Stdio; use std::time::Duration; use async_trait::async_trait; -use client::{Client, Database as DB, Error as ClientError, ObjectResult}; -use common_grpc::flight; -use common_grpc::flight::FlightMessage; +use client::{Client, Database as DB, Error as ClientError, RpcOutput}; use sqlness::{Database, Environment}; use tokio::process::{Child, Command}; @@ -119,29 +117,22 @@ impl Database for GreptimeDB { } struct ResultDisplayer { - result: Result, + result: Result, } impl Display for ResultDisplayer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self.result { Ok(result) => match result { - ObjectResult::Mutate(mutate_result) => { - write!(f, "{mutate_result:?}") + RpcOutput::AffectedRows(rows) => { + write!(f, "Affected Rows: {rows}") } - ObjectResult::FlightData(messages) => { - if let Some(FlightMessage::AffectedRows(rows)) = messages.get(0) { - write!(f, "Affected Rows: {rows}") - } else { - let pretty = flight::flight_messages_to_recordbatches(messages.clone()) - .map_err(|e| e.to_string()) - .and_then(|x| x.pretty_print().map_err(|e| e.to_string())); - match pretty { - Ok(s) => write!(f, "{s}"), - Err(e) => write!( - f, - "Failed to convert Flight messages {messages:?} to Recordbatches, error: {e}" - ), + RpcOutput::RecordBatches(recordbatches) => { + let pretty = recordbatches.pretty_print().map_err(|e| e.to_string()); + match pretty { + Ok(s) => write!(f, "{s}"), + Err(e) => { + write!(f, "Failed to pretty format {recordbatches:?}, error: {e}") } } }