feat: impl select (grpc) (#138)

* SelectExpr: change to oneof expr

* Convert between Vec<u8> and SelectResult

* Chore: use encode_to_vec and decode, instead of encode_length_delimited_to_vec and decode_length_delimited

* Chore: move bitset into separate file

* Grpc select impl
This commit is contained in:
fys
2022-08-15 18:31:47 +08:00
committed by GitHub
parent 60dc77d1d9
commit 34133fae5a
22 changed files with 981 additions and 180 deletions

View File

@@ -5,12 +5,18 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies.arrow]
package = "arrow2"
version = "0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
axum = "0.5"
axum-macros = "0.2"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
@@ -37,6 +43,7 @@ tower-http = { version ="0.3", features = ["full"]}
[dev-dependencies]
axum-test-helper = "0.1"
common-query = { path = "../common/query" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
tempdir = "0.3"
[dev-dependencies.arrow]

View File

@@ -115,6 +115,12 @@ pub enum Error {
source: std::io::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String },
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -139,7 +145,9 @@ impl ErrorExt for Error {
| Error::ParseAddr { .. }
| Error::TcpBind { .. }
| Error::StartGrpc { .. }
| Error::CreateDir { .. } => StatusCode::Internal,
| Error::CreateDir { .. }
| Error::Conversion { .. }
| Error::UnsupportedExpr { .. } => StatusCode::Internal,
Error::InitBackend { .. } => StatusCode::StorageUnavailable,
Error::OpenLogStore { source } => source.status_code(),
Error::OpenStorageEngine { source } => source.status_code(),

View File

@@ -1,5 +1,6 @@
mod handler;
pub mod insert;
mod select;
mod server;
use common_telemetry::logging::info;

View File

@@ -1,8 +1,14 @@
use api::v1::*;
use api::v1::{
codec::SelectResult, object_expr, object_result, select_expr, BatchRequest, BatchResponse,
DatabaseResponse, InsertExpr, MutateResult, ObjectResult, ResultHeader, SelectExpr,
SelectResult as SelectResultRaw,
};
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use query::Output;
use crate::server::grpc::server::PROTOCOL_VERSION;
use crate::{error::Result, instance::InstanceRef};
use crate::server::grpc::{select::to_object_result, server::PROTOCOL_VERSION};
use crate::{error::Result, error::UnsupportedExprSnafu, instance::InstanceRef};
#[derive(Clone)]
pub struct BatchHandler {
@@ -23,37 +29,20 @@ impl BatchHandler {
let exprs = req.exprs;
for obj_expr in exprs {
let mut object_resp = ObjectResult::default();
match obj_expr.expr {
let object_resp = match obj_expr.expr {
Some(object_expr::Expr::Insert(insert_expr)) => {
match self.instance.execute_grpc_insert(insert_expr).await {
Ok(Output::AffectedRows(rows)) => {
object_resp.header = Some(ResultHeader {
version: PROTOCOL_VERSION,
// TODO(fys): Only one success code (200) was provided
// in the early stage and we will refine it later
code: 200,
success: rows as u32,
..Default::default()
});
}
Err(err) => {
object_resp.header = Some(ResultHeader {
version: PROTOCOL_VERSION,
// TODO(fys): Only one error code (500) was provided
// in the early stage and we will refine it later
code: 500,
err_msg: err.to_string(),
// TODO(fys): failure count
..Default::default()
})
}
_ => unreachable!(),
}
self.handle_insert(insert_expr).await
}
_ => unimplemented!(),
}
Some(object_expr::Expr::Select(select_expr)) => {
self.handle_select(select_expr).await
}
other => {
return UnsupportedExprSnafu {
name: format!("{:?}", other),
}
.fail();
}
};
db_resp.results.push(object_resp);
}
@@ -61,4 +50,158 @@ impl BatchHandler {
batch_resp.databases.push(db_resp);
Ok(batch_resp)
}
pub async fn handle_insert(&self, insert_expr: InsertExpr) -> ObjectResult {
match self.instance.execute_grpc_insert(insert_expr).await {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Err(err) => {
// TODO(fys): failure count
build_err_result(&err)
}
_ => unreachable!(),
}
}
pub async fn handle_select(&self, select_expr: SelectExpr) -> ObjectResult {
let expr = match select_expr.expr {
Some(expr) => expr,
None => return ObjectResult::default(),
};
match expr {
select_expr::Expr::Sql(sql) => {
let result = self.instance.execute_sql(&sql).await;
to_object_result(result).await
}
}
}
}
pub type Success = u32;
pub type Failure = u32;
#[derive(Default)]
pub(crate) struct ObjectResultBuilder {
version: u32,
code: u32,
err_msg: Option<String>,
result: Option<Body>,
}
pub(crate) enum Body {
Mutate((Success, Failure)),
Select(SelectResult),
}
impl ObjectResultBuilder {
pub fn new() -> Self {
Self {
version: PROTOCOL_VERSION,
..Default::default()
}
}
#[allow(dead_code)]
pub fn version(mut self, version: u32) -> Self {
self.version = version;
self
}
pub fn status_code(mut self, code: u32) -> Self {
self.code = code;
self
}
pub fn err_msg(mut self, err_msg: String) -> Self {
self.err_msg = Some(err_msg);
self
}
pub fn mutate_result(mut self, success: u32, failure: u32) -> Self {
self.result = Some(Body::Mutate((success, failure)));
self
}
pub fn select_result(mut self, select_result: SelectResult) -> Self {
self.result = Some(Body::Select(select_result));
self
}
pub fn build(self) -> ObjectResult {
let header = Some(ResultHeader {
version: self.version,
code: self.code,
err_msg: self.err_msg.unwrap_or_default(),
});
let result = match self.result {
Some(Body::Mutate((success, failure))) => {
Some(object_result::Result::Mutate(MutateResult {
success,
failure,
}))
}
Some(Body::Select(select)) => Some(object_result::Result::Select(SelectResultRaw {
raw_data: select.into(),
})),
None => None,
};
ObjectResult { header, result }
}
}
pub(crate) fn build_err_result(err: &impl ErrorExt) -> ObjectResult {
ObjectResultBuilder::new()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build()
}
#[cfg(test)]
mod tests {
use api::v1::{object_result, MutateResult};
use common_error::status_code::StatusCode;
use super::{build_err_result, ObjectResultBuilder};
use crate::server::grpc::handler::UnsupportedExprSnafu;
use crate::server::grpc::server::PROTOCOL_VERSION;
#[test]
fn test_object_result_builder() {
let obj_result = ObjectResultBuilder::new()
.version(101)
.status_code(500)
.err_msg("Failed to read this file!".to_string())
.mutate_result(100, 20)
.build();
let header = obj_result.header.unwrap();
assert_eq!(101, header.version);
assert_eq!(500, header.code);
assert_eq!("Failed to read this file!", header.err_msg);
let result = obj_result.result.unwrap();
assert_eq!(
object_result::Result::Mutate(MutateResult {
success: 100,
failure: 20,
}),
result
);
}
#[test]
fn test_build_err_result() {
let err = UnsupportedExprSnafu { name: "select" }.build();
let err_result = build_err_result(&err);
let header = err_result.header.unwrap();
let result = err_result.result;
assert_eq!(PROTOCOL_VERSION, header.version);
assert_eq!(StatusCode::Internal as u32, header.code);
assert_eq!("Unsupported expr type: select", header.err_msg);
assert!(result.is_none());
}
}

View File

@@ -3,7 +3,8 @@ use std::{
sync::Arc,
};
use api::v1::{column::Values, Column, InsertBatch, InsertExpr};
use api::v1::{codec::InsertBatch, column::Values, Column, InsertExpr};
use common_base::bitset::BitSet;
use datatypes::{data_type::ConcreteDataType, value::Value, vectors::VectorBuilder};
use snafu::{ensure, OptionExt, ResultExt};
use table::{requests::InsertRequest, Table};
@@ -80,10 +81,10 @@ fn add_values_to_builder(
null_mask: impl Into<BitSet>,
) -> Result<()> {
let data_type = builder.data_type();
let null_mask = null_mask.into();
let null_mask: BitSet = null_mask.into();
let values = convert_values(&data_type, values);
if null_mask.len() == 0 {
if null_mask.is_empty() {
ensure!(values.len() == row_count, IllegalInsertDataSnafu);
values.iter().for_each(|value| {
@@ -91,17 +92,18 @@ fn add_values_to_builder(
});
} else {
ensure!(
null_mask.set_count() + values.len() == row_count,
null_mask.ones_count() + values.len() == row_count,
IllegalInsertDataSnafu
);
let mut idx_of_values = 0;
for idx in 0..row_count {
if is_null(&null_mask, idx) {
builder.push(&Value::Null);
} else {
builder.push(&values[idx_of_values]);
idx_of_values += 1;
match is_null(&null_mask, idx) {
Some(true) => builder.push(&Value::Null),
_ => {
builder.push(&values[idx_of_values]);
idx_of_values += 1
}
}
}
}
@@ -173,57 +175,8 @@ fn convert_values(data_type: &ConcreteDataType, values: Values) -> Vec<Value> {
}
}
fn is_null(null_mask: &BitSet, idx: usize) -> bool {
debug_assert!(idx < null_mask.len, "idx should be less than null_mask.len");
matches!(null_mask.get_bit(idx), Some(true))
}
// TOOD(fys): move BitSet to better location
struct BitSet {
buffer: Vec<u8>,
len: usize,
}
impl BitSet {
fn len(&self) -> usize {
self.len
}
fn set_count(&self) -> usize {
(0..self.len)
.into_iter()
.filter(|&i| matches!(self.get_bit(i), Some(true)))
.count()
}
fn get_bit(&self, idx: usize) -> Option<bool> {
if idx >= self.len {
return None;
}
let byte_idx = idx >> 3;
let bit_idx = idx & 7;
Some((self.buffer[byte_idx] >> bit_idx) & 1 != 0)
}
}
impl From<Vec<u8>> for BitSet {
fn from(data: Vec<u8>) -> Self {
BitSet {
len: data.len() << 3,
buffer: data,
}
}
}
impl From<&[u8]> for BitSet {
fn from(data: &[u8]) -> Self {
BitSet {
buffer: data.into(),
len: data.len() << 3,
}
}
fn is_null(null_mask: &BitSet, idx: usize) -> Option<bool> {
null_mask.get(idx)
}
#[cfg(test)]
@@ -231,9 +184,11 @@ mod tests {
use std::{any::Any, sync::Arc};
use api::v1::{
codec::InsertBatch,
column::{self, Values},
Column, InsertBatch, InsertExpr,
Column, InsertExpr,
};
use common_base::bitset::BitSet;
use common_query::prelude::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::{
@@ -244,7 +199,7 @@ mod tests {
use table::error::Result as TableResult;
use table::Table;
use crate::server::grpc::insert::{convert_values, insertion_expr_to_request, is_null, BitSet};
use crate::server::grpc::insert::{convert_values, insertion_expr_to_request, is_null};
#[test]
fn test_insertion_expr_to_request() {
@@ -299,37 +254,14 @@ mod tests {
fn test_is_null() {
let null_mask: BitSet = vec![0b0000_0001, 0b0000_1000].into();
assert!(is_null(&null_mask, 0));
assert!(!is_null(&null_mask, 1));
assert!(!is_null(&null_mask, 10));
assert!(is_null(&null_mask, 11));
assert!(!is_null(&null_mask, 12));
}
assert_eq!(Some(true), is_null(&null_mask, 0));
assert_eq!(Some(false), is_null(&null_mask, 1));
assert_eq!(Some(false), is_null(&null_mask, 10));
assert_eq!(Some(true), is_null(&null_mask, 11));
assert_eq!(Some(false), is_null(&null_mask, 12));
#[test]
fn test_bit_set() {
let bit_set: BitSet = vec![0b0000_0001, 0b0000_1000].into();
assert!(bit_set.get_bit(0).unwrap());
assert!(!bit_set.get_bit(1).unwrap());
assert!(!bit_set.get_bit(10).unwrap());
assert!(bit_set.get_bit(11).unwrap());
assert!(!bit_set.get_bit(12).unwrap());
assert!(bit_set.get_bit(16).is_none());
assert_eq!(2, bit_set.set_count());
assert_eq!(16, bit_set.len());
let bit_set: BitSet = vec![0b0000_0000, 0b0000_0000].into();
assert_eq!(0, bit_set.set_count());
assert_eq!(16, bit_set.len());
let bit_set: BitSet = vec![0b1111_1111, 0b1111_1111].into();
assert_eq!(16, bit_set.set_count());
let bit_set: BitSet = vec![].into();
assert_eq!(0, bit_set.len());
assert_eq!(None, is_null(&null_mask, 16));
assert_eq!(None, is_null(&null_mask, 99));
}
struct DemoTable;

View File

@@ -0,0 +1,283 @@
use std::sync::Arc;
use api::v1::{codec::SelectResult, column::Values, Column, ObjectResult};
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use common_base::bitset::BitSet;
use common_error::prelude::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::{util, RecordBatch, SendableRecordBatchStream};
use datatypes::arrow_array::{BinaryArray, StringArray};
use query::Output;
use snafu::OptionExt;
use crate::error::{ConversionSnafu, Result};
use crate::server::grpc::handler::{build_err_result, ObjectResultBuilder};
pub(crate) async fn to_object_result(result: Result<Output>) -> ObjectResult {
match result {
Ok(Output::AffectedRows(rows)) => ObjectResultBuilder::new()
.status_code(StatusCode::Success as u32)
.mutate_result(rows as u32, 0)
.build(),
Ok(Output::RecordBatch(stream)) => record_batchs(stream).await,
Err(err) => ObjectResultBuilder::new()
.status_code(err.status_code() as u32)
.err_msg(err.to_string())
.build(),
}
}
async fn record_batchs(stream: SendableRecordBatchStream) -> ObjectResult {
let builder = ObjectResultBuilder::new();
match util::collect(stream).await {
Ok(record_batches) => match try_convert(record_batches) {
Ok(select_result) => builder
.status_code(StatusCode::Success as u32)
.select_result(select_result)
.build(),
Err(err) => build_err_result(&err),
},
Err(err) => build_err_result(&err),
}
}
// All schemas of record_batches must be the same.
fn try_convert(record_batches: Vec<RecordBatch>) -> Result<SelectResult> {
let first = if let Some(r) = record_batches.get(0) {
r
} else {
return Ok(SelectResult::default());
};
let row_count: usize = record_batches
.iter()
.map(|r| r.df_recordbatch.num_rows())
.sum();
let schemas = first.schema.column_schemas();
let mut columns = Vec::with_capacity(schemas.len());
for (idx, schema) in schemas.iter().enumerate() {
let column_name = schema.name.clone();
let arrays: Vec<Arc<dyn Array>> = record_batches
.iter()
.map(|r| r.df_recordbatch.columns()[idx].clone())
.collect();
let column = Column {
column_name,
values: Some(values(&arrays)?),
null_mask: null_mask(&arrays, row_count),
..Default::default()
};
columns.push(column);
}
Ok(SelectResult {
columns,
row_count: row_count as u32,
})
}
fn null_mask(arrays: &Vec<Arc<dyn Array>>, row_count: usize) -> Vec<u8> {
let null_count: usize = arrays.iter().map(|a| a.null_count()).sum();
if null_count == 0 {
return Vec::default();
}
let mut nulls_set = BitSet::with_capacity(row_count);
for array in arrays {
let validity = array.validity();
// TODO(fys): Improve in the future, better way: repeat(false, len).
if let Some(v) = validity {
let nulls: Vec<bool> = v.iter().map(|x| !x).collect();
nulls_set.append(&nulls);
} else {
nulls_set.append(&vec![false; array.len()]);
}
}
nulls_set.buffer()
}
macro_rules! convert_arrow_array_to_grpc_vals {
($data_type: expr, $arrays: ident, $(($Type: ident, $CastType: ty, $field: ident, $MapFunction: expr)), +) => {
match $data_type {
$(
arrow::datatypes::DataType::$Type => {
let mut vals = Values::default();
for array in $arrays {
let array = array.as_any().downcast_ref::<$CastType>().with_context(|| ConversionSnafu {
from: format!("{:?}", $data_type),
})?;
vals.$field.extend(array
.iter()
.filter_map(|i| i.map($MapFunction))
.collect::<Vec<_>>());
}
return Ok(vals);
},
)+
_ => unimplemented!(),
}
};
}
fn values(arrays: &[Arc<dyn Array>]) -> Result<Values> {
if arrays.is_empty() {
return Ok(Values::default());
}
let data_type = arrays[0].data_type();
convert_arrow_array_to_grpc_vals!(
data_type, arrays,
(Boolean, BooleanArray, bool_values, |x| {x}),
(Int8, PrimitiveArray<i8>, i8_values, |x| {*x as i32}),
(Int16, PrimitiveArray<i16>, i16_values, |x| {*x as i32}),
(Int32, PrimitiveArray<i32>, i32_values, |x| {*x}),
(Int64, PrimitiveArray<i64>, i64_values, |x| {*x}),
(UInt8, PrimitiveArray<u8>, u8_values, |x| {*x as u32}),
(UInt16, PrimitiveArray<u16>, u16_values, |x| {*x as u32}),
(UInt32, PrimitiveArray<u32>, u32_values, |x| {*x}),
(UInt64, PrimitiveArray<u64>, u64_values, |x| {*x}),
(Float32, PrimitiveArray<f32>, f32_values, |x| {*x}),
(Float64, PrimitiveArray<f64>, f64_values, |x| {*x}),
(Binary, BinaryArray, binary_values, |x| {x.into()}),
(LargeBinary, BinaryArray, binary_values, |x| {x.into()}),
(Utf8, StringArray, string_values, |x| {x.into()}),
(LargeUtf8, StringArray, string_values, |x| {x.into()})
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::{
array::{Array, BooleanArray, PrimitiveArray},
datatypes::{DataType, Field},
};
use common_recordbatch::RecordBatch;
use datafusion::field_util::SchemaExt;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::{
arrow_array::StringArray,
schema::Schema,
vectors::{UInt32Vector, VectorRef},
};
use crate::server::grpc::select::{null_mask, try_convert, values};
#[test]
fn test_convert_record_batches_to_select_result() {
let r1 = mock_record_batch();
let r2 = mock_record_batch();
let record_batches = vec![r1, r2];
let s = try_convert(record_batches).unwrap();
let c1 = s.columns.get(0).unwrap();
let c2 = s.columns.get(1).unwrap();
assert_eq!("c1", c1.column_name);
assert_eq!("c2", c2.column_name);
assert_eq!(vec![0b0010_0100], c1.null_mask);
assert_eq!(vec![0b0011_0110], c2.null_mask);
assert_eq!(vec![1, 2, 1, 2], c1.values.as_ref().unwrap().u32_values);
assert_eq!(vec![1, 1], c2.values.as_ref().unwrap().u32_values);
}
#[test]
fn test_convert_arrow_arrays_i32() {
let array: PrimitiveArray<i32> =
PrimitiveArray::from(vec![Some(1), Some(2), None, Some(3)]);
let array: Arc<dyn Array> = Arc::new(array);
let values = values(&[array]).unwrap();
assert_eq!(vec![1, 2, 3], values.i32_values);
}
#[test]
fn test_convert_arrow_arrays_string() {
let array = StringArray::from(vec![
Some("1".to_string()),
Some("2".to_string()),
None,
Some("3".to_string()),
None,
]);
let array: Arc<dyn Array> = Arc::new(array);
let values = values(&[array]).unwrap();
assert_eq!(vec!["1", "2", "3"], values.string_values);
}
#[test]
fn test_convert_arrow_arrays_bool() {
let array = BooleanArray::from(vec![Some(true), Some(false), None, Some(false), None]);
let array: Arc<dyn Array> = Arc::new(array);
let values = values(&[array]).unwrap();
assert_eq!(vec![true, false, false], values.bool_values);
}
#[test]
fn test_convert_arrow_arrays_empty() {
let array = BooleanArray::from(vec![None, None, None, None, None]);
let array: Arc<dyn Array> = Arc::new(array);
let values = values(&[array]).unwrap();
assert_eq!(Vec::<bool>::default(), values.bool_values);
}
#[test]
fn test_null_mask() {
let a1: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![None, Some(2), None]));
let a2: Arc<dyn Array> =
Arc::new(PrimitiveArray::from(vec![Some(1), Some(2), None, Some(4)]));
let mask = null_mask(&vec![a1, a2], 3 + 4);
assert_eq!(vec![0b0010_0101], mask);
let empty: Arc<dyn Array> = Arc::new(PrimitiveArray::<i32>::from(vec![None, None, None]));
let mask = null_mask(&vec![empty.clone(), empty.clone(), empty], 9);
assert_eq!(vec![0b1111_1111, 0b0000_0001], mask);
let a1: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(1), Some(2), Some(3)]));
let a2: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(4), Some(5), Some(6)]));
let mask = null_mask(&vec![a1, a2], 3 + 3);
assert_eq!(Vec::<u8>::default(), mask);
let a1: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(1), Some(2), Some(3)]));
let a2: Arc<dyn Array> = Arc::new(PrimitiveArray::from(vec![Some(4), Some(5), None]));
let mask = null_mask(&vec![a1, a2], 3 + 3);
assert_eq!(vec![0b0010_0000], mask);
}
fn mock_record_batch() -> RecordBatch {
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt32, false),
]));
let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());
let v1 = Arc::new(UInt32Vector::from(vec![Some(1), Some(2), None]));
let v2 = Arc::new(UInt32Vector::from(vec![Some(1), None, None]));
let columns: Vec<VectorRef> = vec![v1, v2];
RecordBatch::new(schema, columns).unwrap()
}
}