From c5b0d2431f4095c7a429e84124a594e4fd56df78 Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Thu, 24 Nov 2022 14:04:48 +0800 Subject: [PATCH] feat: remove InsertBatch in gRPC message (#570) --- Cargo.lock | 1 + benchmarks/src/bin/nyc-taxi.rs | 19 +-- src/api/build.rs | 1 - src/api/greptime/v1/database.proto | 25 +-- src/api/greptime/v1/insert.proto | 14 -- src/api/src/serde.rs | 79 +-------- src/client/examples/insert.rs | 21 +-- src/common/grpc-expr/Cargo.toml | 1 + src/common/grpc-expr/src/insert.rs | 208 ++++++++++-------------- src/common/grpc-expr/src/lib.rs | 2 +- src/common/grpc/src/writer.rs | 25 +-- src/datanode/src/instance/grpc.rs | 30 +--- src/datanode/src/tests/grpc_test.rs | 19 +-- src/frontend/src/expr_factory.rs | 13 +- src/frontend/src/instance.rs | 156 +++++++----------- src/frontend/src/instance/influxdb.rs | 88 +++++----- src/frontend/src/instance/opentsdb.rs | 2 +- src/frontend/src/instance/prometheus.rs | 2 +- src/frontend/src/table.rs | 13 +- src/frontend/src/table/insert.rs | 35 +--- src/servers/src/influxdb.rs | 37 ++--- src/servers/src/opentsdb/codec.rs | 64 +++----- src/servers/src/prometheus.rs | 181 +++++++++------------ 23 files changed, 377 insertions(+), 659 deletions(-) delete mode 100644 src/api/greptime/v1/insert.proto diff --git a/Cargo.lock b/Cargo.lock index 4ab981bced..c4965d9e04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1250,6 +1250,7 @@ dependencies = [ "common-base", "common-catalog", "common-error", + "common-grpc", "common-query", "common-telemetry", "common-time", diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index fe434591f3..0ca1f33182 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -28,9 +28,8 @@ use arrow::datatypes::{DataType, Float64Type, Int64Type}; use arrow::record_batch::RecordBatch; use clap::Parser; use client::admin::Admin; -use client::api::v1::codec::InsertBatch; use client::api::v1::column::Values; -use client::api::v1::{insert_expr, Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr}; +use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr}; use client::{Client, Database, Select}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; @@ -100,16 +99,13 @@ async fn write_data( for record_batch in record_batch_reader { let record_batch = record_batch.unwrap(); - let row_count = record_batch.num_rows(); - let insert_batch = convert_record_batch(record_batch).into(); + let (columns, row_count) = convert_record_batch(record_batch); let insert_expr = InsertExpr { schema_name: "public".to_string(), table_name: TABLE_NAME.to_string(), - expr: Some(insert_expr::Expr::Values(insert_expr::Values { - values: vec![insert_batch], - })), - options: HashMap::default(), region_number: 0, + columns, + row_count, }; let now = Instant::now(); db.insert(insert_expr).await.unwrap(); @@ -125,7 +121,7 @@ async fn write_data( total_rpc_elapsed_ms } -fn convert_record_batch(record_batch: RecordBatch) -> InsertBatch { +fn convert_record_batch(record_batch: RecordBatch) -> (Vec, u32) { let schema = record_batch.schema(); let fields = schema.fields(); let row_count = record_batch.num_rows(); @@ -143,10 +139,7 @@ fn convert_record_batch(record_batch: RecordBatch) -> InsertBatch { columns.push(column); } - InsertBatch { - columns, - row_count: row_count as _, - } + (columns, row_count as _) } fn build_values(column: &ArrayRef) -> Values { diff --git a/src/api/build.rs b/src/api/build.rs index ec88ccf408..f3ff5f6600 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -20,7 +20,6 @@ fn main() { .file_descriptor_set_path(default_out_dir.join("greptime_fd.bin")) .compile( &[ - "greptime/v1/insert.proto", "greptime/v1/select.proto", "greptime/v1/physical_plan.proto", "greptime/v1/greptime.proto", diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index e4b651f322..1cd6a6ee3e 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package greptime.v1; +import "greptime/v1/column.proto"; import "greptime/v1/common.proto"; message DatabaseRequest { @@ -41,26 +42,16 @@ message InsertExpr { string schema_name = 1; string table_name = 2; - message Values { - repeated bytes values = 1; - } + // Data is represented here. + repeated Column columns = 3; - oneof expr { - Values values = 3; + // The row_count of all columns, which include null and non-null values. + // + // Note: the row_count of all columns in a InsertExpr must be same. + uint32 row_count = 4; - // TODO(LFC): Remove field "sql" in InsertExpr. - // When Frontend instance received an insertion SQL (`insert into ...`), it's anticipated to parse the SQL and - // assemble the values to insert to feed Datanode. In other words, inserting data through Datanode instance's GRPC - // interface shouldn't use SQL directly. - // Then why the "sql" field exists here? It's because the Frontend needs table schema to create the values to insert, - // which is currently not able to find anywhere. (Maybe the table schema is suppose to be fetched from Meta?) - // The "sql" field is meant to be removed in the future. - string sql = 4; - } - - /// The region number of current insert request. + // The region number of current insert request. uint32 region_number = 5; - map options = 6; } // TODO(jiachun) diff --git a/src/api/greptime/v1/insert.proto b/src/api/greptime/v1/insert.proto deleted file mode 100644 index 0e173723a6..0000000000 --- a/src/api/greptime/v1/insert.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; - -package greptime.v1.codec; - -import "greptime/v1/column.proto"; - -message InsertBatch { - repeated Column columns = 1; - uint32 row_count = 2; -} - -message RegionNumber { - uint32 id = 1; -} diff --git a/src/api/src/serde.rs b/src/api/src/serde.rs index 9daf64e8e6..1523bfbcfe 100644 --- a/src/api/src/serde.rs +++ b/src/api/src/serde.rs @@ -15,7 +15,7 @@ pub use prost::DecodeError; use prost::Message; -use crate::v1::codec::{InsertBatch, PhysicalPlanNode, RegionNumber, SelectResult}; +use crate::v1::codec::{PhysicalPlanNode, SelectResult}; use crate::v1::meta::TableRouteValue; macro_rules! impl_convert_with_bytes { @@ -36,10 +36,8 @@ macro_rules! impl_convert_with_bytes { }; } -impl_convert_with_bytes!(InsertBatch); impl_convert_with_bytes!(SelectResult); impl_convert_with_bytes!(PhysicalPlanNode); -impl_convert_with_bytes!(RegionNumber); impl_convert_with_bytes!(TableRouteValue); #[cfg(test)] @@ -51,52 +49,6 @@ mod tests { const SEMANTIC_TAG: i32 = 0; - #[test] - fn test_convert_insert_batch() { - let insert_batch = mock_insert_batch(); - - let bytes: Vec = insert_batch.into(); - let insert: InsertBatch = bytes.deref().try_into().unwrap(); - - assert_eq!(8, insert.row_count); - assert_eq!(1, insert.columns.len()); - - let column = &insert.columns[0]; - assert_eq!("foo", column.column_name); - assert_eq!(SEMANTIC_TAG, column.semantic_type); - assert_eq!(vec![1], column.null_mask); - assert_eq!( - vec![2, 3, 4, 5, 6, 7, 8], - column.values.as_ref().unwrap().i32_values - ); - } - - #[should_panic] - #[test] - fn test_convert_insert_batch_wrong() { - let insert_batch = mock_insert_batch(); - - let mut bytes: Vec = insert_batch.into(); - - // modify some bytes - bytes[0] = 0b1; - bytes[1] = 0b1; - - let insert: InsertBatch = bytes.deref().try_into().unwrap(); - - assert_eq!(8, insert.row_count); - assert_eq!(1, insert.columns.len()); - - let column = &insert.columns[0]; - assert_eq!("foo", column.column_name); - assert_eq!(SEMANTIC_TAG, column.semantic_type); - assert_eq!(vec![1], column.null_mask); - assert_eq!( - vec![2, 3, 4, 5, 6, 7, 8], - column.values.as_ref().unwrap().i32_values - ); - } - #[test] fn test_convert_select_result() { let select_result = mock_select_result(); @@ -143,35 +95,6 @@ mod tests { ); } - #[test] - fn test_convert_region_id() { - let region_id = RegionNumber { id: 12 }; - - let bytes: Vec = region_id.into(); - let region_id: RegionNumber = bytes.deref().try_into().unwrap(); - - assert_eq!(12, region_id.id); - } - - fn mock_insert_batch() -> InsertBatch { - let values = column::Values { - i32_values: vec![2, 3, 4, 5, 6, 7, 8], - ..Default::default() - }; - let null_mask = vec![1]; - let column = Column { - column_name: "foo".to_string(), - semantic_type: SEMANTIC_TAG, - values: Some(values), - null_mask, - ..Default::default() - }; - InsertBatch { - columns: vec![column], - row_count: 8, - } - } - fn mock_select_result() -> SelectResult { let values = column::Values { i32_values: vec![2, 3, 4, 5, 6, 7, 8], diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs index e85d45200c..66f38eded3 100644 --- a/src/client/examples/insert.rs +++ b/src/client/examples/insert.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - -use api::v1::codec::InsertBatch; use api::v1::*; use client::{Client, Database}; + fn main() { tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::builder().finish()) .unwrap(); @@ -29,19 +27,19 @@ async fn run() { let client = Client::with_urls(vec!["127.0.0.1:3001"]); let db = Database::new("greptime", client); + let (columns, row_count) = insert_data(); + let expr = InsertExpr { schema_name: "public".to_string(), table_name: "demo".to_string(), - expr: Some(insert_expr::Expr::Values(insert_expr::Values { - values: insert_batches(), - })), - options: HashMap::default(), region_number: 0, + columns, + row_count, }; db.insert(expr).await.unwrap(); } -fn insert_batches() -> Vec> { +fn insert_data() -> (Vec, u32) { const SEMANTIC_TAG: i32 = 0; const SEMANTIC_FIELD: i32 = 1; const SEMANTIC_TS: i32 = 2; @@ -101,9 +99,8 @@ fn insert_batches() -> Vec> { ..Default::default() }; - let insert_batch = InsertBatch { - columns: vec![host_column, cpu_column, mem_column, ts_column], + ( + vec![host_column, cpu_column, mem_column, ts_column], row_count, - }; - vec![insert_batch.into()] + ) } diff --git a/src/common/grpc-expr/Cargo.toml b/src/common/grpc-expr/Cargo.toml index 5adef32823..111b133861 100644 --- a/src/common/grpc-expr/Cargo.toml +++ b/src/common/grpc-expr/Cargo.toml @@ -9,6 +9,7 @@ api = { path = "../../api" } async-trait = "0.1" common-base = { path = "../base" } common-error = { path = "../error" } +common-grpc = { path = "../grpc" } common-telemetry = { path = "../telemetry" } common-time = { path = "../time" } common-catalog = { path = "../catalog" } diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index e37139e037..d7687d0789 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -14,11 +14,9 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::ops::Deref; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::codec::InsertBatch; use api::v1::column::{SemanticType, Values}; use api::v1::{AddColumn, AddColumns, Column, ColumnDataType, ColumnDef, CreateExpr}; use common_base::BitVec; @@ -35,9 +33,8 @@ use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequ use table::Table; use crate::error::{ - ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DecodeInsertSnafu, - DuplicatedTimestampColumnSnafu, IllegalInsertDataSnafu, InvalidColumnProtoSnafu, - MissingTimestampColumnSnafu, Result, + ColumnDataTypeSnafu, ColumnNotFoundSnafu, CreateVectorSnafu, DuplicatedTimestampColumnSnafu, + IllegalInsertDataSnafu, InvalidColumnProtoSnafu, MissingTimestampColumnSnafu, Result, }; const TAG_SEMANTIC_TYPE: i32 = SemanticType::Tag as i32; const TIMESTAMP_SEMANTIC_TYPE: i32 = SemanticType::Timestamp as i32; @@ -52,35 +49,25 @@ fn build_column_def(column_name: &str, datatype: i32, nullable: bool) -> ColumnD } } -pub fn find_new_columns( - schema: &SchemaRef, - insert_batches: &[InsertBatch], -) -> Result> { +pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result> { let mut columns_to_add = Vec::default(); let mut new_columns: HashSet = HashSet::default(); - for InsertBatch { columns, row_count } in insert_batches { - if *row_count == 0 || columns.is_empty() { - continue; - } - - for Column { - column_name, - semantic_type, - datatype, - .. - } in columns + for Column { + column_name, + semantic_type, + datatype, + .. + } in columns + { + if schema.column_schema_by_name(column_name).is_none() && !new_columns.contains(column_name) { - if schema.column_schema_by_name(column_name).is_none() - && !new_columns.contains(column_name) - { - let column_def = Some(build_column_def(column_name, *datatype, true)); - columns_to_add.push(AddColumn { - column_def, - is_key: *semantic_type == TAG_SEMANTIC_TYPE, - }); - new_columns.insert(column_name.to_string()); - } + let column_def = Some(build_column_def(column_name, *datatype, true)); + columns_to_add.push(AddColumn { + column_def, + is_key: *semantic_type == TAG_SEMANTIC_TYPE, + }); + new_columns.insert(column_name.to_string()); } } @@ -201,92 +188,84 @@ pub fn build_create_expr_from_insertion( schema_name: &str, table_id: Option, table_name: &str, - insert_batches: &[InsertBatch], + columns: &[Column], ) -> Result { let mut new_columns: HashSet = HashSet::default(); let mut column_defs = Vec::default(); let mut primary_key_indices = Vec::default(); let mut timestamp_index = usize::MAX; - for InsertBatch { columns, row_count } in insert_batches { - if *row_count == 0 || columns.is_empty() { - continue; - } - - for Column { - column_name, - semantic_type, - datatype, - .. - } in columns - { - if !new_columns.contains(column_name) { - let mut is_nullable = true; - match *semantic_type { - TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()), - TIMESTAMP_SEMANTIC_TYPE => { - ensure!( - timestamp_index == usize::MAX, - DuplicatedTimestampColumnSnafu { - exists: &columns[timestamp_index].column_name, - duplicated: column_name, - } - ); - timestamp_index = column_defs.len(); - // Timestamp column must not be null. - is_nullable = false; - } - _ => {} + for Column { + column_name, + semantic_type, + datatype, + .. + } in columns + { + if !new_columns.contains(column_name) { + let mut is_nullable = true; + match *semantic_type { + TAG_SEMANTIC_TYPE => primary_key_indices.push(column_defs.len()), + TIMESTAMP_SEMANTIC_TYPE => { + ensure!( + timestamp_index == usize::MAX, + DuplicatedTimestampColumnSnafu { + exists: &columns[timestamp_index].column_name, + duplicated: column_name, + } + ); + timestamp_index = column_defs.len(); + // Timestamp column must not be null. + is_nullable = false; } - - let column_def = build_column_def(column_name, *datatype, is_nullable); - column_defs.push(column_def); - new_columns.insert(column_name.to_string()); + _ => {} } + + let column_def = build_column_def(column_name, *datatype, is_nullable); + column_defs.push(column_def); + new_columns.insert(column_name.to_string()); } - - ensure!( - timestamp_index != usize::MAX, - MissingTimestampColumnSnafu { msg: table_name } - ); - let timestamp_field_name = columns[timestamp_index].column_name.clone(); - - let primary_keys = primary_key_indices - .iter() - .map(|idx| columns[*idx].column_name.clone()) - .collect::>(); - - let expr = CreateExpr { - catalog_name: Some(catalog_name.to_string()), - schema_name: Some(schema_name.to_string()), - table_name: table_name.to_string(), - desc: Some("Created on insertion".to_string()), - column_defs, - time_index: timestamp_field_name, - primary_keys, - create_if_not_exists: true, - table_options: Default::default(), - table_id, - region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend - }; - - return Ok(expr); } - IllegalInsertDataSnafu.fail() + ensure!( + timestamp_index != usize::MAX, + MissingTimestampColumnSnafu { msg: table_name } + ); + let timestamp_field_name = columns[timestamp_index].column_name.clone(); + + let primary_keys = primary_key_indices + .iter() + .map(|idx| columns[*idx].column_name.clone()) + .collect::>(); + + let expr = CreateExpr { + catalog_name: Some(catalog_name.to_string()), + schema_name: Some(schema_name.to_string()), + table_name: table_name.to_string(), + desc: Some("Created on insertion".to_string()), + column_defs, + time_index: timestamp_field_name, + primary_keys, + create_if_not_exists: true, + table_options: Default::default(), + table_id, + region_ids: vec![0], // TODO:(hl): region id should be allocated by frontend + }; + + Ok(expr) } pub fn insertion_expr_to_request( catalog_name: &str, schema_name: &str, table_name: &str, - insert_batches: Vec, + insert_batches: Vec<(Vec, u32)>, table: Arc, ) -> Result { let schema = table.schema(); let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len()); - for InsertBatch { columns, row_count } in insert_batches { + for (columns, row_count) in insert_batches { for Column { column_name, values, @@ -332,14 +311,6 @@ pub fn insertion_expr_to_request( }) } -#[inline] -pub fn insert_batches(bytes_vec: &[Vec]) -> Result> { - bytes_vec - .iter() - .map(|bytes| bytes.deref().try_into().context(DecodeInsertSnafu)) - .collect() -} - fn add_values_to_builder( builder: &mut VectorBuilder, values: Values, @@ -466,9 +437,8 @@ mod tests { use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; - use api::v1::codec::InsertBatch; use api::v1::column::{self, SemanticType, Values}; - use api::v1::{insert_expr, Column, ColumnDataType}; + use api::v1::{Column, ColumnDataType}; use common_base::BitVec; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; @@ -482,11 +452,12 @@ mod tests { use table::Table; use super::{ - build_create_expr_from_insertion, convert_values, find_new_columns, insert_batches, - insertion_expr_to_request, is_null, TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE, + build_create_expr_from_insertion, convert_values, insertion_expr_to_request, is_null, + TAG_SEMANTIC_TYPE, TIMESTAMP_SEMANTIC_TYPE, }; use crate::error; use crate::error::ColumnDataTypeSnafu; + use crate::insert::find_new_columns; #[inline] fn build_column_schema( @@ -511,11 +482,10 @@ mod tests { assert!(build_create_expr_from_insertion("", "", table_id, table_name, &[]).is_err()); - let mock_batch_bytes = mock_insert_batches(); - let insert_batches = insert_batches(&mock_batch_bytes).unwrap(); + let insert_batch = mock_insert_batch(); let create_expr = - build_create_expr_from_insertion("", "", table_id, table_name, &insert_batches) + build_create_expr_from_insertion("", "", table_id, table_name, &insert_batch.0) .unwrap(); assert_eq!(table_id, create_expr.table_id); @@ -601,9 +571,9 @@ mod tests { assert!(find_new_columns(&schema, &[]).unwrap().is_none()); - let mock_insert_bytes = mock_insert_batches(); - let insert_batches = insert_batches(&mock_insert_bytes).unwrap(); - let add_columns = find_new_columns(&schema, &insert_batches).unwrap().unwrap(); + let insert_batch = mock_insert_batch(); + + let add_columns = find_new_columns(&schema, &insert_batch.0).unwrap().unwrap(); assert_eq!(2, add_columns.add_columns.len()); let host_column = &add_columns.add_columns[0]; @@ -633,10 +603,7 @@ mod tests { fn test_insertion_expr_to_request() { let table: Arc = Arc::new(DemoTable {}); - let values = insert_expr::Values { - values: mock_insert_batches(), - }; - let insert_batches = insert_batches(&values.values).unwrap(); + let insert_batches = vec![mock_insert_batch()]; let insert_req = insertion_expr_to_request("greptime", "public", "demo", insert_batches, table).unwrap(); @@ -734,7 +701,7 @@ mod tests { } } - fn mock_insert_batches() -> Vec> { + fn mock_insert_batch() -> (Vec, u32) { let row_count = 2; let host_vals = column::Values { @@ -785,10 +752,9 @@ mod tests { datatype: ColumnDataType::Timestamp as i32, }; - let insert_batch = InsertBatch { - columns: vec![host_column, cpu_column, mem_column, ts_column], + ( + vec![host_column, cpu_column, mem_column, ts_column], row_count, - }; - vec![insert_batch.into()] + ) } } diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs index 2c3beea6ff..71786d670f 100644 --- a/src/common/grpc-expr/src/lib.rs +++ b/src/common/grpc-expr/src/lib.rs @@ -20,5 +20,5 @@ mod insert; pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema}; pub use insert::{ build_alter_table_request, build_create_expr_from_insertion, column_to_vector, - find_new_columns, insert_batches, insertion_expr_to_request, + find_new_columns, insertion_expr_to_request, }; diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 42e27fc887..2cd28f45af 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; -use api::v1::codec::InsertBatch; use api::v1::column::{SemanticType, Values}; use api::v1::{Column, ColumnDataType}; use common_base::BitVec; @@ -24,12 +23,14 @@ use crate::error::{Result, TypeMismatchSnafu}; type ColumnName = String; +type RowCount = u32; + // TODO(fys): will remove in the future. #[derive(Default)] pub struct LinesWriter { column_name_index: HashMap, null_masks: Vec, - batch: InsertBatch, + batch: (Vec, RowCount), lines: usize, } @@ -171,20 +172,20 @@ impl LinesWriter { pub fn commit(&mut self) { let batch = &mut self.batch; - batch.row_count += 1; + batch.1 += 1; - for i in 0..batch.columns.len() { + for i in 0..batch.0.len() { let null_mask = &mut self.null_masks[i]; - if batch.row_count as usize > null_mask.len() { + if batch.1 as usize > null_mask.len() { null_mask.push(true); } } } - pub fn finish(mut self) -> InsertBatch { + pub fn finish(mut self) -> (Vec, RowCount) { let null_masks = self.null_masks; for (i, null_mask) in null_masks.into_iter().enumerate() { - let columns = &mut self.batch.columns; + let columns = &mut self.batch.0; columns[i].null_mask = null_mask.into_vec(); } self.batch @@ -204,9 +205,9 @@ impl LinesWriter { let batch = &mut self.batch; let to_insert = self.lines; let mut null_mask = BitVec::with_capacity(to_insert); - null_mask.extend(BitVec::repeat(true, batch.row_count as usize)); + null_mask.extend(BitVec::repeat(true, batch.1 as usize)); self.null_masks.push(null_mask); - batch.columns.push(Column { + batch.0.push(Column { column_name: column_name.to_string(), semantic_type: semantic_type.into(), values: Some(Values::with_capacity(datatype, to_insert)), @@ -217,7 +218,7 @@ impl LinesWriter { new_idx } }; - (column_idx, &mut self.batch.columns[column_idx]) + (column_idx, &mut self.batch.0[column_idx]) } } @@ -282,9 +283,9 @@ mod tests { writer.commit(); let insert_batch = writer.finish(); - assert_eq!(3, insert_batch.row_count); + assert_eq!(3, insert_batch.1); - let columns = insert_batch.columns; + let columns = insert_batch.0; assert_eq!(9, columns.len()); let column = &columns[0]; diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index e9e0a48f19..ee7c4f8c0e 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -14,7 +14,7 @@ use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder}; use api::v1::{ - admin_expr, insert_expr, object_expr, select_expr, AdminExpr, AdminResult, CreateDatabaseExpr, + admin_expr, object_expr, select_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr, ObjectExpr, ObjectResult, SelectExpr, }; use async_trait::async_trait; @@ -44,7 +44,7 @@ impl Instance { catalog_name: &str, schema_name: &str, table_name: &str, - values: insert_expr::Values, + insert_batches: Vec<(Vec, u32)>, ) -> Result { let schema_provider = self .catalog_manager @@ -55,11 +55,7 @@ impl Instance { .context(CatalogSnafu)? .context(SchemaNotFoundSnafu { name: schema_name })?; - let insert_batches = - common_grpc_expr::insert_batches(&values.values).context(InsertDataSnafu)?; - ensure!(!insert_batches.is_empty(), EmptyInsertBatchSnafu); - let table = schema_provider .table(table_name) .context(CatalogSnafu)? @@ -87,10 +83,10 @@ impl Instance { catalog_name: &str, schema_name: &str, table_name: &str, - values: insert_expr::Values, + insert_batches: Vec<(Vec, u32)>, ) -> ObjectResult { match self - .execute_grpc_insert(catalog_name, schema_name, table_name, values) + .execute_grpc_insert(catalog_name, schema_name, table_name, insert_batches) .await { Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new() @@ -170,25 +166,13 @@ impl GrpcQueryHandler for Instance { let catalog_name = DEFAULT_CATALOG_NAME; let schema_name = &insert_expr.schema_name; let table_name = &insert_expr.table_name; - let expr = insert_expr - .expr - .context(servers::error::InvalidQuerySnafu { - reason: "missing `expr` in `InsertExpr`", - })?; // TODO(fys): _region_number is for later use. let _region_number: u32 = insert_expr.region_number; - match expr { - insert_expr::Expr::Values(values) => { - self.handle_insert(catalog_name, schema_name, table_name, values) - .await - } - insert_expr::Expr::Sql(sql) => { - let output = self.execute_sql(&sql).await; - to_object_result(output).await - } - } + let insert_batches = vec![(insert_expr.columns, insert_expr.row_count)]; + self.handle_insert(catalog_name, schema_name, table_name, insert_batches) + .await } Some(object_expr::Expr::Select(select_expr)) => self.handle_select(select_expr).await, other => { diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 3116543c1b..5fefa42cba 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -13,17 +13,15 @@ // limitations under the License. use std::assert_matches::assert_matches; -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use api::v1::alter_expr::Kind; -use api::v1::codec::InsertBatch; use api::v1::column::SemanticType; use api::v1::{ - admin_result, column, insert_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, - ColumnDef, CreateExpr, InsertExpr, MutateResult, + admin_result, column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, + CreateExpr, InsertExpr, MutateResult, }; use client::admin::Admin; use client::{Client, Database, ObjectResult}; @@ -230,7 +228,10 @@ async fn insert_and_assert(db: &Database) { // testing data: let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); - let values = vec![InsertBatch { + let expr = InsertExpr { + schema_name: "public".to_string(), + table_name: "demo".to_string(), + region_number: 0, columns: vec![ expected_host_col.clone(), expected_cpu_col.clone(), @@ -238,14 +239,6 @@ async fn insert_and_assert(db: &Database) { expected_ts_col.clone(), ], row_count: 4, - } - .into()]; - let expr = InsertExpr { - schema_name: "public".to_string(), - table_name: "demo".to_string(), - expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })), - options: HashMap::default(), - region_number: 0, }; let result = db.insert(expr).await; result.unwrap(); diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index 99532257de..9f406ace0b 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -16,8 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::codec::InsertBatch; -use api::v1::{ColumnDataType, CreateExpr}; +use api::v1::{Column, ColumnDataType, CreateExpr}; use datatypes::schema::ColumnSchema; use snafu::{ensure, ResultExt}; use sql::statements::create::{CreateTable, TIME_INDEX}; @@ -35,12 +34,12 @@ pub type CreateExprFactoryRef = Arc; pub trait CreateExprFactory { async fn create_expr_by_stmt(&self, stmt: &CreateTable) -> Result; - async fn create_expr_by_insert_batch( + async fn create_expr_by_columns( &self, catalog_name: &str, schema_name: &str, table_name: &str, - batch: &[InsertBatch], + columns: &[Column], ) -> crate::error::Result; } @@ -53,12 +52,12 @@ impl CreateExprFactory for DefaultCreateExprFactory { create_to_expr(None, vec![0], stmt) } - async fn create_expr_by_insert_batch( + async fn create_expr_by_columns( &self, catalog_name: &str, schema_name: &str, table_name: &str, - batch: &[InsertBatch], + columns: &[Column], ) -> Result { let table_id = None; let create_expr = common_grpc_expr::build_create_expr_from_insertion( @@ -66,7 +65,7 @@ impl CreateExprFactory for DefaultCreateExprFactory { schema_name, table_id, table_name, - batch, + columns, ) .context(BuildCreateExprOnInsertionSnafu)?; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 26f1dd72fe..4d82d4d601 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -17,16 +17,14 @@ mod influxdb; mod opentsdb; mod prometheus; -use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use api::result::ObjectResultBuilder; use api::v1::alter_expr::Kind; -use api::v1::codec::InsertBatch; use api::v1::object_expr::Expr; use api::v1::{ - admin_expr, insert_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, + admin_expr, select_expr, AddColumns, AdminExpr, AdminResult, AlterExpr, Column, CreateDatabaseExpr, CreateExpr, InsertExpr, ObjectExpr, ObjectResult as GrpcObjectResult, }; use async_trait::async_trait; @@ -60,13 +58,13 @@ use crate::catalog::FrontendCatalogManager; use crate::datanode::DatanodeClients; use crate::error::{ self, AlterTableOnInsertionSnafu, AlterTableSnafu, CatalogNotFoundSnafu, CatalogSnafu, - CreateDatabaseSnafu, CreateTableSnafu, DeserializeInsertBatchSnafu, - FindNewColumnsOnInsertionSnafu, InsertSnafu, MissingMetasrvOptsSnafu, Result, - SchemaNotFoundSnafu, SelectSnafu, + CreateDatabaseSnafu, CreateTableSnafu, FindNewColumnsOnInsertionSnafu, InsertSnafu, + MissingMetasrvOptsSnafu, Result, SchemaNotFoundSnafu, SelectSnafu, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; use crate::sql::insert_to_request; +use crate::table::insert::insert_request_to_insert_batch; use crate::table::route::TableRoutes; #[async_trait] @@ -292,7 +290,7 @@ impl Instance { } /// Handle batch inserts - pub async fn handle_inserts(&self, insert_expr: &[InsertExpr]) -> Result { + pub async fn handle_inserts(&self, insert_expr: Vec) -> Result { let mut success = 0; for expr in insert_expr { match self.handle_insert(expr).await? { @@ -304,68 +302,20 @@ impl Instance { } /// Handle insert. for 'values' insertion, create/alter the destination table on demand. - pub async fn handle_insert(&self, insert_expr: &InsertExpr) -> Result { + pub async fn handle_insert(&self, mut insert_expr: InsertExpr) -> Result { let table_name = &insert_expr.table_name; let catalog_name = DEFAULT_CATALOG_NAME; let schema_name = &insert_expr.schema_name; - if let Some(expr) = &insert_expr.expr { - match expr { - api::v1::insert_expr::Expr::Values(values) => { - // TODO(hl): gRPC should also support partitioning. - let region_number = 0; - self.handle_insert_values( - catalog_name, - schema_name, - table_name, - region_number, - values, - ) - .await - } - api::v1::insert_expr::Expr::Sql(_) => { - // Frontend does not comprehend insert request that is raw SQL string - self.database(schema_name) - .insert(insert_expr.clone()) - .await - .and_then(Output::try_from) - .context(InsertSnafu) - } - } - } else { - // expr is empty - Ok(Output::AffectedRows(0)) - } - } + let columns = &insert_expr.columns; + + self.create_or_alter_table_on_demand(catalog_name, schema_name, table_name, columns) + .await?; + + insert_expr.region_number = 0; - /// Handle insert requests in frontend - /// If insert is SQL string flavor, just forward to datanode - /// If insert is parsed InsertExpr, frontend should comprehend the schema and create/alter table on demand. - pub async fn handle_insert_values( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - region_number: u32, - values: &insert_expr::Values, - ) -> Result { - let insert_batches = common_grpc_expr::insert_batches(&values.values) - .context(DeserializeInsertBatchSnafu)?; - self.create_or_alter_table_on_demand( - catalog_name, - schema_name, - table_name, - &insert_batches, - ) - .await?; self.database(schema_name) - .insert(InsertExpr { - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - region_number, - options: Default::default(), - expr: Some(insert_expr::Expr::Values(values.clone())), - }) + .insert(insert_expr) .await .and_then(Output::try_from) .context(InsertSnafu) @@ -379,7 +329,7 @@ impl Instance { catalog_name: &str, schema_name: &str, table_name: &str, - insert_batches: &[InsertBatch], + columns: &[Column], ) -> Result<()> { match self .catalog_manager @@ -401,13 +351,8 @@ impl Instance { "Table {}.{}.{} does not exist, try create table", catalog_name, schema_name, table_name, ); - self.create_table_by_insert_batches( - catalog_name, - schema_name, - table_name, - insert_batches, - ) - .await?; + self.create_table_by_columns(catalog_name, schema_name, table_name, columns) + .await?; info!( "Successfully created table on insertion: {}.{}.{}", catalog_name, schema_name, table_name @@ -415,9 +360,9 @@ impl Instance { } Some(table) => { let schema = table.schema(); - if let Some(add_columns) = - common_grpc_expr::find_new_columns(&schema, insert_batches) - .context(FindNewColumnsOnInsertionSnafu)? + + if let Some(add_columns) = common_grpc_expr::find_new_columns(&schema, columns) + .context(FindNewColumnsOnInsertionSnafu)? { info!( "Find new columns {:?} on insertion, try to alter table: {}.{}.{}", @@ -441,17 +386,17 @@ impl Instance { } /// Infer create table expr from inserting data - async fn create_table_by_insert_batches( + async fn create_table_by_columns( &self, catalog_name: &str, schema_name: &str, table_name: &str, - insert_batches: &[InsertBatch], + columns: &[Column], ) -> Result { // Create table automatically, build schema from data. let create_expr = self .create_expr_factory - .create_expr_by_insert_batch(catalog_name, schema_name, table_name, insert_batches) + .create_expr_by_columns(catalog_name, schema_name, table_name, columns) .await?; info!( @@ -512,9 +457,10 @@ impl Instance { let insert_request = insert_to_request(&schema_provider, *insert)?; - let batch = crate::table::insert::insert_request_to_insert_batch(&insert_request)?; + let (columns, _row_count) = + crate::table::insert::insert_request_to_insert_batch(&insert_request)?; - self.create_or_alter_table_on_demand(&catalog, &schema, &table, &[batch]) + self.create_or_alter_table_on_demand(&catalog, &schema, &table, &columns) .await?; let table = schema_provider @@ -527,6 +473,19 @@ impl Instance { .await .context(error::TableSnafu) } + + fn stmt_to_insert_batch( + &self, + catalog: &str, + schema: &str, + insert: Box, + ) -> Result<(Vec, u32)> { + let catalog_provider = self.get_catalog(catalog)?; + let schema_provider = Self::get_schema(catalog_provider, schema)?; + + let insert_request = insert_to_request(&schema_provider, *insert)?; + insert_request_to_insert_batch(&insert_request) + } } #[async_trait] @@ -580,7 +539,7 @@ impl SqlQueryHandler for Instance { .context(server_error::ExecuteQuerySnafu { query }), Statement::Insert(insert) => match self.mode { Mode::Standalone => { - let (_, schema_name, table_name) = insert + let (catalog_name, schema_name, table_name) = insert .full_table_name() .context(error::ParseSqlSnafu) .map_err(BoxedError::new) @@ -588,14 +547,19 @@ impl SqlQueryHandler for Instance { msg: "Failed to get table name", })?; + let (columns, row_count) = self + .stmt_to_insert_batch(&catalog_name, &schema_name, insert) + .map_err(BoxedError::new) + .context(server_error::ExecuteQuerySnafu { query })?; + let expr = InsertExpr { schema_name, table_name, - expr: Some(insert_expr::Expr::Sql(query.to_string())), region_number: 0, - options: HashMap::default(), + columns, + row_count, }; - self.handle_insert(&expr) + self.handle_insert(expr) .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { query }) @@ -696,7 +660,8 @@ impl GrpcQueryHandler for Instance { if let Some(expr) = &query.expr { match expr { Expr::Insert(insert) => { - let result = self.handle_insert(insert).await; + // TODO(fys): refactor, avoid clone + let result = self.handle_insert(insert.clone()).await; result .map(|o| match o { Output::AffectedRows(rows) => ObjectResultBuilder::new() @@ -787,7 +752,7 @@ impl GrpcAdminHandler for Instance { mod tests { use std::assert_matches::assert_matches; - use api::v1::codec::{InsertBatch, SelectResult}; + use api::v1::codec::SelectResult; use api::v1::column::SemanticType; use api::v1::{ admin_expr, admin_result, column, object_expr, object_result, select_expr, Column, @@ -946,22 +911,19 @@ mod tests { ); // insert - let values = vec![InsertBatch { - columns: vec![ - expected_host_col.clone(), - expected_cpu_col.clone(), - expected_mem_col.clone(), - expected_ts_col.clone(), - ], - row_count: 4, - } - .into()]; + let columns = vec![ + expected_host_col.clone(), + expected_cpu_col.clone(), + expected_mem_col.clone(), + expected_ts_col.clone(), + ]; + let row_count = 4; let insert_expr = InsertExpr { schema_name: "public".to_string(), table_name: "demo".to_string(), - expr: Some(insert_expr::Expr::Values(insert_expr::Values { values })), - options: HashMap::default(), region_number: 0, + columns, + row_count, }; let object_expr = ObjectExpr { header: Some(ExprHeader::default()), diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 8441094d50..aa25cffb8f 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -14,9 +14,7 @@ use std::collections::HashMap; -use api::v1::codec::InsertBatch; -use api::v1::insert_expr::Expr; -use api::v1::InsertExpr; +use api::v1::{Column, InsertExpr}; use async_trait::async_trait; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; @@ -28,7 +26,7 @@ use snafu::{OptionExt, ResultExt}; use table::requests::InsertRequest; use crate::error; -use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result}; +use crate::error::{InsertBatchToRequestSnafu, Result}; use crate::instance::Instance; #[async_trait] @@ -37,7 +35,7 @@ impl InfluxdbLineProtocolHandler for Instance { match self.mode { Mode::Standalone => { let exprs: Vec = request.try_into()?; - self.handle_inserts(&exprs) + self.handle_inserts(exprs) .await .map_err(BoxedError::new) .context(server_error::ExecuteQuerySnafu { @@ -61,53 +59,41 @@ impl InfluxdbLineProtocolHandler for Instance { impl Instance { pub(crate) async fn dist_insert(&self, inserts: Vec) -> Result { let mut joins = Vec::with_capacity(inserts.len()); - let catalog_name = DEFAULT_CATALOG_NAME.to_string(); + let catalog_name = DEFAULT_CATALOG_NAME; for insert in inserts { let self_clone = self.clone(); - let insert_batches = match &insert.expr.unwrap() { - Expr::Values(values) => common_grpc_expr::insert_batches(&values.values) - .context(DeserializeInsertBatchSnafu)?, - Expr::Sql(_) => unreachable!(), - }; - self.create_or_alter_table_on_demand( - DEFAULT_CATALOG_NAME, - &insert.schema_name, - &insert.table_name, - &insert_batches, - ) - .await?; + let schema_name = insert.schema_name.to_string(); + let table_name = insert.table_name.to_string(); - let schema_name = insert.schema_name.clone(); - let table_name = insert.table_name.clone(); + let columns = &insert.columns; + let row_count = insert.row_count; - for insert_batch in &insert_batches { - let catalog_name = catalog_name.clone(); - let schema_name = schema_name.clone(); - let table_name = table_name.clone(); - let request = Self::insert_batch_to_request( - DEFAULT_CATALOG_NAME, - &schema_name, - &table_name, - insert_batch, - )?; - // TODO(fys): need a separate runtime here - let self_clone = self_clone.clone(); - let join = tokio::spawn(async move { - let catalog = self_clone.get_catalog(&catalog_name)?; - let schema = Self::get_schema(catalog, &schema_name)?; - let table = schema - .table(&table_name) - .context(error::CatalogSnafu)? - .context(error::TableNotFoundSnafu { - table_name: &table_name, - })?; + self.create_or_alter_table_on_demand(catalog_name, &schema_name, &table_name, columns) + .await?; - table.insert(request).await.context(error::TableSnafu) - }); - joins.push(join); - } + let request = Self::columns_to_request( + catalog_name, + &schema_name, + &table_name, + columns, + row_count, + )?; + + // TODO(fys): need a separate runtime here + let self_clone = self_clone.clone(); + let join = tokio::spawn(async move { + let catalog = self_clone.get_catalog(catalog_name)?; + let schema = Self::get_schema(catalog, &schema_name)?; + let table = schema + .table(&table_name) + .context(error::CatalogSnafu)? + .context(error::TableNotFoundSnafu { table_name })?; + + table.insert(request).await.context(error::TableSnafu) + }); + joins.push(join); } let mut affected = 0; @@ -119,16 +105,16 @@ impl Instance { Ok(affected) } - fn insert_batch_to_request( + fn columns_to_request( catalog_name: &str, schema_name: &str, table_name: &str, - batches: &InsertBatch, + columns: &[Column], + row_count: u32, ) -> Result { - let mut vectors = HashMap::with_capacity(batches.columns.len()); - for col in &batches.columns { - let vector = - column_to_vector(col, batches.row_count).context(InsertBatchToRequestSnafu)?; + let mut vectors = HashMap::with_capacity(columns.len()); + for col in columns { + let vector = column_to_vector(col, row_count).context(InsertBatchToRequestSnafu)?; vectors.insert(col.column_name.clone(), vector); } Ok(InsertRequest { diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 1a7db09014..4d35b1d8ea 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance { impl Instance { async fn insert_opentsdb_metric(&self, data_point: &DataPoint) -> Result<()> { let expr = data_point.as_grpc_insert(); - self.handle_insert(&expr).await?; + self.handle_insert(expr).await?; Ok(()) } } diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 246542893d..ab9f0dea59 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -115,7 +115,7 @@ impl PrometheusProtocolHandler for Instance { Mode::Standalone => { let exprs = prometheus::write_request_to_insert_exprs(database, request)?; let futures = exprs - .iter() + .into_iter() .map(|e| self.handle_insert(e)) .collect::>(); let res = futures_util::future::join_all(futures) diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 91211f88d9..8f97ba12f7 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -511,9 +511,8 @@ impl PartitionExec { mod test { use std::time::Duration; - use api::v1::codec::InsertBatch; use api::v1::column::SemanticType; - use api::v1::{column, insert_expr, Column, ColumnDataType}; + use api::v1::{column, Column, ColumnDataType}; use catalog::remote::MetaKvBackend; use common_recordbatch::util; use datafusion::arrow_print; @@ -970,8 +969,8 @@ mod test { start_ts: i64, ) { let rows = data.len() as u32; - let values = vec![InsertBatch { - columns: vec![ + let values = vec![( + vec![ Column { column_name: "ts".to_string(), values: Some(column::Values { @@ -1001,10 +1000,8 @@ mod test { ..Default::default() }, ], - row_count: rows, - } - .into()]; - let values = insert_expr::Values { values }; + rows, + )]; dn_instance .execute_grpc_insert( &table_name.catalog_name, diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index ceb6780e13..409632474f 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -16,10 +16,8 @@ use std::collections::HashMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; -use api::v1::codec::InsertBatch; use api::v1::column::SemanticType; -use api::v1::insert_expr::Expr; -use api::v1::{codec, insert_expr, Column, InsertExpr, MutateResult}; +use api::v1::{Column, InsertExpr, MutateResult}; use client::{Database, ObjectResult}; use datatypes::prelude::ConcreteDataType; use snafu::{ensure, OptionExt, ResultExt}; @@ -84,7 +82,7 @@ impl DistTable { } } -pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result { +pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec, u32)> { let mut row_count = None; let columns = insert @@ -127,24 +125,20 @@ pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result>>()?; - let insert_batch = codec::InsertBatch { - columns, - row_count: row_count.map(|rows| rows as u32).unwrap_or(0), - }; - Ok(insert_batch) + let row_count = row_count.unwrap_or(0) as u32; + + Ok((columns, row_count)) } fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result { let table_name = insert.table_name.clone(); - let insert_batch = insert_request_to_insert_batch(&insert)?; + let (columns, row_count) = insert_request_to_insert_batch(&insert)?; Ok(InsertExpr { schema_name: insert.schema_name, table_name, - expr: Some(Expr::Values(insert_expr::Values { - values: vec![insert_batch.into()], - })), region_number, - options: Default::default(), + columns, + row_count, }) } @@ -152,8 +146,6 @@ fn to_insert_expr(region_number: RegionNumber, insert: InsertRequest) -> Result< mod tests { use std::collections::HashMap; - use api::v1::codec::InsertBatch; - use api::v1::insert_expr::Expr; use api::v1::{ColumnDataType, InsertExpr}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ConcreteDataType; @@ -199,16 +191,7 @@ mod tests { let table_name = insert_expr.table_name; assert_eq!("demo", table_name); - let expr = insert_expr.expr.as_ref().unwrap(); - let vals = match expr { - Expr::Values(vals) => vals, - Expr::Sql(_) => unreachable!(), - }; - - let batch: &[u8] = vals.values[0].as_ref(); - let vals: InsertBatch = batch.try_into().unwrap(); - - for column in vals.columns { + for column in insert_expr.columns { let name = column.column_name; if name == "id" { assert_eq!(0, column.null_mask[0]); diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index df1835efcd..0766d65843 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; -use api::v1::insert_expr::{self, Expr}; use api::v1::InsertExpr; use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; @@ -165,14 +164,15 @@ impl TryFrom<&InfluxdbRequest> for Vec { Ok(writers .into_iter() - .map(|(table_name, writer)| InsertExpr { - schema_name: schema_name.clone(), - table_name, - expr: Some(Expr::Values(insert_expr::Values { - values: vec![writer.finish().into()], - })), - options: HashMap::default(), - region_number: 0, + .map(|(table_name, writer)| { + let (columns, row_count) = writer.finish(); + InsertExpr { + schema_name: schema_name.clone(), + table_name, + region_number: 0, + columns, + row_count, + } }) .collect()) } @@ -180,12 +180,9 @@ impl TryFrom<&InfluxdbRequest> for Vec { #[cfg(test)] mod tests { - use std::ops::Deref; use std::sync::Arc; - use api::v1::codec::InsertBatch; use api::v1::column::{SemanticType, Values}; - use api::v1::insert_expr::Expr; use api::v1::{Column, ColumnDataType, InsertExpr}; use common_base::BitVec; use common_time::timestamp::TimeUnit; @@ -242,15 +239,9 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; for expr in insert_exprs { assert_eq!("public", expr.schema_name); - let values = match expr.expr.unwrap() { - Expr::Values(vals) => vals, - Expr::Sql(_) => panic!(), - }; - let raw_batch = values.values.get(0).unwrap(); - let batch: InsertBatch = raw_batch.deref().try_into().unwrap(); match &expr.table_name[..] { - "monitor1" => assert_monitor_1(&batch), - "monitor2" => assert_monitor_2(&batch), + "monitor1" => assert_monitor_1(&expr.columns), + "monitor2" => assert_monitor_2(&expr.columns), _ => panic!(), } } @@ -327,8 +318,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; } } - fn assert_monitor_1(insert_batch: &InsertBatch) { - let columns = &insert_batch.columns; + fn assert_monitor_1(columns: &[Column]) { assert_eq!(4, columns.len()); verify_column( &columns[0], @@ -379,8 +369,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; ); } - fn assert_monitor_2(insert_batch: &InsertBatch) { - let columns = &insert_batch.columns; + fn assert_monitor_2(columns: &[Column]) { assert_eq!(4, columns.len()); verify_column( &columns[0], diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 2253fd7e63..260a206fe5 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -12,11 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - -use api::v1::codec::InsertBatch; use api::v1::column::SemanticType; -use api::v1::{column, insert_expr, Column, ColumnDataType, InsertExpr}; +use api::v1::{column, Column, ColumnDataType, InsertExpr}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_grpc::writer::Precision; use table::requests::InsertRequest; @@ -189,18 +186,12 @@ impl DataPoint { }); } - let batch = InsertBatch { - columns, - row_count: 1, - }; InsertExpr { schema_name, table_name: self.metric.clone(), - expr: Some(insert_expr::Expr::Values(insert_expr::Values { - values: vec![batch.into()], - })), - options: HashMap::default(), region_number: 0, + columns, + row_count: 1, } } @@ -337,36 +328,31 @@ mod test { let grpc_insert = data_point.as_grpc_insert(); assert_eq!(grpc_insert.table_name, "my_metric_1"); - match grpc_insert.expr { - Some(insert_expr::Expr::Values(insert_expr::Values { values })) => { - assert_eq!(values.len(), 1); - let insert_batch = InsertBatch::try_from(values[0].as_slice()).unwrap(); - assert_eq!(insert_batch.row_count, 1); - let columns = insert_batch.columns; - assert_eq!(columns.len(), 4); + let columns = &grpc_insert.columns; + let row_count = grpc_insert.row_count; - assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, - vec![1000] - ); + assert_eq!(row_count, 1); + assert_eq!(columns.len(), 4); - assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME); - assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]); + assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME); + assert_eq!( + columns[0].values.as_ref().unwrap().ts_millis_values, + vec![1000] + ); - assert_eq!(columns[2].column_name, "tagk1"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["tagv1"] - ); + assert_eq!(columns[1].column_name, OPENTSDB_VALUE_COLUMN_NAME); + assert_eq!(columns[1].values.as_ref().unwrap().f64_values, vec![1.0]); - assert_eq!(columns[3].column_name, "tagk2"); - assert_eq!( - columns[3].values.as_ref().unwrap().string_values, - vec!["tagv2"] - ); - } - _ => unreachable!(), - } + assert_eq!(columns[2].column_name, "tagk1"); + assert_eq!( + columns[2].values.as_ref().unwrap().string_values, + vec!["tagv1"] + ); + + assert_eq!(columns[3].column_name, "tagk2"); + assert_eq!( + columns[3].values.as_ref().unwrap().string_values, + vec!["tagv2"] + ); } } diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 48045b066e..1c2b035ec0 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -14,14 +14,14 @@ //! prometheus protocol supportings use std::cmp::Ordering; -use std::collections::{BTreeMap, HashMap}; +use std::collections::BTreeMap; use std::hash::{Hash, Hasher}; use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; -use api::v1::codec::{InsertBatch, SelectResult}; +use api::v1::codec::SelectResult; use api::v1::column::SemanticType; -use api::v1::{column, insert_expr, Column, ColumnDataType, InsertExpr}; +use api::v1::{column, Column, ColumnDataType, InsertExpr}; use common_grpc::writer::Precision::MILLISECOND; use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; use snafu::{OptionExt, ResultExt}; @@ -413,21 +413,14 @@ fn timeseries_to_insert_expr(database: &str, mut timeseries: TimeSeries) -> Resu }); } - let batch = InsertBatch { - columns, - row_count: row_count as u32, - }; Ok(InsertExpr { schema_name, table_name: table_name.context(error::InvalidPromRemoteRequestSnafu { msg: "missing '__name__' label in timeseries", })?, - - expr: Some(insert_expr::Expr::Values(insert_expr::Values { - values: vec![batch.into()], - })), - options: HashMap::default(), region_number: 0, + columns, + row_count: row_count as u32, }) } @@ -683,105 +676,93 @@ mod tests { assert_eq!("metric2", exprs[1].table_name); assert_eq!("metric3", exprs[2].table_name); - let values = exprs[0].clone().expr.unwrap(); - match values { - insert_expr::Expr::Values(insert_expr::Values { values }) => { - assert_eq!(1, values.len()); - let batch = InsertBatch::try_from(values[0].as_slice()).unwrap(); - assert_eq!(2, batch.row_count); - let columns = batch.columns; - assert_eq!(columns.len(), 3); + let expr = exprs.get(0).unwrap(); - assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, - vec![1000, 2000] - ); + let columns = &expr.columns; + let row_count = expr.row_count; - assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME); - assert_eq!( - columns[1].values.as_ref().unwrap().f64_values, - vec![1.0, 2.0] - ); + assert_eq!(2, row_count); + assert_eq!(columns.len(), 3); - assert_eq!(columns[2].column_name, "job"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["spark", "spark"] - ); - } - _ => unreachable!(), - } + assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); + assert_eq!( + columns[0].values.as_ref().unwrap().ts_millis_values, + vec![1000, 2000] + ); - let values = exprs[1].clone().expr.unwrap(); - match values { - insert_expr::Expr::Values(insert_expr::Values { values }) => { - assert_eq!(1, values.len()); - let batch = InsertBatch::try_from(values[0].as_slice()).unwrap(); - assert_eq!(2, batch.row_count); - let columns = batch.columns; - assert_eq!(columns.len(), 4); + assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME); + assert_eq!( + columns[1].values.as_ref().unwrap().f64_values, + vec![1.0, 2.0] + ); - assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, - vec![1000, 2000] - ); + assert_eq!(columns[2].column_name, "job"); + assert_eq!( + columns[2].values.as_ref().unwrap().string_values, + vec!["spark", "spark"] + ); - assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME); - assert_eq!( - columns[1].values.as_ref().unwrap().f64_values, - vec![3.0, 4.0] - ); + let expr = exprs.get(1).unwrap(); - assert_eq!(columns[2].column_name, "instance"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["test_host1", "test_host1"] - ); - assert_eq!(columns[3].column_name, "idc"); - assert_eq!( - columns[3].values.as_ref().unwrap().string_values, - vec!["z001", "z001"] - ); - } - _ => unreachable!(), - } + let columns = &expr.columns; + let row_count = expr.row_count; - let values = exprs[2].clone().expr.unwrap(); - match values { - insert_expr::Expr::Values(insert_expr::Values { values }) => { - assert_eq!(1, values.len()); - let batch = InsertBatch::try_from(values[0].as_slice()).unwrap(); - assert_eq!(3, batch.row_count); - let columns = batch.columns; - assert_eq!(columns.len(), 4); + assert_eq!(2, row_count); + assert_eq!(columns.len(), 4); - assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[0].values.as_ref().unwrap().ts_millis_values, - vec![1000, 2000, 3000] - ); + assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); + assert_eq!( + columns[0].values.as_ref().unwrap().ts_millis_values, + vec![1000, 2000] + ); - assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME); - assert_eq!( - columns[1].values.as_ref().unwrap().f64_values, - vec![5.0, 6.0, 7.0] - ); + assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME); + assert_eq!( + columns[1].values.as_ref().unwrap().f64_values, + vec![3.0, 4.0] + ); - assert_eq!(columns[2].column_name, "idc"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["z002", "z002", "z002"] - ); - assert_eq!(columns[3].column_name, "app"); - assert_eq!( - columns[3].values.as_ref().unwrap().string_values, - vec!["biz", "biz", "biz"] - ); - } - _ => unreachable!(), - } + assert_eq!(columns[2].column_name, "instance"); + assert_eq!( + columns[2].values.as_ref().unwrap().string_values, + vec!["test_host1", "test_host1"] + ); + assert_eq!(columns[3].column_name, "idc"); + assert_eq!( + columns[3].values.as_ref().unwrap().string_values, + vec!["z001", "z001"] + ); + + let expr = exprs.get(2).unwrap(); + + let columns = &expr.columns; + let row_count = expr.row_count; + + assert_eq!(3, row_count); + assert_eq!(columns.len(), 4); + + assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); + assert_eq!( + columns[0].values.as_ref().unwrap().ts_millis_values, + vec![1000, 2000, 3000] + ); + + assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME); + assert_eq!( + columns[1].values.as_ref().unwrap().f64_values, + vec![5.0, 6.0, 7.0] + ); + + assert_eq!(columns[2].column_name, "idc"); + assert_eq!( + columns[2].values.as_ref().unwrap().string_values, + vec!["z002", "z002", "z002"] + ); + assert_eq!(columns[3].column_name, "app"); + assert_eq!( + columns[3].values.as_ref().unwrap().string_values, + vec!["biz", "biz", "biz"] + ); } #[test]