feat: write_batch proto codec (#122)

* feat: protobuf codec

* chore: minor fix

* chore: beatify the macro code

* chore: minor fix

* chore: by cr

* chore: by cr and impl wal with proto

* bugfix: invalid num_rows for multi put_data in mutations

Co-authored-by: jiachun <jiachun_fjc@163.com>
This commit is contained in:
egg
2022-08-09 19:57:51 +08:00
committed by GitHub
parent 567510fa3e
commit 8d51ad3429
11 changed files with 715 additions and 97 deletions

1
Cargo.lock generated
View File

@@ -3609,6 +3609,7 @@ dependencies = [
"lazy_static",
"log-store",
"object-store",
"paste",
"planus",
"prost 0.11.0",
"rand 0.8.5",

View File

@@ -20,7 +20,7 @@ macro_rules! for_all_scalar_types {
}
#[macro_export]
macro_rules! for_all_primitive_types{
macro_rules! for_all_primitive_types {
($macro:tt $(, $x:tt)*) => {
$macro! {
[$($x),*],

View File

@@ -8,8 +8,8 @@ edition = "2021"
[dependencies]
arc-swap = "1.0"
arrow-format = { version = "0.4", features = ["ipc"] }
async-trait = "0.1"
async-stream = "0.3"
async-trait = "0.1"
bit-vec = "0.6"
bytes = "1.1"
common-error = { path = "../common/error" }
@@ -22,6 +22,7 @@ futures-util = "0.3"
lazy_static = "1.4"
log-store = { path = "../log-store" }
object-store = { path = "../object-store" }
paste = "1.0"
planus = "0.2"
prost = "0.11"
serde = { version = "1.0", features = ["derive"] }

View File

@@ -1,5 +1,5 @@
fn main() {
tonic_build::configure()
.compile(&["proto/wal.proto"], &["."])
.expect("compile wal proto");
.compile(&["proto/wal.proto", "proto/write_batch.proto"], &["."])
.expect("compile proto");
}

View File

@@ -0,0 +1,82 @@
syntax = "proto3";
package greptime.storage.write_batch.v1;
message WriteBatch {
Schema schema = 1;
repeated Mutation mutations = 2;
}
message Schema {
repeated ColumnSchema column_schemas = 1;
TimestampIndex timestamp_index = 2;
}
message TimestampIndex {
uint64 value = 1;
}
message ColumnSchema {
string name = 1;
DataType data_type = 2;
bool is_nullable = 3;
}
message Mutation {
oneof mutation {
Put put = 1;
Delete delete = 2;
}
}
message Put {
repeated Column columns = 1;
}
message Delete {
// TODO(zxy)
}
message Column {
Values values = 1;
bytes value_null_mask = 2;
uint64 num_rows = 3;
}
// TODO(jiachun): Enum might be insufficient to represent some composite data type such as list, struct.
// In the future, may be we can refer to https://github.com/apache/arrow/blob/master/format/Schema.fbs#L398
enum DataType {
NULL = 0;
BOOLEAN = 1;
INT8 = 2;
INT16 = 3;
INT32 = 4;
INT64 = 5;
UINT8 = 6;
UINT16 = 7;
UINT32 = 8;
UINT64 = 9;
FLOAT32 = 10;
FLOAT64 = 11;
STRING = 12;
BINARY = 13;
}
message Values {
repeated int32 i8_values = 1;
repeated int32 i16_values = 2;
repeated int32 i32_values = 3;
repeated int64 i64_values = 4;
repeated uint32 u8_values = 5;
repeated uint32 u16_values = 6;
repeated uint32 u32_values = 7;
repeated uint64 u64_values = 8;
repeated float f32_values = 9;
repeated double f64_values = 10;
repeated bool bool_values = 11;
repeated bytes binary_values = 12;
repeated string string_values = 13;
}

View File

@@ -1,43 +1,2 @@
#![allow(clippy::all)]
tonic::include_proto!("greptime.storage.wal.v1");
use crate::write_batch::{Mutation, WriteBatch};
pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
let column_schemas = write_batch.schema().column_schemas();
write_batch
.iter()
.map(|m| match m {
Mutation::Put(put) => {
if put.num_columns() == column_schemas.len() {
MutationExtra {
mutation_type: MutationType::Put.into(),
column_null_mask: Default::default(),
}
} else {
let mut column_null_mask =
bit_vec::BitVec::from_elem(column_schemas.len(), false);
for (i, cs) in column_schemas.iter().enumerate() {
if put.column_by_name(&cs.name).is_none() {
column_null_mask.set(i, true);
}
}
MutationExtra {
mutation_type: MutationType::Put.into(),
column_null_mask: column_null_mask.to_bytes(),
}
}
}
})
.collect::<Vec<_>>()
}
impl WalHeader {
pub fn with_last_manifest_version(last_manifest_version: u64) -> Self {
Self {
last_manifest_version,
..Default::default()
}
}
}
pub mod wal;
pub mod write_batch;

View File

@@ -0,0 +1,42 @@
#![allow(clippy::all)]
tonic::include_proto!("greptime.storage.wal.v1");
use crate::write_batch::{Mutation, WriteBatch};
pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
let column_schemas = write_batch.schema().column_schemas();
write_batch
.iter()
.map(|m| match m {
Mutation::Put(put) => {
if put.num_columns() == column_schemas.len() {
MutationExtra {
mutation_type: MutationType::Put.into(),
column_null_mask: Default::default(),
}
} else {
let mut column_null_mask =
bit_vec::BitVec::from_elem(column_schemas.len(), false);
for (i, cs) in column_schemas.iter().enumerate() {
if put.column_by_name(&cs.name).is_none() {
column_null_mask.set(i, true);
}
}
MutationExtra {
mutation_type: MutationType::Put.into(),
column_null_mask: column_null_mask.to_bytes(),
}
}
}
})
.collect::<Vec<_>>()
}
impl WalHeader {
pub fn with_last_manifest_version(last_manifest_version: u64) -> Self {
Self {
last_manifest_version,
..Default::default()
}
}
}

View File

@@ -0,0 +1,306 @@
#![allow(clippy::all)]
tonic::include_proto!("greptime.storage.write_batch.v1");
use std::sync::Arc;
use common_error::prelude::*;
use datatypes::schema;
use datatypes::{
data_type::ConcreteDataType,
prelude::{ScalarVector, ScalarVectorBuilder},
vectors::{
BinaryVector, BinaryVectorBuilder, BooleanVector, BooleanVectorBuilder, Float32Vector,
Float32VectorBuilder, Float64Vector, Float64VectorBuilder, Int16Vector, Int16VectorBuilder,
Int32Vector, Int32VectorBuilder, Int64Vector, Int64VectorBuilder, Int8Vector,
Int8VectorBuilder, StringVector, StringVectorBuilder, UInt16Vector, UInt16VectorBuilder,
UInt32Vector, UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector,
UInt8VectorBuilder, Vector, VectorRef,
},
};
use paste::paste;
use snafu::OptionExt;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Failed to convert datafusion type: {}", from))]
Conversion { from: String, backtrace: Backtrace },
#[snafu(display("Empty column values read"))]
EmptyColumnValues { backtrace: Backtrace },
#[snafu(display("Invalid data type: {}", data_type))]
InvalidDataType {
data_type: i32,
backtrace: Backtrace,
},
#[snafu(display("Invalid timestamp index: {}", index))]
InvalidTimestampIndex {
index: usize,
source: datatypes::error::Error,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl TimestampIndex {
pub fn new(value: u64) -> Self {
Self { value }
}
}
impl From<&schema::SchemaRef> for Schema {
fn from(schema: &schema::SchemaRef) -> Self {
let column_schemas = schema
.column_schemas()
.iter()
.map(|column_schema| column_schema.into())
.collect();
Schema {
column_schemas,
timestamp_index: schema
.timestamp_index()
.map(|index| TimestampIndex::new(index as u64)),
}
}
}
impl TryFrom<Schema> for schema::SchemaRef {
type Error = Error;
fn try_from(schema: Schema) -> Result<Self> {
let column_schemas = schema
.column_schemas
.iter()
.map(schema::ColumnSchema::try_from)
.collect::<Result<Vec<_>>>()?;
let schema: schema::SchemaRef = match schema.timestamp_index {
Some(index) => Arc::new(
schema::Schema::with_timestamp_index(column_schemas, index.value as usize)
.context(InvalidTimestampIndexSnafu {
index: index.value as usize,
})?,
),
None => Arc::new(schema::Schema::new(column_schemas)),
};
Ok(schema)
}
}
impl From<&schema::ColumnSchema> for ColumnSchema {
fn from(cs: &schema::ColumnSchema) -> Self {
Self {
name: cs.name.clone(),
data_type: DataType::from(&cs.data_type).into(),
is_nullable: cs.is_nullable,
}
}
}
impl TryFrom<&ColumnSchema> for schema::ColumnSchema {
type Error = Error;
fn try_from(column_schema: &ColumnSchema) -> Result<Self> {
if let Some(data_type) = DataType::from_i32(column_schema.data_type) {
Ok(schema::ColumnSchema::new(
column_schema.name.clone(),
data_type.into(),
column_schema.is_nullable,
))
} else {
InvalidDataTypeSnafu {
data_type: column_schema.data_type,
}
.fail()
}
}
}
impl From<&ConcreteDataType> for DataType {
fn from(data_type: &ConcreteDataType) -> Self {
match data_type {
ConcreteDataType::Boolean(_) => DataType::Boolean,
ConcreteDataType::Int8(_) => DataType::Int8,
ConcreteDataType::Int16(_) => DataType::Int16,
ConcreteDataType::Int32(_) => DataType::Int32,
ConcreteDataType::Int64(_) => DataType::Int64,
ConcreteDataType::UInt8(_) => DataType::Uint8,
ConcreteDataType::UInt16(_) => DataType::Uint16,
ConcreteDataType::UInt32(_) => DataType::Uint32,
ConcreteDataType::UInt64(_) => DataType::Uint64,
ConcreteDataType::Float32(_) => DataType::Float64,
ConcreteDataType::Float64(_) => DataType::Float64,
ConcreteDataType::String(_) => DataType::String,
ConcreteDataType::Null(_) => DataType::Null,
ConcreteDataType::Binary(_) => DataType::Binary,
_ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc.
}
}
}
impl From<DataType> for ConcreteDataType {
fn from(data_type: DataType) -> Self {
match data_type {
DataType::Boolean => ConcreteDataType::boolean_datatype(),
DataType::Int8 => ConcreteDataType::int8_datatype(),
DataType::Int16 => ConcreteDataType::int16_datatype(),
DataType::Int32 => ConcreteDataType::int32_datatype(),
DataType::Int64 => ConcreteDataType::int64_datatype(),
DataType::Uint8 => ConcreteDataType::uint8_datatype(),
DataType::Uint16 => ConcreteDataType::uint16_datatype(),
DataType::Uint32 => ConcreteDataType::uint32_datatype(),
DataType::Uint64 => ConcreteDataType::uint64_datatype(),
DataType::Float32 => ConcreteDataType::float32_datatype(),
DataType::Float64 => ConcreteDataType::float64_datatype(),
DataType::String => ConcreteDataType::string_datatype(),
DataType::Binary => ConcreteDataType::binary_datatype(),
DataType::Null => ConcreteDataType::null_datatype(),
}
}
}
#[macro_export]
macro_rules! gen_columns {
($key: tt, $vec_ty: ty, $vari: ident, $cast: expr) => {
paste! {
pub fn [<gen_columns_ $key>](vector: &VectorRef) -> Result<Column> {
let mut column = Column::default();
let mut values = Values::default();
let vector_ref =
vector
.as_any()
.downcast_ref::<$vec_ty>()
.with_context(|| ConversionSnafu {
from: std::format!("{:?}", vector.as_ref().data_type()),
})?;
let mut value_null_mask = bit_vec::BitVec::from_elem(vector_ref.len(), false);
vector_ref
.iter_data()
.enumerate()
.for_each(|(i, value)| match value {
Some($vari) => values.[<$key _values>].push($cast),
None => value_null_mask.set(i, true),
});
let null_mask = if vector_ref.len() == values.[<$key _values>].len() {
vec![]
} else {
value_null_mask.to_bytes()
};
column.values = Some(values);
column.value_null_mask = null_mask;
column.num_rows = vector_ref.len() as u64;
Ok(column)
}
}
};
}
gen_columns!(i8, Int8Vector, v, v as i32);
gen_columns!(i16, Int16Vector, v, v as i32);
gen_columns!(i32, Int32Vector, v, v as i32);
gen_columns!(i64, Int64Vector, v, v as i64);
gen_columns!(u8, UInt8Vector, v, v as u32);
gen_columns!(u16, UInt16Vector, v, v as u32);
gen_columns!(u32, UInt32Vector, v, v as u32);
gen_columns!(u64, UInt64Vector, v, v as u64);
gen_columns!(f32, Float32Vector, v, v);
gen_columns!(f64, Float64Vector, v, v);
gen_columns!(bool, BooleanVector, v, v);
gen_columns!(binary, BinaryVector, v, v.to_vec());
gen_columns!(string, StringVector, v, v.to_string());
#[macro_export]
macro_rules! gen_put_data {
($key: tt, $builder_type: ty, $vari: ident, $cast: expr) => {
paste! {
pub fn [<gen_put_data_ $key>](column: Column) -> Result<VectorRef> {
let values = column.values.context(EmptyColumnValuesSnafu {})?;
let mut vector_iter = values.[<$key _values>].iter();
let num_rows = column.num_rows as usize;
let mut builder = <$builder_type>::with_capacity(num_rows);
if column.value_null_mask.is_empty() {
(0..num_rows)
.for_each(|_| builder.push(vector_iter.next().map(|$vari| $cast)));
} else {
bit_vec::BitVec::from_bytes(&column.value_null_mask)
.iter()
.take(num_rows)
.for_each(|is_null| {
if is_null {
builder.push(None);
} else {
builder.push(vector_iter.next().map(|$vari| $cast));
}
});
}
Ok(Arc::new(builder.finish()))
}
}
};
}
gen_put_data!(i8, Int8VectorBuilder, v, *v as i8);
gen_put_data!(i16, Int16VectorBuilder, v, *v as i16);
gen_put_data!(i32, Int32VectorBuilder, v, *v);
gen_put_data!(i64, Int64VectorBuilder, v, *v);
gen_put_data!(u8, UInt8VectorBuilder, v, *v as u8);
gen_put_data!(u16, UInt16VectorBuilder, v, *v as u16);
gen_put_data!(u32, UInt32VectorBuilder, v, *v as u32);
gen_put_data!(u64, UInt64VectorBuilder, v, *v as u64);
gen_put_data!(f32, Float32VectorBuilder, v, *v as f32);
gen_put_data!(f64, Float64VectorBuilder, v, *v as f64);
gen_put_data!(bool, BooleanVectorBuilder, v, *v);
gen_put_data!(binary, BinaryVectorBuilder, v, v.as_slice());
gen_put_data!(string, StringVectorBuilder, v, v.as_str());
pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
match vector.data_type() {
ConcreteDataType::Boolean(_) => gen_columns_bool(vector),
ConcreteDataType::Int8(_) => gen_columns_i8(vector),
ConcreteDataType::Int16(_) => gen_columns_i16(vector),
ConcreteDataType::Int32(_) => gen_columns_i32(vector),
ConcreteDataType::Int64(_) => gen_columns_i64(vector),
ConcreteDataType::UInt8(_) => gen_columns_u8(vector),
ConcreteDataType::UInt16(_) => gen_columns_u16(vector),
ConcreteDataType::UInt32(_) => gen_columns_u32(vector),
ConcreteDataType::UInt64(_) => gen_columns_u64(vector),
ConcreteDataType::Float32(_) => gen_columns_f32(vector),
ConcreteDataType::Float64(_) => gen_columns_f64(vector),
ConcreteDataType::Binary(_) => gen_columns_binary(vector),
ConcreteDataType::String(_) => gen_columns_string(vector),
_ => {
unimplemented!() // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc.
}
}
}
pub fn gen_put_data_vector(data_type: ConcreteDataType, column: Column) -> Result<VectorRef> {
match data_type {
ConcreteDataType::Boolean(_) => gen_put_data_bool(column),
ConcreteDataType::Int8(_) => gen_put_data_i8(column),
ConcreteDataType::Int16(_) => gen_put_data_i16(column),
ConcreteDataType::Int32(_) => gen_put_data_i32(column),
ConcreteDataType::Int64(_) => gen_put_data_i64(column),
ConcreteDataType::UInt8(_) => gen_put_data_u8(column),
ConcreteDataType::UInt16(_) => gen_put_data_u16(column),
ConcreteDataType::UInt32(_) => gen_put_data_u32(column),
ConcreteDataType::UInt64(_) => gen_put_data_u64(column),
ConcreteDataType::Float32(_) => gen_put_data_f32(column),
ConcreteDataType::Float64(_) => gen_put_data_f64(column),
ConcreteDataType::Binary(_) => gen_put_data_binary(column),
ConcreteDataType::String(_) => gen_put_data_string(column),
_ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future , such as list, struct, etc.
}
}

View File

@@ -12,7 +12,7 @@ use crate::background::JobHandle;
use crate::error::{self, Result};
use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet};
use crate::proto::WalHeader;
use crate::proto::wal::WalHeader;
use crate::region::RegionManifest;
use crate::region::SharedDataRef;
use crate::sst::AccessLayerRef;

View File

@@ -13,9 +13,12 @@ use store_api::{
use crate::{
codec::{Decoder, Encoder},
error::{self, Error, Result},
proto::{self, PayloadType, WalHeader},
proto::wal::{self, PayloadType, WalHeader},
write_batch::{
codec::{WriteBatchArrowDecoder, WriteBatchArrowEncoder},
codec::{
WriteBatchArrowDecoder, WriteBatchArrowEncoder, WriteBatchProtobufDecoder,
WriteBatchProtobufEncoder,
},
WriteBatch,
},
};
@@ -78,7 +81,7 @@ impl<S: LogStore> Wal<S> {
header.payload_type = payload.payload_type();
if let Payload::WriteBatchArrow(batch) = payload {
header.mutation_extras = proto::gen_mutation_extras(batch);
header.mutation_extras = wal::gen_mutation_extras(batch);
}
let mut buf = vec![];
@@ -95,10 +98,16 @@ impl<S: LogStore> Wal<S> {
.encode(batch, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
} else if let Payload::WriteBatchProto(batch) = payload {
// entry
let encoder = WriteBatchProtobufEncoder {};
// TODO(jiachun): provide some way to compute data size before encode, so we can preallocate an exactly sized buf.
encoder
.encode(batch, &mut buf)
.map_err(BoxedError::new)
.context(error::WriteWalSnafu { name: self.name() })?;
}
// TODO(jiachun): encode protobuf payload
// write bytes to wal
self.write(seq, &buf).await
}
@@ -173,7 +182,14 @@ impl<S: LogStore> Wal<S> {
Ok((seq_num, header, Some(write_batch)))
}
Some(PayloadType::WriteBatchProto) => {
todo!("protobuf decoder")
let mutation_extras = std::mem::take(&mut header.mutation_extras);
let decoder = WriteBatchProtobufDecoder::new(mutation_extras);
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
.context(error::ReadWalSnafu { name: self.name() })?;
Ok((seq_num, header, Some(write_batch)))
}
_ => error::WalDataCorruptedSnafu {
name: self.name(),

View File

@@ -14,9 +14,12 @@ use datatypes::{
schema::SchemaRef,
vectors::{Int64Vector, VectorRef},
};
use prost::{DecodeError, EncodeError};
use snafu::ensure;
use store_api::storage::{consts, PutOperation, WriteRequest};
use crate::proto;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Duplicate column {} in same request", name))]
@@ -82,6 +85,18 @@ pub enum Error {
source: ArrowError,
},
#[snafu(display("Failed to encode into protobuf, source: {}", source))]
EncodeProtobuf {
backtrace: Backtrace,
source: EncodeError,
},
#[snafu(display("Failed to decode from protobuf, source: {}", source))]
DecodeProtobuf {
backtrace: Backtrace,
source: DecodeError,
},
#[snafu(display("Failed to parse schema, source: {}", source))]
ParseSchema {
backtrace: Backtrace,
@@ -102,6 +117,18 @@ pub enum Error {
backtrace: Backtrace,
source: datatypes::error::Error,
},
#[snafu(display("Failed to convert into protobuf struct, source {}", source))]
ToProtobuf {
source: proto::write_batch::Error,
backtrace: Backtrace,
},
#[snafu(display("Failed to convert from protobuf struct, source {}", source))]
FromProtobuf {
source: proto::write_batch::Error,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -402,9 +429,9 @@ impl PutData {
}
pub mod codec {
use std::{io::Cursor, sync::Arc};
use common_error::prelude::*;
use datatypes::{
arrow::{
chunk::Chunk as ArrowChunk,
@@ -415,24 +442,27 @@ pub mod codec {
},
},
error::Result as DataTypesResult,
schema::Schema,
schema::{Schema, SchemaRef},
vectors::Helper,
};
use snafu::ensure;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;
use super::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
Error as WriteBatchError, Mutation, ParseSchemaSnafu, Result, WriteBatch,
Error as WriteBatchError, FromProtobufSnafu, Mutation, ParseSchemaSnafu, Result,
ToProtobufSnafu, WriteBatch,
};
use crate::proto::{
wal::{MutationExtra, MutationType},
write_batch::{self, gen_columns, gen_put_data_vector},
};
use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData};
use crate::{
arrow_stream::ArrowStreamReader,
codec::{Decoder, Encoder},
};
use crate::{
proto::{MutationExtra, MutationType},
write_batch::PutData,
};
// TODO(jiachun): The codec logic is too complex, maybe we should use protobuf to
// serialize/deserialize all our data.
@@ -491,7 +521,7 @@ pub mod codec {
let valid_ipc_fields = ipc_fields
.iter()
.zip(bit_vec::BitVec::from_bytes(column_null_mask))
.filter(|(_, mask)| !*mask)
.filter(|(_, is_null)| !*is_null)
.map(|(ipc_field, _)| ipc_field.clone())
.collect::<Vec<_>>();
writer
@@ -600,7 +630,7 @@ pub mod codec {
bit_vec::BitVec::from_bytes(&ext.column_null_mask)
.iter()
.zip(column_names.iter())
.filter(|(mask, _)| !*mask)
.filter(|(is_null, _)| !*is_null)
.map(|(_, column_name)| column_name.clone())
.collect::<Vec<_>>()
};
@@ -639,8 +669,138 @@ pub mod codec {
Ok(write_batch)
}
}
}
pub struct WriteBatchProtobufEncoder {}
impl Encoder for WriteBatchProtobufEncoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().into();
let mutations = item
.iter()
.map(|mtn| match mtn {
Mutation::Put(put_data) => item
.schema()
.column_schemas()
.iter()
.filter_map(|cs| put_data.column_by_name(&cs.name))
.map(gen_columns)
.collect::<write_batch::Result<Vec<_>>>(),
})
.collect::<write_batch::Result<Vec<_>>>()
.context(ToProtobufSnafu {})?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
columns,
})),
})
.collect();
let write_batch = write_batch::WriteBatch {
schema: Some(schema),
mutations,
};
write_batch.encode(dst).context(EncodeProtobufSnafu)
}
}
pub struct WriteBatchProtobufDecoder {
mutation_extras: Vec<MutationExtra>,
}
impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
Self { mutation_extras }
}
}
impl Decoder for WriteBatchProtobufDecoder {
type Item = WriteBatch;
type Error = WriteBatchError;
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;
let schema = write_batch.schema.context(DataCorruptedSnafu {
message: "schema required",
})?;
let schema: SchemaRef = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
ensure!(
write_batch.mutations.len() == self.mutation_extras.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_extras.len(),
write_batch.mutations.len()
)
}
);
let mutations = self
.mutation_extras
.iter()
.zip(write_batch.mutations.into_iter())
.map(|(ext, mtn)| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let column_schemas = schema.column_schemas();
let valid_columns = if ext.column_null_mask.is_empty() {
column_schemas
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.collect::<Vec<_>>()
} else {
bit_vec::BitVec::from_bytes(&ext.column_null_mask)
.iter()
.zip(column_schemas.iter())
.filter(|(is_null, _)| !*is_null)
.map(|(_, column)| (column.name.clone(), column.data_type.clone()))
.collect::<Vec<_>>()
};
let mut put_data = PutData::with_num_columns(put.columns.len());
let res = valid_columns
.into_iter()
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column).map(|vector| (name, vector))
})
.collect::<write_batch::Result<Vec<_>>>()
.context(FromProtobufSnafu {})?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();
res.map(|_| Mutation::Put(put_data))
}
Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
_ => DataCorruptedSnafu {
message: "invalid mutation type",
}
.fail(),
})
.collect::<Result<Vec<_>>>()?;
let mut write_batch = WriteBatch::new(schema);
mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;
Ok(write_batch)
}
}
}
#[cfg(test)]
mod tests {
use std::iter;
@@ -652,6 +812,7 @@ mod tests {
use super::*;
use crate::codec::{Decoder, Encoder};
use crate::proto;
use crate::proto::wal::MutationExtra;
use crate::test_util::write_batch_util;
#[test]
@@ -873,29 +1034,37 @@ mod tests {
)
}
#[test]
fn test_codec() -> Result<()> {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
fn gen_new_batch_and_extras() -> (WriteBatch, Vec<MutationExtra>) {
let mut batch = new_test_batch();
assert!(batch.is_empty());
batch.put(put_data).unwrap();
assert!(!batch.is_empty());
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
let tsv = Arc::new(Int64Vector::from_vec(vec![i, i, i]));
let encoder = codec::WriteBatchArrowEncoder::new(proto::gen_mutation_extras(&batch));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_value_column("v1", boolv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
batch.put(put_data).unwrap();
}
let extras = proto::wal::gen_mutation_extras(&batch);
(batch, extras)
}
#[test]
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras();
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone());
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch));
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
@@ -904,26 +1073,68 @@ mod tests {
}
#[test]
fn test_codec_with_none_column() -> Result<()> {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras();
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
let mut batch = new_test_batch();
assert!(batch.is_empty());
batch.put(put_data).unwrap();
assert!(!batch.is_empty());
let encoder = codec::WriteBatchArrowEncoder::new(proto::gen_mutation_extras(&batch));
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch));
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
fn gen_new_batch_and_extras_with_none_column() -> (WriteBatch, Vec<MutationExtra>) {
let mut batch = new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
let mut put_data = PutData::new();
put_data.add_key_column("k1", intv.clone()).unwrap();
put_data.add_version_column(intv).unwrap();
put_data.add_key_column("ts", tsv).unwrap();
batch.put(put_data).unwrap();
}
let extras = proto::wal::gen_mutation_extras(&batch);
(batch, extras)
}
#[test]
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column();
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone());
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
Ok(())
}
#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column();
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);