feat: impl insert via grpc (#102)

* fix: build protobuf

* feat: impl grpc insert

* Add an example of grpc insert

* fix: cargo clippy

* cr
This commit is contained in:
fys
2022-07-28 10:25:22 +08:00
committed by GitHub
parent 3e42334b92
commit 3b2716ed70
13 changed files with 694 additions and 14 deletions

1
Cargo.lock generated
View File

@@ -608,6 +608,7 @@ dependencies = [
"api",
"common-error",
"snafu",
"tokio",
"tonic",
]

View File

@@ -4,6 +4,7 @@ package greptime.v1;
import "greptime/v1/admin.proto";
import "greptime/v1/database.proto";
import "greptime/v1/insert.proto";
service Greptime {
rpc Batch(BatchRequest) returns (BatchResponse) {}

View File

@@ -1,3 +1,7 @@
syntax = "proto3";
package greptime.v1;
message InsertBatch {
repeated Column columns = 1;
uint32 row_count = 2;
@@ -28,7 +32,7 @@ message Column {
repeated double f64_values = 10;
repeated bool bool_values = 11;
repeated bytes bytes_values = 12;
repeated bytes binary_values = 12;
repeated string string_values = 13;
}
// The array of non-null values in this column.

89
src/api/src/convert.rs Normal file
View File

@@ -0,0 +1,89 @@
pub use prost::DecodeError;
use prost::Message;
use crate::v1::InsertBatch;
impl From<InsertBatch> for Vec<u8> {
fn from(insert: InsertBatch) -> Self {
insert.encode_length_delimited_to_vec()
}
}
impl TryFrom<Vec<u8>> for InsertBatch {
type Error = DecodeError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
InsertBatch::decode_length_delimited(value.as_ref())
}
}
#[cfg(test)]
mod tests {
use crate::v1::*;
const SEMANTIC_TAG: i32 = 0;
#[test]
fn test_convert_insert_batch() {
let insert_batch = mock_insert_batch();
let bytes: Vec<u8> = insert_batch.into();
let insert: InsertBatch = bytes.try_into().unwrap();
assert_eq!(8, insert.row_count);
assert_eq!(1, insert.columns.len());
let column = &insert.columns[0];
assert_eq!("foo", column.column_name);
assert_eq!(SEMANTIC_TAG, column.semantic_type);
assert_eq!(vec![1], column.null_mask);
assert_eq!(
vec![2, 3, 4, 5, 6, 7, 8],
column.values.as_ref().unwrap().i32_values
);
}
#[should_panic]
#[test]
fn test_convert_insert_batch_wrong() {
let insert_batch = mock_insert_batch();
let mut bytes: Vec<u8> = insert_batch.into();
// modify some bytes
bytes[0] = 0b1;
bytes[1] = 0b1;
let insert: InsertBatch = bytes.try_into().unwrap();
assert_eq!(8, insert.row_count);
assert_eq!(1, insert.columns.len());
let column = &insert.columns[0];
assert_eq!("foo", column.column_name);
assert_eq!(SEMANTIC_TAG, column.semantic_type);
assert_eq!(vec![1], column.null_mask);
assert_eq!(
vec![2, 3, 4, 5, 6, 7, 8],
column.values.as_ref().unwrap().i32_values
);
}
fn mock_insert_batch() -> InsertBatch {
let values = column::Values {
i32_values: vec![2, 3, 4, 5, 6, 7, 8],
..Default::default()
};
let null_mask = vec![1];
let column = Column {
column_name: "foo".to_string(),
semantic_type: SEMANTIC_TAG,
values: Some(values),
null_mask,
};
InsertBatch {
columns: vec![column],
row_count: 8,
}
}
}

View File

@@ -1 +1,2 @@
pub mod convert;
pub mod v1;

View File

@@ -10,3 +10,6 @@ api = { path = "../api" }
common-error = { path = "../common/error" }
snafu = { version = "0.7", features = ["backtraces"] }
tonic = "0.7"
[dev-dependencies]
tokio = { version = "1.0", features = ["full"] }

View File

@@ -0,0 +1,76 @@
use api::v1::*;
use client::{Client, Database};
#[tokio::main]
async fn main() {
let url = "http://127.0.0.1:3001";
let db_name = "db";
let table_name = "demo";
let client = Client::connect(url).await.unwrap();
let db = Database::new(db_name, client);
db.insert(table_name, insert_batches()).await.unwrap();
}
fn insert_batches() -> Vec<Vec<u8>> {
const SEMANTIC_TAG: i32 = 0;
const SEMANTIC_FEILD: i32 = 1;
const SEMANTIC_TS: i32 = 2;
let row_count = 4;
let host_vals = column::Values {
string_values: vec![
"host1".to_string(),
"host2".to_string(),
"host3".to_string(),
"host4".to_string(),
],
..Default::default()
};
let host_column = Column {
column_name: "host".to_string(),
semantic_type: SEMANTIC_TAG,
values: Some(host_vals),
null_mask: vec![0],
};
let cpu_vals = column::Values {
f64_values: vec![0.31, 0.41, 0.2],
..Default::default()
};
let cpu_column = Column {
column_name: "cpu".to_string(),
semantic_type: SEMANTIC_FEILD,
values: Some(cpu_vals),
null_mask: vec![2],
};
let mem_vals = column::Values {
f64_values: vec![0.1, 0.2, 0.3],
..Default::default()
};
let mem_column = Column {
column_name: "memory".to_string(),
semantic_type: SEMANTIC_FEILD,
values: Some(mem_vals),
null_mask: vec![4],
};
let ts_vals = column::Values {
i64_values: vec![100, 101, 102, 103],
..Default::default()
};
let ts_column = Column {
column_name: "ts".to_string(),
semantic_type: SEMANTIC_TS,
values: Some(ts_vals),
null_mask: vec![0],
};
let insert_batch = InsertBatch {
columns: vec![host_column, cpu_column, mem_column, ts_column],
row_count,
};
vec![insert_batch.into()]
}

View File

@@ -1,5 +1,6 @@
use std::any::Any;
use api::convert::DecodeError;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
@@ -74,6 +75,12 @@ pub enum Error {
source: TableError,
},
#[snafu(display("Illegal insert data"))]
IllegalInsertData,
#[snafu(display("Fail to convert bytes to insert batch, {}", source))]
DecodeInsert { source: DecodeError },
// The error source of http error is clear even without backtrace now so
// a backtrace is not carried in this varaint.
#[snafu(display("Fail to start HTTP server, source: {}", source))]
@@ -117,7 +124,9 @@ impl ErrorExt for Error {
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ColumnValuesNumberMismatch { .. }
| Error::ParseSqlValue { .. }
| Error::ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
| Error::ColumnTypeMismatch { .. }
| Error::IllegalInsertData { .. }
| Error::DecodeInsert { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. }
| Error::ParseAddr { .. }

View File

@@ -1,12 +1,13 @@
use std::{fs, path, sync::Arc};
use api::v1::InsertExpr;
use common_telemetry::logging::info;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use log_store::fs::{config::LogConfig, log::LocalFileLogStore};
use query::catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use sql::statements::statement::Statement;
use storage::{config::EngineConfig, EngineImpl};
use table::engine::EngineContext;
@@ -15,7 +16,10 @@ use table::requests::CreateTableRequest;
use table_engine::engine::MitoEngine;
use crate::datanode::DatanodeOptions;
use crate::error::{self, CreateTableSnafu, ExecuteSqlSnafu, Result};
use crate::error::{
self, CreateTableSnafu, ExecuteSqlSnafu, InsertSnafu, Result, TableNotFoundSnafu,
};
use crate::server::grpc::insert::insertion_expr_to_request;
use crate::sql::SqlHandler;
type DefaultEngine = MitoEngine<EngineImpl<LocalFileLogStore>>;
@@ -51,6 +55,29 @@ impl Instance {
})
}
pub async fn execute_grpc_insert(&self, insert_expr: InsertExpr) -> Result<Output> {
let schema_provider = self
.catalog_list
.catalog(DEFAULT_CATALOG_NAME)
.unwrap()
.schema(DEFAULT_SCHEMA_NAME)
.unwrap();
let table_name = &insert_expr.table_name.clone();
let table = schema_provider
.table(table_name)
.context(TableNotFoundSnafu { table_name })?;
let insert = insertion_expr_to_request(insert_expr, table.clone())?;
let affected_rows = table
.insert(insert)
.await
.context(InsertSnafu { table_name })?;
Ok(Output::AffectedRows(affected_rows))
}
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
let stmt = self
.query_engine

View File

@@ -1,3 +1,7 @@
mod handler;
pub mod insert;
mod server;
use common_telemetry::logging::info;
use snafu::ResultExt;
use tokio::net::TcpListener;
@@ -9,9 +13,6 @@ use crate::{
server::grpc::{handler::BatchHandler, server::Server},
};
mod handler;
mod server;
pub struct GrpcServer {
handler: BatchHandler,
}

View File

@@ -1,18 +1,64 @@
use api::v1::*;
use query::Output;
use crate::server::grpc::server::PROTOCOL_VERSION;
use crate::{error::Result, instance::InstanceRef};
#[derive(Clone)]
pub struct BatchHandler {}
pub struct BatchHandler {
instance: InstanceRef,
}
impl BatchHandler {
pub fn new(_instance: InstanceRef) -> Self {
Self {}
pub fn new(instance: InstanceRef) -> Self {
Self { instance }
}
pub async fn batch(&self, mut batch_req: BatchRequest) -> Result<BatchResponse> {
let batch_res = BatchResponse::default();
let _databases = std::mem::take(&mut batch_req.databases);
Ok(batch_res)
pub async fn batch(&self, batch_req: BatchRequest) -> Result<BatchResponse> {
let mut batch_resp = BatchResponse::default();
let mut db_resp = DatabaseResponse::default();
let databases = batch_req.databases;
for req in databases {
let exprs = req.exprs;
for obj_expr in exprs {
let mut object_resp = ObjectResult::default();
match obj_expr.expr {
Some(object_expr::Expr::Insert(insert_expr)) => {
match self.instance.execute_grpc_insert(insert_expr).await {
Ok(Output::AffectedRows(rows)) => {
object_resp.header = Some(ResultHeader {
version: PROTOCOL_VERSION,
// TODO(fys): Only one success code (200) was provided
// in the early stage and we will refine it later
code: 200,
success: rows as u32,
..Default::default()
});
}
Err(err) => {
object_resp.header = Some(ResultHeader {
version: PROTOCOL_VERSION,
// TODO(fys): Only one error code (500) was provided
// in the early stage and we will refine it later
code: 500,
err_msg: err.to_string(),
// TODO(fys): failure count
..Default::default()
})
}
_ => unreachable!(),
}
}
_ => unimplemented!(),
}
db_resp.results.push(object_resp);
}
}
batch_resp.databases.push(db_resp);
Ok(batch_resp)
}
}

View File

@@ -0,0 +1,420 @@
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use api::v1::{column::Values, Column, InsertBatch, InsertExpr};
use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
use snafu::{ensure, OptionExt, ResultExt};
use table::{requests::InsertRequest, Table};
use crate::error::{ColumnNotFoundSnafu, DecodeInsertSnafu, IllegalInsertDataSnafu, Result};
pub fn insertion_expr_to_request(
insert: InsertExpr,
table: Arc<dyn Table>,
) -> Result<InsertRequest> {
let schema = table.schema();
let table_name = &insert.table_name;
let mut columns_builders = HashMap::with_capacity(schema.column_schemas().len());
let insert_batches = insert_batches(insert.values)?;
for InsertBatch { columns, row_count } in insert_batches {
for Column {
column_name,
values,
null_mask,
..
} in columns
{
let values = match values {
Some(vals) => vals,
None => continue,
};
let column = column_name.clone();
let vector_builder = match columns_builders.entry(column) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let column_schema = schema.column_schema_by_name(&column_name).context(
ColumnNotFoundSnafu {
column_name: &column_name,
table_name,
},
)?;
let data_type = &column_schema.data_type;
entry.insert(VectorBuilder::with_capacity(
data_type.clone(),
row_count as usize,
))
}
};
add_values_to_builder(vector_builder, values, row_count as usize, null_mask)?;
}
}
let columns_values = columns_builders
.into_iter()
.map(|(column_name, mut vector_builder)| (column_name, vector_builder.finish()))
.collect();
Ok(InsertRequest {
table_name: table_name.to_string(),
columns_values,
})
}
fn insert_batches(bytes_vec: Vec<Vec<u8>>) -> Result<Vec<InsertBatch>> {
let mut insert_batches = Vec::with_capacity(bytes_vec.len());
for bytes in bytes_vec {
insert_batches.push(bytes.try_into().context(DecodeInsertSnafu)?);
}
Ok(insert_batches)
}
fn add_values_to_builder(
builder: &mut VectorBuilder,
values: Values,
row_count: usize,
null_mask: impl Into<BitSet>,
) -> Result<()> {
let data_type = builder.data_type();
let null_mask = null_mask.into();
let values = convert_values(&data_type, values);
if null_mask.len() == 0 {
ensure!(values.len() == row_count, IllegalInsertDataSnafu);
values.iter().for_each(|value| {
builder.push(value);
});
} else {
ensure!(
null_mask.set_count() + values.len() == row_count,
IllegalInsertDataSnafu
);
let mut idx_of_values = 0;
for idx in 0..row_count {
if is_null(&null_mask, idx) {
builder.push(&Value::Null);
} else {
builder.push(&values[idx_of_values]);
idx_of_values += 1;
}
}
}
Ok(())
}
fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
// TODO(fys): use macros to optimize code
match data_type {
ConcreteDataType::Int64(_) => values
.i64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Float64(_) => values
.f64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::String(_) => values
.string_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Boolean(_) => values
.bool_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Int8(_) => values.i8_values.into_iter().map(|val| val.into()).collect(),
ConcreteDataType::Int16(_) => values
.i16_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Int32(_) => values
.i32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt8(_) => values.u8_values.into_iter().map(|val| val.into()).collect(),
ConcreteDataType::UInt16(_) => values
.u16_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt32(_) => values
.u32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::UInt64(_) => values
.u64_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Float32(_) => values
.f32_values
.into_iter()
.map(|val| val.into())
.collect(),
ConcreteDataType::Binary(_) => values
.binary_values
.into_iter()
.map(|val| val.into())
.collect(),
_ => unimplemented!(),
}
}
fn is_null(null_mask: &BitSet, idx: usize) -> bool {
debug_assert!(idx < null_mask.len, "idx should be less than null_mask.len");
matches!(null_mask.get_bit(idx), Some(true))
}
// TOOD(fys): move BitSet to better location
struct BitSet {
buffer: Vec<u8>,
len: usize,
}
impl BitSet {
fn len(&self) -> usize {
self.len
}
fn set_count(&self) -> usize {
(0..self.len)
.into_iter()
.filter(|&i| matches!(self.get_bit(i), Some(true)))
.count()
}
fn get_bit(&self, idx: usize) -> Option<bool> {
if idx >= self.len {
return None;
}
let byte_idx = idx >> 3;
let bit_idx = idx & 7;
Some((self.buffer[byte_idx] >> bit_idx) & 1 != 0)
}
}
impl From<Vec<u8>> for BitSet {
fn from(data: Vec<u8>) -> Self {
BitSet {
len: data.len() << 3,
buffer: data,
}
}
}
impl From<&[u8]> for BitSet {
fn from(data: &[u8]) -> Self {
BitSet {
buffer: data.into(),
len: data.len() << 3,
}
}
}
#[cfg(test)]
mod tests {
use std::{any::Any, sync::Arc};
use api::v1::{
column::{self, Values},
Column, InsertBatch, InsertExpr,
};
use common_query::prelude::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::{
data_type::ConcreteDataType,
schema::{ColumnSchema, Schema, SchemaRef},
value::Value,
};
use table::error::Result as TableResult;
use table::Table;
use crate::server::grpc::insert::{convert_values, insertion_expr_to_request, is_null, BitSet};
#[test]
fn test_insertion_expr_to_request() {
let insert_expr = InsertExpr {
table_name: "demo".to_string(),
values: mock_insert_batches(),
};
let table: Arc<dyn Table> = Arc::new(DemoTable {});
let insert_req = insertion_expr_to_request(insert_expr, table).unwrap();
assert_eq!("demo", insert_req.table_name);
let host = insert_req.columns_values.get("host").unwrap();
assert_eq!(Value::String("host1".into()), host.get(0));
assert_eq!(Value::String("host2".into()), host.get(1));
let cpu = insert_req.columns_values.get("cpu").unwrap();
assert_eq!(Value::Float64(0.31.into()), cpu.get(0));
assert_eq!(Value::Null, cpu.get(1));
let memory = insert_req.columns_values.get("memory").unwrap();
assert_eq!(Value::Null, memory.get(0));
assert_eq!(Value::Float64(0.1.into()), memory.get(1));
let ts = insert_req.columns_values.get("ts").unwrap();
assert_eq!(Value::Int64(100), ts.get(0));
assert_eq!(Value::Int64(101), ts.get(1));
}
#[test]
fn test_convert_values() {
let data_type = ConcreteDataType::float64_datatype();
let values = Values {
f64_values: vec![0.1, 0.2, 0.3],
..Default::default()
};
let result = convert_values(&data_type, values);
assert_eq!(
vec![
Value::Float64(0.1.into()),
Value::Float64(0.2.into()),
Value::Float64(0.3.into())
],
result
);
}
#[test]
fn test_is_null() {
let null_mask: BitSet = vec![0b0000_0001, 0b0000_1000].into();
assert!(is_null(&null_mask, 0));
assert!(!is_null(&null_mask, 1));
assert!(!is_null(&null_mask, 10));
assert!(is_null(&null_mask, 11));
assert!(!is_null(&null_mask, 12));
}
#[test]
fn test_bit_set() {
let bit_set: BitSet = vec![0b0000_0001, 0b0000_1000].into();
assert!(bit_set.get_bit(0).unwrap());
assert!(!bit_set.get_bit(1).unwrap());
assert!(!bit_set.get_bit(10).unwrap());
assert!(bit_set.get_bit(11).unwrap());
assert!(!bit_set.get_bit(12).unwrap());
assert!(bit_set.get_bit(16).is_none());
assert_eq!(2, bit_set.set_count());
assert_eq!(16, bit_set.len());
let bit_set: BitSet = vec![0b0000_0000, 0b0000_0000].into();
assert_eq!(0, bit_set.set_count());
assert_eq!(16, bit_set.len());
let bit_set: BitSet = vec![0b1111_1111, 0b1111_1111].into();
assert_eq!(16, bit_set.set_count());
let bit_set: BitSet = vec![].into();
assert_eq!(0, bit_set.len());
}
struct DemoTable;
#[async_trait::async_trait]
impl Table for DemoTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true),
ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), true),
];
Arc::new(Schema::with_timestamp_index(column_schemas, 3).unwrap())
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> TableResult<SendableRecordBatchStream> {
unimplemented!();
}
}
fn mock_insert_batches() -> Vec<Vec<u8>> {
const SEMANTIC_TAG: i32 = 0;
const SEMANTIC_FEILD: i32 = 1;
const SEMANTIC_TS: i32 = 2;
let row_count = 2;
let host_vals = column::Values {
string_values: vec!["host1".to_string(), "host2".to_string()],
..Default::default()
};
let host_column = Column {
column_name: "host".to_string(),
semantic_type: SEMANTIC_TAG,
values: Some(host_vals),
null_mask: vec![0],
};
let cpu_vals = column::Values {
f64_values: vec![0.31],
..Default::default()
};
let cpu_column = Column {
column_name: "cpu".to_string(),
semantic_type: SEMANTIC_FEILD,
values: Some(cpu_vals),
null_mask: vec![2],
};
let mem_vals = column::Values {
f64_values: vec![0.1],
..Default::default()
};
let mem_column = Column {
column_name: "memory".to_string(),
semantic_type: SEMANTIC_FEILD,
values: Some(mem_vals),
null_mask: vec![1],
};
let ts_vals = column::Values {
i64_values: vec![100, 101],
..Default::default()
};
let ts_column = Column {
column_name: "ts".to_string(),
semantic_type: SEMANTIC_TS,
values: Some(ts_vals),
null_mask: vec![0],
};
let insert_batch = InsertBatch {
columns: vec![host_column, cpu_column, mem_column, ts_column],
row_count,
};
vec![insert_batch.into()]
}
}

View File

@@ -3,6 +3,8 @@ use tonic::{Request, Response, Status};
use super::handler::BatchHandler;
pub const PROTOCOL_VERSION: u32 = 1;
#[derive(Clone)]
pub struct Server {
handler: BatchHandler,