From 3b2716ed7063aabe2f3a6f1da32a54e1ac2f12cd Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Thu, 28 Jul 2022 10:25:22 +0800 Subject: [PATCH] feat: impl insert via grpc (#102) * fix: build protobuf * feat: impl grpc insert * Add an example of grpc insert * fix: cargo clippy * cr --- Cargo.lock | 1 + src/api/greptime/v1/greptime.proto | 1 + src/api/greptime/v1/insert.proto | 6 +- src/api/src/convert.rs | 89 +++++ src/api/src/lib.rs | 1 + src/client/Cargo.toml | 3 + src/client/examples/insert.rs | 76 +++++ src/datanode/src/error.rs | 11 +- src/datanode/src/instance.rs | 31 +- src/datanode/src/server/grpc.rs | 7 +- src/datanode/src/server/grpc/handler.rs | 60 +++- src/datanode/src/server/grpc/insert.rs | 420 ++++++++++++++++++++++++ src/datanode/src/server/grpc/server.rs | 2 + 13 files changed, 694 insertions(+), 14 deletions(-) create mode 100644 src/api/src/convert.rs create mode 100644 src/client/examples/insert.rs create mode 100644 src/datanode/src/server/grpc/insert.rs diff --git a/Cargo.lock b/Cargo.lock index 97aed491ec..610c342ce4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,6 +608,7 @@ dependencies = [ "api", "common-error", "snafu", + "tokio", "tonic", ] diff --git a/src/api/greptime/v1/greptime.proto b/src/api/greptime/v1/greptime.proto index bd7659046d..9e9408a8a1 100644 --- a/src/api/greptime/v1/greptime.proto +++ b/src/api/greptime/v1/greptime.proto @@ -4,6 +4,7 @@ package greptime.v1; import "greptime/v1/admin.proto"; import "greptime/v1/database.proto"; +import "greptime/v1/insert.proto"; service Greptime { rpc Batch(BatchRequest) returns (BatchResponse) {} diff --git a/src/api/greptime/v1/insert.proto b/src/api/greptime/v1/insert.proto index 565482bae9..47ec9c52f5 100644 --- a/src/api/greptime/v1/insert.proto +++ b/src/api/greptime/v1/insert.proto @@ -1,3 +1,7 @@ +syntax = "proto3"; + +package greptime.v1; + message InsertBatch { repeated Column columns = 1; uint32 row_count = 2; @@ -28,7 +32,7 @@ message Column { repeated double f64_values = 10; repeated bool bool_values = 11; - repeated bytes bytes_values = 12; + repeated bytes binary_values = 12; repeated string string_values = 13; } // The array of non-null values in this column. diff --git a/src/api/src/convert.rs b/src/api/src/convert.rs new file mode 100644 index 0000000000..30dbca66af --- /dev/null +++ b/src/api/src/convert.rs @@ -0,0 +1,89 @@ +pub use prost::DecodeError; +use prost::Message; + +use crate::v1::InsertBatch; + +impl From for Vec { + fn from(insert: InsertBatch) -> Self { + insert.encode_length_delimited_to_vec() + } +} + +impl TryFrom> for InsertBatch { + type Error = DecodeError; + + fn try_from(value: Vec) -> Result { + InsertBatch::decode_length_delimited(value.as_ref()) + } +} + +#[cfg(test)] +mod tests { + use crate::v1::*; + + const SEMANTIC_TAG: i32 = 0; + + #[test] + fn test_convert_insert_batch() { + let insert_batch = mock_insert_batch(); + + let bytes: Vec = insert_batch.into(); + let insert: InsertBatch = bytes.try_into().unwrap(); + + assert_eq!(8, insert.row_count); + assert_eq!(1, insert.columns.len()); + + let column = &insert.columns[0]; + assert_eq!("foo", column.column_name); + assert_eq!(SEMANTIC_TAG, column.semantic_type); + assert_eq!(vec![1], column.null_mask); + assert_eq!( + vec![2, 3, 4, 5, 6, 7, 8], + column.values.as_ref().unwrap().i32_values + ); + } + + #[should_panic] + #[test] + fn test_convert_insert_batch_wrong() { + let insert_batch = mock_insert_batch(); + + let mut bytes: Vec = insert_batch.into(); + + // modify some bytes + bytes[0] = 0b1; + bytes[1] = 0b1; + + let insert: InsertBatch = bytes.try_into().unwrap(); + + assert_eq!(8, insert.row_count); + assert_eq!(1, insert.columns.len()); + + let column = &insert.columns[0]; + assert_eq!("foo", column.column_name); + assert_eq!(SEMANTIC_TAG, column.semantic_type); + assert_eq!(vec![1], column.null_mask); + assert_eq!( + vec![2, 3, 4, 5, 6, 7, 8], + column.values.as_ref().unwrap().i32_values + ); + } + + fn mock_insert_batch() -> InsertBatch { + let values = column::Values { + i32_values: vec![2, 3, 4, 5, 6, 7, 8], + ..Default::default() + }; + let null_mask = vec![1]; + let column = Column { + column_name: "foo".to_string(), + semantic_type: SEMANTIC_TAG, + values: Some(values), + null_mask, + }; + InsertBatch { + columns: vec![column], + row_count: 8, + } + } +} diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index a3a6d96c3f..cc4fb22275 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -1 +1,2 @@ +pub mod convert; pub mod v1; diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 3ad86d4e52..8e789ec744 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -10,3 +10,6 @@ api = { path = "../api" } common-error = { path = "../common/error" } snafu = { version = "0.7", features = ["backtraces"] } tonic = "0.7" + +[dev-dependencies] +tokio = { version = "1.0", features = ["full"] } diff --git a/src/client/examples/insert.rs b/src/client/examples/insert.rs new file mode 100644 index 0000000000..093a0c9bed --- /dev/null +++ b/src/client/examples/insert.rs @@ -0,0 +1,76 @@ +use api::v1::*; +use client::{Client, Database}; + +#[tokio::main] +async fn main() { + let url = "http://127.0.0.1:3001"; + let db_name = "db"; + let table_name = "demo"; + + let client = Client::connect(url).await.unwrap(); + let db = Database::new(db_name, client); + db.insert(table_name, insert_batches()).await.unwrap(); +} + +fn insert_batches() -> Vec> { + const SEMANTIC_TAG: i32 = 0; + const SEMANTIC_FEILD: i32 = 1; + const SEMANTIC_TS: i32 = 2; + + let row_count = 4; + + let host_vals = column::Values { + string_values: vec![ + "host1".to_string(), + "host2".to_string(), + "host3".to_string(), + "host4".to_string(), + ], + ..Default::default() + }; + let host_column = Column { + column_name: "host".to_string(), + semantic_type: SEMANTIC_TAG, + values: Some(host_vals), + null_mask: vec![0], + }; + + let cpu_vals = column::Values { + f64_values: vec![0.31, 0.41, 0.2], + ..Default::default() + }; + let cpu_column = Column { + column_name: "cpu".to_string(), + semantic_type: SEMANTIC_FEILD, + values: Some(cpu_vals), + null_mask: vec![2], + }; + + let mem_vals = column::Values { + f64_values: vec![0.1, 0.2, 0.3], + ..Default::default() + }; + let mem_column = Column { + column_name: "memory".to_string(), + semantic_type: SEMANTIC_FEILD, + values: Some(mem_vals), + null_mask: vec![4], + }; + + let ts_vals = column::Values { + i64_values: vec![100, 101, 102, 103], + ..Default::default() + }; + let ts_column = Column { + column_name: "ts".to_string(), + semantic_type: SEMANTIC_TS, + values: Some(ts_vals), + null_mask: vec![0], + }; + + let insert_batch = InsertBatch { + columns: vec![host_column, cpu_column, mem_column, ts_column], + row_count, + }; + vec![insert_batch.into()] +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 254d545812..42f29bb7a9 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,5 +1,6 @@ use std::any::Any; +use api::convert::DecodeError; use common_error::ext::BoxedError; use common_error::prelude::*; use datatypes::prelude::ConcreteDataType; @@ -74,6 +75,12 @@ pub enum Error { source: TableError, }, + #[snafu(display("Illegal insert data"))] + IllegalInsertData, + + #[snafu(display("Fail to convert bytes to insert batch, {}", source))] + DecodeInsert { source: DecodeError }, + // The error source of http error is clear even without backtrace now so // a backtrace is not carried in this varaint. #[snafu(display("Fail to start HTTP server, source: {}", source))] @@ -117,7 +124,9 @@ impl ErrorExt for Error { Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, Error::ColumnValuesNumberMismatch { .. } | Error::ParseSqlValue { .. } - | Error::ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, + | Error::ColumnTypeMismatch { .. } + | Error::IllegalInsertData { .. } + | Error::DecodeInsert { .. } => StatusCode::InvalidArguments, // TODO(yingwen): Further categorize http error. Error::StartHttp { .. } | Error::ParseAddr { .. } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index b9dacd5622..8dea3c2d3a 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -1,12 +1,13 @@ use std::{fs, path, sync::Arc}; +use api::v1::InsertExpr; use common_telemetry::logging::info; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use log_store::fs::{config::LogConfig, log::LocalFileLogStore}; use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; use storage::{config::EngineConfig, EngineImpl}; use table::engine::EngineContext; @@ -15,7 +16,10 @@ use table::requests::CreateTableRequest; use table_engine::engine::MitoEngine; use crate::datanode::DatanodeOptions; -use crate::error::{self, CreateTableSnafu, ExecuteSqlSnafu, Result}; +use crate::error::{ + self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu, +}; +use crate::server::grpc::insert::insertion_expr_to_request; use crate::sql::SqlHandler; type DefaultEngine = MitoEngine>; @@ -51,6 +55,29 @@ impl Instance { }) } + pub async fn execute_grpc_insert(&self, insert_expr: InsertExpr) -> Result { + let schema_provider = self + .catalog_list + .catalog(DEFAULT_CATALOG_NAME) + .unwrap() + .schema(DEFAULT_SCHEMA_NAME) + .unwrap(); + + let table_name = &insert_expr.table_name.clone(); + let table = schema_provider + .table(table_name) + .context(TableNotFoundSnafu { table_name })?; + + let insert = insertion_expr_to_request(insert_expr, table.clone())?; + + let affected_rows = table + .insert(insert) + .await + .context(InsertSnafu { table_name })?; + + Ok(Output::AffectedRows(affected_rows)) + } + pub async fn execute_sql(&self, sql: &str) -> Result { let stmt = self .query_engine diff --git a/src/datanode/src/server/grpc.rs b/src/datanode/src/server/grpc.rs index 064745234c..0b6d9e5b1e 100644 --- a/src/datanode/src/server/grpc.rs +++ b/src/datanode/src/server/grpc.rs @@ -1,3 +1,7 @@ +mod handler; +pub mod insert; +mod server; + use common_telemetry::logging::info; use snafu::ResultExt; use tokio::net::TcpListener; @@ -9,9 +13,6 @@ use crate::{ server::grpc::{handler::BatchHandler, server::Server}, }; -mod handler; -mod server; - pub struct GrpcServer { handler: BatchHandler, } diff --git a/src/datanode/src/server/grpc/handler.rs b/src/datanode/src/server/grpc/handler.rs index 2eaa75a05b..30b1a196ca 100644 --- a/src/datanode/src/server/grpc/handler.rs +++ b/src/datanode/src/server/grpc/handler.rs @@ -1,18 +1,64 @@ use api::v1::*; +use query::Output; +use crate::server::grpc::server::PROTOCOL_VERSION; use crate::{error::Result, instance::InstanceRef}; #[derive(Clone)] -pub struct BatchHandler {} +pub struct BatchHandler { + instance: InstanceRef, +} impl BatchHandler { - pub fn new(_instance: InstanceRef) -> Self { - Self {} + pub fn new(instance: InstanceRef) -> Self { + Self { instance } } - pub async fn batch(&self, mut batch_req: BatchRequest) -> Result { - let batch_res = BatchResponse::default(); - let _databases = std::mem::take(&mut batch_req.databases); - Ok(batch_res) + pub async fn batch(&self, batch_req: BatchRequest) -> Result { + let mut batch_resp = BatchResponse::default(); + let mut db_resp = DatabaseResponse::default(); + let databases = batch_req.databases; + + for req in databases { + let exprs = req.exprs; + + for obj_expr in exprs { + let mut object_resp = ObjectResult::default(); + + match obj_expr.expr { + Some(object_expr::Expr::Insert(insert_expr)) => { + match self.instance.execute_grpc_insert(insert_expr).await { + Ok(Output::AffectedRows(rows)) => { + object_resp.header = Some(ResultHeader { + version: PROTOCOL_VERSION, + // TODO(fys): Only one success code (200) was provided + // in the early stage and we will refine it later + code: 200, + success: rows as u32, + ..Default::default() + }); + } + Err(err) => { + object_resp.header = Some(ResultHeader { + version: PROTOCOL_VERSION, + // TODO(fys): Only one error code (500) was provided + // in the early stage and we will refine it later + code: 500, + err_msg: err.to_string(), + // TODO(fys): failure count + ..Default::default() + }) + } + _ => unreachable!(), + } + } + _ => unimplemented!(), + } + + db_resp.results.push(object_resp); + } + } + batch_resp.databases.push(db_resp); + Ok(batch_resp) } } diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs new file mode 100644 index 0000000000..30a9e7a0b4 --- /dev/null +++ b/src/datanode/src/server/grpc/insert.rs @@ -0,0 +1,420 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + sync::Arc, +}; + +use api::v1::{column::Values, Column, InsertBatch, InsertExpr}; +use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder}; +use snafu::{ensure, OptionExt, ResultExt}; +use table::{requests::InsertRequest, Table}; + +use crate::error::{ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result}; + +pub fn insertion_expr_to_request( + insert: InsertExpr, + table: Arc, +) -> Result { + let schema = table.schema(); + let table_name = &insert.table_name; + let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len()); + let insert_batches = insert_batches(insert.values)?; + + for InsertBatch { columns, row_count } in insert_batches { + for Column { + column_name, + values, + null_mask, + .. + } in columns + { + let values = match values { + Some(vals) => vals, + None => continue, + }; + + let column = column_name.clone(); + let vector_builder = match columns_builders.entry(column) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(entry) => { + let column_schema = schema.column_schema_by_name(&column_name).context( + ColumnNotFoundSnafu { + column_name: &column_name, + table_name, + }, + )?; + let data_type = &column_schema.data_type; + entry.insert(VectorBuilder::with_capacity( + data_type.clone(), + row_count as usize, + )) + } + }; + + add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?; + } + } + let columns_values = columns_builders + .into_iter() + .map(|(column_name, mut vector_builder)| (column_name, vector_builder.finish())) + .collect(); + + Ok(InsertRequest { + table_name: table_name.to_string(), + columns_values, + }) +} + +fn insert_batches(bytes_vec: Vec>) -> Result> { + let mut insert_batches = Vec::with_capacity(bytes_vec.len()); + + for bytes in bytes_vec { + insert_batches.push(bytes.try_into().context(DecodeInsertSnafu)?); + } + Ok(insert_batches) +} + +fn add_values_to_builder( + builder: &mut VectorBuilder, + values: Values, + row_count: usize, + null_mask: impl Into, +) -> Result<()> { + let data_type = builder.data_type(); + let null_mask = null_mask.into(); + let values = convert_values(&data_type, values); + + if null_mask.len() == 0 { + ensure!(values.len() == row_count, IllegalInsertDataSnafu); + + values.iter().for_each(|value| { + builder.push(value); + }); + } else { + ensure!( + null_mask.set_count() + values.len() == row_count, + IllegalInsertDataSnafu + ); + + let mut idx_of_values = 0; + for idx in 0..row_count { + if is_null(&null_mask, idx) { + builder.push(&Value::Null); + } else { + builder.push(&values[idx_of_values]); + idx_of_values += 1; + } + } + } + + Ok(()) +} + +fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec { + // TODO(fys): use macros to optimize code + match data_type { + ConcreteDataType::Int64(_) => values + .i64_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Float64(_) => values + .f64_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::String(_) => values + .string_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Boolean(_) => values + .bool_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Int8(_) => values.i8_values.into_iter().map(|val| val.into()).collect(), + ConcreteDataType::Int16(_) => values + .i16_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Int32(_) => values + .i32_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::UInt8(_) => values.u8_values.into_iter().map(|val| val.into()).collect(), + ConcreteDataType::UInt16(_) => values + .u16_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::UInt32(_) => values + .u32_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::UInt64(_) => values + .u64_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Float32(_) => values + .f32_values + .into_iter() + .map(|val| val.into()) + .collect(), + ConcreteDataType::Binary(_) => values + .binary_values + .into_iter() + .map(|val| val.into()) + .collect(), + _ => unimplemented!(), + } +} + +fn is_null(null_mask: &BitSet, idx: usize) -> bool { + debug_assert!(idx < null_mask.len, "idx should be less than null_mask.len"); + + matches!(null_mask.get_bit(idx), Some(true)) +} + +// TOOD(fys): move BitSet to better location +struct BitSet { + buffer: Vec, + len: usize, +} + +impl BitSet { + fn len(&self) -> usize { + self.len + } + + fn set_count(&self) -> usize { + (0..self.len) + .into_iter() + .filter(|&i| matches!(self.get_bit(i), Some(true))) + .count() + } + + fn get_bit(&self, idx: usize) -> Option { + if idx >= self.len { + return None; + } + + let byte_idx = idx >> 3; + let bit_idx = idx & 7; + Some((self.buffer[byte_idx] >> bit_idx) & 1 != 0) + } +} + +impl From> for BitSet { + fn from(data: Vec) -> Self { + BitSet { + len: data.len() << 3, + buffer: data, + } + } +} + +impl From<&[u8]> for BitSet { + fn from(data: &[u8]) -> Self { + BitSet { + buffer: data.into(), + len: data.len() << 3, + } + } +} + +#[cfg(test)] +mod tests { + use std::{any::Any, sync::Arc}; + + use api::v1::{ + column::{self, Values}, + Column, InsertBatch, InsertExpr, + }; + use common_query::prelude::Expr; + use common_recordbatch::SendableRecordBatchStream; + use datatypes::{ + data_type::ConcreteDataType, + schema::{ColumnSchema, Schema, SchemaRef}, + value::Value, + }; + use table::error::Result as TableResult; + use table::Table; + + use crate::server::grpc::insert::{convert_values, insertion_expr_to_request, is_null, BitSet}; + + #[test] + fn test_insertion_expr_to_request() { + let insert_expr = InsertExpr { + table_name: "demo".to_string(), + values: mock_insert_batches(), + }; + let table: Arc = Arc::new(DemoTable {}); + + let insert_req = insertion_expr_to_request(insert_expr, table).unwrap(); + + assert_eq!("demo", insert_req.table_name); + + let host = insert_req.columns_values.get("host").unwrap(); + assert_eq!(Value::String("host1".into()), host.get(0)); + assert_eq!(Value::String("host2".into()), host.get(1)); + + let cpu = insert_req.columns_values.get("cpu").unwrap(); + assert_eq!(Value::Float64(0.31.into()), cpu.get(0)); + assert_eq!(Value::Null, cpu.get(1)); + + let memory = insert_req.columns_values.get("memory").unwrap(); + assert_eq!(Value::Null, memory.get(0)); + assert_eq!(Value::Float64(0.1.into()), memory.get(1)); + + let ts = insert_req.columns_values.get("ts").unwrap(); + assert_eq!(Value::Int64(100), ts.get(0)); + assert_eq!(Value::Int64(101), ts.get(1)); + } + + #[test] + fn test_convert_values() { + let data_type = ConcreteDataType::float64_datatype(); + let values = Values { + f64_values: vec![0.1, 0.2, 0.3], + ..Default::default() + }; + + let result = convert_values(&data_type, values); + + assert_eq!( + vec![ + Value::Float64(0.1.into()), + Value::Float64(0.2.into()), + Value::Float64(0.3.into()) + ], + result + ); + } + + #[test] + fn test_is_null() { + let null_mask: BitSet = vec![0b0000_0001, 0b0000_1000].into(); + + assert!(is_null(&null_mask, 0)); + assert!(!is_null(&null_mask, 1)); + assert!(!is_null(&null_mask, 10)); + assert!(is_null(&null_mask, 11)); + assert!(!is_null(&null_mask, 12)); + } + + #[test] + fn test_bit_set() { + let bit_set: BitSet = vec![0b0000_0001, 0b0000_1000].into(); + + assert!(bit_set.get_bit(0).unwrap()); + assert!(!bit_set.get_bit(1).unwrap()); + assert!(!bit_set.get_bit(10).unwrap()); + assert!(bit_set.get_bit(11).unwrap()); + assert!(!bit_set.get_bit(12).unwrap()); + + assert!(bit_set.get_bit(16).is_none()); + + assert_eq!(2, bit_set.set_count()); + assert_eq!(16, bit_set.len()); + + let bit_set: BitSet = vec![0b0000_0000, 0b0000_0000].into(); + assert_eq!(0, bit_set.set_count()); + assert_eq!(16, bit_set.len()); + + let bit_set: BitSet = vec![0b1111_1111, 0b1111_1111].into(); + assert_eq!(16, bit_set.set_count()); + + let bit_set: BitSet = vec![].into(); + assert_eq!(0, bit_set.len()); + } + + struct DemoTable; + + #[async_trait::async_trait] + impl Table for DemoTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + let column_schemas = vec![ + ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true), + ]; + + Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap()) + } + async fn scan( + &self, + _projection: &Option>, + _filters: &[Expr], + _limit: Option, + ) -> TableResult { + unimplemented!(); + } + } + + fn mock_insert_batches() -> Vec> { + const SEMANTIC_TAG: i32 = 0; + const SEMANTIC_FEILD: i32 = 1; + const SEMANTIC_TS: i32 = 2; + + let row_count = 2; + + let host_vals = column::Values { + string_values: vec!["host1".to_string(), "host2".to_string()], + ..Default::default() + }; + let host_column = Column { + column_name: "host".to_string(), + semantic_type: SEMANTIC_TAG, + values: Some(host_vals), + null_mask: vec![0], + }; + + let cpu_vals = column::Values { + f64_values: vec![0.31], + ..Default::default() + }; + let cpu_column = Column { + column_name: "cpu".to_string(), + semantic_type: SEMANTIC_FEILD, + values: Some(cpu_vals), + null_mask: vec![2], + }; + + let mem_vals = column::Values { + f64_values: vec![0.1], + ..Default::default() + }; + let mem_column = Column { + column_name: "memory".to_string(), + semantic_type: SEMANTIC_FEILD, + values: Some(mem_vals), + null_mask: vec![1], + }; + + let ts_vals = column::Values { + i64_values: vec![100, 101], + ..Default::default() + }; + let ts_column = Column { + column_name: "ts".to_string(), + semantic_type: SEMANTIC_TS, + values: Some(ts_vals), + null_mask: vec![0], + }; + + let insert_batch = InsertBatch { + columns: vec![host_column, cpu_column, mem_column, ts_column], + row_count, + }; + vec![insert_batch.into()] + } +} diff --git a/src/datanode/src/server/grpc/server.rs b/src/datanode/src/server/grpc/server.rs index 0661d15ebb..433b406505 100644 --- a/src/datanode/src/server/grpc/server.rs +++ b/src/datanode/src/server/grpc/server.rs @@ -3,6 +3,8 @@ use tonic::{Request, Response, Status}; use super::handler::BatchHandler; +pub const PROTOCOL_VERSION: u32 = 1; + #[derive(Clone)] pub struct Server { handler: BatchHandler,