feat: RecordBatch serialization (#26)

This commit is contained in:
Lei, Huang
2022-05-17 17:01:00 +08:00
committed by GitHub
parent 3d374cce68
commit bd4fe1f5bc
7 changed files with 177 additions and 57 deletions

View File

@@ -17,11 +17,13 @@ paste = "1.0"
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
[dev-dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dev-dependencies]
serde_json = "1.0.81"
tokio = { version = "1.18", features = ["full"] }

View File

@@ -1,14 +1,9 @@
use std::sync::Arc;
use arrow::array::{
BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array,
};
use arrow::datatypes::DataType;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::Schema;
use paste::paste;
use serde::ser::SerializeStruct;
use datatypes::serialize::Serializable;
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
#[derive(Clone, Debug, PartialEq)]
@@ -17,43 +12,6 @@ pub struct RecordBatch {
pub df_recordbatch: DfRecordBatch,
}
macro_rules! collect_columns {
($array: ident, $columns: ident, $($data_type: expr), +) => {
paste! {
match $array.data_type() {
$(DataType::$data_type => {
if let Some(array) = $array.as_any().downcast_ref::<[<$data_type Array>]>() {
$columns.push(Column::$data_type(array.values().as_slice()));
}
})+,
DataType::Utf8 => {
if let Some(array) = $array.as_any().downcast_ref::<Utf8Array<i32>>() {
$columns.push(Column::Utf8(array.values().as_slice()));
}
},
_ => unimplemented!(),
}
}
};
}
#[derive(Serialize)]
enum Column<'a> {
Int64(&'a [i64]),
Int32(&'a [i32]),
Int16(&'a [i16]),
Int8(&'a [i8]),
UInt64(&'a [u64]),
UInt32(&'a [u32]),
UInt16(&'a [u16]),
UInt8(&'a [u8]),
Float64(&'a [f64]),
Float32(&'a [f32]),
Boolean((&'a [u8], usize, usize)),
Utf8(&'a [u8]),
}
/// TODO(dennis): should be implemented in datatypes
impl Serialize for RecordBatch {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -63,17 +21,54 @@ impl Serialize for RecordBatch {
s.serialize_field("schema", &self.schema.arrow_schema())?;
let df_columns = self.df_recordbatch.columns();
let mut columns: Vec<Column> = Vec::with_capacity(df_columns.len());
for array in df_columns {
collect_columns!(
array, columns, Int64, Int32, Int16, Int8, UInt64, UInt32, UInt16, UInt8, Float64,
Float32, Boolean
);
}
s.serialize_field("columns", &columns)?;
let vec = df_columns
.iter()
.map(|c| c.serialize_to_json())
.collect::<Result<Vec<_>, _>>()
.map_err(S::Error::custom)?;
s.serialize_field("columns", &vec)?;
s.end()
}
}
#[cfg(test)]
mod tests {
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use super::*;
#[test]
pub fn test_serialize_recordbatch() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"number",
DataType::UInt32,
false,
)]));
let schema = Arc::new(Schema::new(arrow_schema.clone()));
let numbers: Vec<u32> = (0..10).collect();
let df_batch = DfRecordBatch::try_new(
arrow_schema,
vec![Arc::new(UInt32Array::from_slice(&numbers))],
)
.unwrap();
let batch = RecordBatch {
schema,
df_recordbatch: df_batch,
};
let mut output = vec![];
let mut serializer = serde_json::Serializer::new(&mut output);
batch.serialize(&mut serializer).unwrap();
assert_eq!(
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
String::from_utf8_lossy(&output)
);
}
}