From bd4fe1f5bc653c2012e552f73cb5de2f06f5b9d2 Mon Sep 17 00:00:00 2001 From: "Lei, Huang" Date: Tue, 17 May 2022 17:01:00 +0800 Subject: [PATCH] feat: RecordBatch serialization (#26) --- Cargo.lock | 1 + src/common/recordbatch/Cargo.toml | 2 + src/common/recordbatch/src/recordbatch.rs | 101 ++++++++++----------- src/datanode/tests/http_test.rs | 3 +- src/datatypes/src/error.rs | 3 + src/datatypes/src/vectors.rs | 105 ++++++++++++++++++++++ src/datatypes/src/vectors/primitive.rs | 19 +++- 7 files changed, 177 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 160af4a404..af0627f1d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -544,6 +544,7 @@ dependencies = [ "futures", "paste", "serde", + "serde_json", "snafu", "tokio", ] diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index da17d5c90d..1fe8f3bb87 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -17,11 +17,13 @@ paste = "1.0" serde = "1.0" snafu = { version = "0.7", features = ["backtraces"] } + [dev-dependencies.arrow] package = "arrow2" version="0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] [dev-dependencies] +serde_json = "1.0.81" tokio = { version = "1.18", features = ["full"] } diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index dcbc54eeee..ff58d7e3d9 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -1,14 +1,9 @@ use std::sync::Arc; -use arrow::array::{ - BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, - UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array, -}; -use arrow::datatypes::DataType; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::schema::Schema; -use paste::paste; -use serde::ser::SerializeStruct; +use datatypes::serialize::Serializable; +use serde::ser::{Error, SerializeStruct}; use serde::{Serialize, Serializer}; #[derive(Clone, Debug, PartialEq)] @@ -17,43 +12,6 @@ pub struct RecordBatch { pub df_recordbatch: DfRecordBatch, } -macro_rules! collect_columns { - ($array: ident, $columns: ident, $($data_type: expr), +) => { - paste! { - match $array.data_type() { - $(DataType::$data_type => { - if let Some(array) = $array.as_any().downcast_ref::<[<$data_type Array>]>() { - $columns.push(Column::$data_type(array.values().as_slice())); - } - })+, - DataType::Utf8 => { - if let Some(array) = $array.as_any().downcast_ref::>() { - $columns.push(Column::Utf8(array.values().as_slice())); - } - }, - _ => unimplemented!(), - } - } - }; -} - -#[derive(Serialize)] -enum Column<'a> { - Int64(&'a [i64]), - Int32(&'a [i32]), - Int16(&'a [i16]), - Int8(&'a [i8]), - UInt64(&'a [u64]), - UInt32(&'a [u32]), - UInt16(&'a [u16]), - UInt8(&'a [u8]), - Float64(&'a [f64]), - Float32(&'a [f32]), - Boolean((&'a [u8], usize, usize)), - Utf8(&'a [u8]), -} - -/// TODO(dennis): should be implemented in datatypes impl Serialize for RecordBatch { fn serialize(&self, serializer: S) -> Result where @@ -63,17 +21,54 @@ impl Serialize for RecordBatch { s.serialize_field("schema", &self.schema.arrow_schema())?; let df_columns = self.df_recordbatch.columns(); - let mut columns: Vec = Vec::with_capacity(df_columns.len()); - for array in df_columns { - collect_columns!( - array, columns, Int64, Int32, Int16, Int8, UInt64, UInt32, UInt16, UInt8, Float64, - Float32, Boolean - ); - } - - s.serialize_field("columns", &columns)?; + let vec = df_columns + .iter() + .map(|c| c.serialize_to_json()) + .collect::, _>>() + .map_err(S::Error::custom)?; + s.serialize_field("columns", &vec)?; s.end() } } + +#[cfg(test)] +mod tests { + use arrow::array::UInt32Array; + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion_common::field_util::SchemaExt; + use datafusion_common::record_batch::RecordBatch as DfRecordBatch; + + use super::*; + + #[test] + pub fn test_serialize_recordbatch() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "number", + DataType::UInt32, + false, + )])); + let schema = Arc::new(Schema::new(arrow_schema.clone())); + + let numbers: Vec = (0..10).collect(); + let df_batch = DfRecordBatch::try_new( + arrow_schema, + vec![Arc::new(UInt32Array::from_slice(&numbers))], + ) + .unwrap(); + + let batch = RecordBatch { + schema, + df_recordbatch: df_batch, + }; + + let mut output = vec![]; + let mut serializer = serde_json::Serializer::new(&mut output); + batch.serialize(&mut serializer).unwrap(); + assert_eq!( + r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#, + String::from_utf8_lossy(&output) + ); + } +} diff --git a/src/datanode/tests/http_test.rs b/src/datanode/tests/http_test.rs index fcc4c60975..c767c7bb4a 100644 --- a/src/datanode/tests/http_test.rs +++ b/src/datanode/tests/http_test.rs @@ -23,7 +23,6 @@ async fn test_sql_api() { let res = client.get("/sql").send().await; assert_eq!(res.status(), StatusCode::OK); - // TODO(dennis): deserialize to json response let body = res.text().await; assert_eq!( body, @@ -39,7 +38,7 @@ async fn test_sql_api() { let body = res.text().await; assert_eq!( body, - r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[{"UInt32":[0,1,2,3,4,5,6,7,8,9]}]}]}}"# + r#"{"success":true,"output":{"Rows":[{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}]}}"# ); } diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 038a912f4c..3671775340 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -11,6 +11,9 @@ pub enum Error { source: serde_json::Error, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert datafusion type: {}", from))] + Conversion { from: String, backtrace: Backtrace }, } impl ErrorExt for Error { diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index efdfc66591..ce1837e71a 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -4,11 +4,20 @@ pub mod primitive; use std::any::Any; use std::sync::Arc; +use arrow::array::Array; use arrow::array::ArrayRef; +use arrow::datatypes::DataType; pub use binary::*; +use paste::paste; pub use primitive::*; +use serde_json::Value; use crate::data_type::DataTypeRef; +use crate::serialize::Serializable; +use crate::vectors::{ + Float32Vector, Float64Vector, Int16Vector, Int32Vector, Int64Vector, Int8Vector, UInt16Vector, + UInt32Vector, UInt64Vector, UInt8Vector, +}; /// Vector of data values. pub trait Vector: Send + Sync { @@ -34,3 +43,99 @@ pub trait Vector: Send + Sync { } pub type VectorRef = Arc; + +pub trait TryIntoVector { + fn try_into_vector(self) -> crate::error::Result; +} + +macro_rules! impl_try_into_vector_for_arrow_array { + ( $($ty: expr),+ ) => { + paste! { + impl TryIntoVector for A +where + A: AsRef, +{ + fn try_into_vector(self) -> Result { + match self.as_ref().data_type() { + $( + DataType::$ty => Ok(Arc::new(<[<$ty Vector>]>::try_from_arrow_array(self.as_ref())?)), + )+ + _ => { + unimplemented!() + } + } + }} + } + } +} + +macro_rules! impl_arrow_array_serialize { + ( $($ty: expr),+ ) => { + impl Serializable for A +where + A: AsRef + Send + Sync, +{ + fn serialize_to_json(&self) -> crate::error::Result> { + paste! { + match self.as_ref().data_type() { + $( + DataType::$ty => <[<$ty Vector>]>::try_from_arrow_array(self.as_ref())?.serialize_to_json(), + )+ + _ => { + unimplemented!() + } + } + } + } + } + }; +} + +// todo(hl): implement more type to vector conversion +impl_try_into_vector_for_arrow_array![ + Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64 +]; + +// todo(hl): implement serializations for more types +impl_arrow_array_serialize![ + Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64 +]; + +#[cfg(test)] +mod tests { + use arrow::array::{Array, PrimitiveArray}; + use serde::Serialize; + + use super::*; + use crate::types::DataTypeBuilder; + + #[test] + pub fn test_df_columns_to_vector() { + let df_column: Arc = Arc::new(PrimitiveArray::from_slice(vec![1, 2, 3])); + let vector = df_column.try_into_vector().unwrap(); + assert_eq!( + i32::build_data_type().as_arrow_type(), + vector.data_type().as_arrow_type() + ); + } + + #[test] + pub fn test_serialize_i32_vector() { + let df_column: Arc = Arc::new(PrimitiveArray::from_slice(vec![1, 2, 3])); + let json_value = df_column.serialize_to_json().unwrap(); + let mut output = vec![]; + let mut serializer = serde_json::ser::Serializer::new(&mut output); + json_value.serialize(&mut serializer).unwrap(); + assert_eq!(b"[1,2,3]", output.as_slice()); + } + + #[test] + pub fn test_serialize_i8_vector() { + let df_column: Arc = Arc::new(PrimitiveArray::from_slice(vec![1u8, 2u8, 3u8])); + let json_value = df_column.serialize_to_json().unwrap(); + let mut output = vec![]; + let mut serializer = serde_json::ser::Serializer::new(&mut output); + json_value.serialize(&mut serializer).unwrap(); + assert_eq!(b"[1,2,3]", output.as_slice()); + } +} diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 911cd3ebf0..37b7f8f0a4 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -2,12 +2,14 @@ use std::any::Any; use std::slice::Iter; use std::sync::Arc; -use arrow::array::{ArrayRef, MutablePrimitiveArray, PrimitiveArray}; +use arrow::array::{Array, ArrayRef, MutablePrimitiveArray, PrimitiveArray}; use arrow::bitmap::utils::ZipValidity; use serde_json::Value as JsonValue; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::data_type::DataTypeRef; +use crate::error; +use crate::error::ConversionSnafu; use crate::scalars::{ScalarVector, ScalarVectorBuilder}; use crate::types::{DataTypeBuilder, Primitive}; use crate::vectors::Vector; @@ -47,6 +49,19 @@ impl<'a, T: Primitive> PrimitiveVector { pub fn iter(&'a self) -> std::slice::Iter<'a, T> { self.array.values().iter() } + + /// Convert an Arrow array to PrimitiveVector. + pub fn try_from_arrow_array(array: &dyn Array) -> Result { + Ok(Self::new( + array + .as_any() + .downcast_ref::>() + .with_context(|| ConversionSnafu { + from: format!("{:?}", array.data_type()), + })? + .clone(), + )) + } } impl ScalarVector for PrimitiveVector {