From 8d51ad3429ea79bef5b463a161fb02310bd985db Mon Sep 17 00:00:00 2001
From: egg <92360611+fariygirl@users.noreply.github.com>
Date: Tue, 9 Aug 2022 19:57:51 +0800
Subject: [PATCH] feat: write_batch proto codec (#122)

* feat: protobuf codec
* chore: minor fix
* chore: beautify the macro code
* chore: minor fix
* chore: by cr
* chore: by cr and impl wal with proto
* bugfix: invalid num_rows for multi put_data in mutations

Co-authored-by: jiachun
---
 Cargo.lock                           |   1 +
 src/datatypes/src/macros.rs          |   2 +-
 src/storage/Cargo.toml               |   3 +-
 src/storage/build.rs                 |   4 +-
 src/storage/proto/write_batch.proto  |  82 +++++++
 src/storage/src/proto.rs             |  45 +---
 src/storage/src/proto/wal.rs         |  42 ++++
 src/storage/src/proto/write_batch.rs | 306 +++++++++++++++++++++++++++
 src/storage/src/region/writer.rs     |   2 +-
 src/storage/src/wal.rs               |  28 ++-
 src/storage/src/write_batch.rs       | 297 ++++++++++++++++++++++----
 11 files changed, 715 insertions(+), 97 deletions(-)
 create mode 100644 src/storage/proto/write_batch.proto
 create mode 100644 src/storage/src/proto/wal.rs
 create mode 100644 src/storage/src/proto/write_batch.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1e4699abb0..c899734a27 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3609,6 +3609,7 @@ dependencies = [
  "lazy_static",
  "log-store",
  "object-store",
+ "paste",
  "planus",
  "prost 0.11.0",
  "rand 0.8.5",
diff --git a/src/datatypes/src/macros.rs b/src/datatypes/src/macros.rs
index d1b8d4db77..f194cbf482 100644
--- a/src/datatypes/src/macros.rs
+++ b/src/datatypes/src/macros.rs
@@ -20,7 +20,7 @@ macro_rules! for_all_scalar_types {
 }
 
 #[macro_export]
-macro_rules! for_all_primitive_types{
+macro_rules! for_all_primitive_types {
     ($macro:tt $(, $x:tt)*) => {
         $macro! {
             [$($x),*],
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 6214a1c10d..c1ede084ef 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -8,8 +8,8 @@ edition = "2021"
 [dependencies]
 arc-swap = "1.0"
 arrow-format = { version = "0.4", features = ["ipc"] }
-async-trait = "0.1"
 async-stream = "0.3"
+async-trait = "0.1"
 bit-vec = "0.6"
 bytes = "1.1"
 common-error = { path = "../common/error" }
@@ -22,6 +22,7 @@ futures-util = "0.3"
 lazy_static = "1.4"
 log-store = { path = "../log-store" }
 object-store = { path = "../object-store" }
+paste = "1.0"
 planus = "0.2"
 prost = "0.11"
 serde = { version = "1.0", features = ["derive"] }
diff --git a/src/storage/build.rs b/src/storage/build.rs
index 014d1dfe5c..04107376ed 100644
--- a/src/storage/build.rs
+++ b/src/storage/build.rs
@@ -1,5 +1,5 @@
 fn main() {
     tonic_build::configure()
-        .compile(&["proto/wal.proto"], &["."])
-        .expect("compile wal proto");
+        .compile(&["proto/wal.proto", "proto/write_batch.proto"], &["."])
+        .expect("compile proto");
 }
diff --git a/src/storage/proto/write_batch.proto b/src/storage/proto/write_batch.proto
new file mode 100644
index 0000000000..941f1ab790
--- /dev/null
+++ b/src/storage/proto/write_batch.proto
@@ -0,0 +1,82 @@
+syntax = "proto3";
+
+package greptime.storage.write_batch.v1;
+
+message WriteBatch {
+  Schema schema = 1;
+  repeated Mutation mutations = 2;
+}
+
+message Schema {
+  repeated ColumnSchema column_schemas = 1;
+  TimestampIndex timestamp_index = 2;
+}
+
+message TimestampIndex {
+  uint64 value = 1;
+}
+
+message ColumnSchema {
+  string name = 1;
+  DataType data_type = 2;
+  bool is_nullable = 3;
+}
+
+message Mutation {
+  oneof mutation {
+    Put put = 1;
+    Delete delete = 2;
+  }
+}
+
+message Put {
+  repeated Column columns = 1;
+}
+
+message Delete {
+  // TODO(zxy)
+}
+
+message Column {
+  Values values = 1;
+  bytes value_null_mask = 2;
+  uint64 num_rows = 3;
+}
+
+// TODO(jiachun): An enum might be insufficient to represent some composite data types such as list, struct.
+// In the future, maybe we can refer to https://github.com/apache/arrow/blob/master/format/Schema.fbs#L398
+enum DataType {
+  NULL = 0;
+  BOOLEAN = 1;
+  INT8 = 2;
+  INT16 = 3;
+  INT32 = 4;
+  INT64 = 5;
+  UINT8 = 6;
+  UINT16 = 7;
+  UINT32 = 8;
+  UINT64 = 9;
+  FLOAT32 = 10;
+  FLOAT64 = 11;
+  STRING = 12;
+  BINARY = 13;
+}
+
+message Values {
+  repeated int32 i8_values = 1;
+  repeated int32 i16_values = 2;
+  repeated int32 i32_values = 3;
+  repeated int64 i64_values = 4;
+
+  repeated uint32 u8_values = 5;
+  repeated uint32 u16_values = 6;
+  repeated uint32 u32_values = 7;
+  repeated uint64 u64_values = 8;
+
+  repeated float f32_values = 9;
+  repeated double f64_values = 10;
+
+  repeated bool bool_values = 11;
+  repeated bytes binary_values = 12;
+  repeated string string_values = 13;
+}
diff --git a/src/storage/src/proto.rs b/src/storage/src/proto.rs
index 355623b4a8..f047bdd16c 100644
--- a/src/storage/src/proto.rs
+++ b/src/storage/src/proto.rs
@@ -1,43 +1,2 @@
-#![allow(clippy::all)]
-
-tonic::include_proto!("greptime.storage.wal.v1");
-
-use crate::write_batch::{Mutation, WriteBatch};
-
-pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
-    let column_schemas = write_batch.schema().column_schemas();
-    write_batch
-        .iter()
-        .map(|m| match m {
-            Mutation::Put(put) => {
-                if put.num_columns() == column_schemas.len() {
-                    MutationExtra {
-                        mutation_type: MutationType::Put.into(),
-                        column_null_mask: Default::default(),
-                    }
-                } else {
-                    let mut column_null_mask =
-                        bit_vec::BitVec::from_elem(column_schemas.len(), false);
-                    for (i, cs) in column_schemas.iter().enumerate() {
-                        if put.column_by_name(&cs.name).is_none() {
-                            column_null_mask.set(i, true);
-                        }
-                    }
-                    MutationExtra {
-                        mutation_type: MutationType::Put.into(),
-                        column_null_mask: column_null_mask.to_bytes(),
-                    }
-                }
-            }
-        })
-        .collect::<Vec<_>>()
-}
-
-impl WalHeader {
-    pub fn with_last_manifest_version(last_manifest_version: u64) -> Self {
-        Self {
-            last_manifest_version,
-            ..Default::default()
-        }
-    }
-}
+pub mod wal;
+pub mod write_batch;
diff --git a/src/storage/src/proto/wal.rs b/src/storage/src/proto/wal.rs
new file mode 100644
index 0000000000..150bc8fa15
--- /dev/null
+++ b/src/storage/src/proto/wal.rs
@@ -0,0 +1,42 @@
+#![allow(clippy::all)]
+tonic::include_proto!("greptime.storage.wal.v1");
+
+use crate::write_batch::{Mutation, WriteBatch};
+
+pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
+    let column_schemas = write_batch.schema().column_schemas();
+    write_batch
+        .iter()
+        .map(|m| match m {
+            Mutation::Put(put) => {
+                if put.num_columns() == column_schemas.len() {
+                    MutationExtra {
+                        mutation_type: MutationType::Put.into(),
+                        column_null_mask: Default::default(),
+                    }
+                } else {
+                    let mut column_null_mask =
+                        bit_vec::BitVec::from_elem(column_schemas.len(), false);
+                    for (i, cs) in column_schemas.iter().enumerate() {
+                        if put.column_by_name(&cs.name).is_none() {
+                            column_null_mask.set(i, true);
+                        }
+                    }
+                    MutationExtra {
+                        mutation_type: MutationType::Put.into(),
+                        column_null_mask: column_null_mask.to_bytes(),
+                    }
+                }
+            }
+        })
+        .collect::<Vec<_>>()
+}
+
+impl WalHeader {
+    pub fn with_last_manifest_version(last_manifest_version: u64) -> Self {
+        Self {
+            last_manifest_version,
+            ..Default::default()
+        }
+    }
+}
diff --git a/src/storage/src/proto/write_batch.rs b/src/storage/src/proto/write_batch.rs
new file mode 100644
index 0000000000..e60677a770
--- /dev/null
+++ b/src/storage/src/proto/write_batch.rs
@@ -0,0 +1,306 @@
+#![allow(clippy::all)]
+tonic::include_proto!("greptime.storage.write_batch.v1");
+
+use std::sync::Arc;
+
+use common_error::prelude::*;
+use datatypes::schema;
+use datatypes::{
+    data_type::ConcreteDataType,
+    prelude::{ScalarVector, ScalarVectorBuilder},
+    vectors::{
+        BinaryVector, BinaryVectorBuilder, BooleanVector, BooleanVectorBuilder, Float32Vector,
+        Float32VectorBuilder, Float64Vector, Float64VectorBuilder, Int16Vector, Int16VectorBuilder,
+        Int32Vector, Int32VectorBuilder, Int64Vector, Int64VectorBuilder, Int8Vector,
+        Int8VectorBuilder, StringVector, StringVectorBuilder, UInt16Vector, UInt16VectorBuilder,
+        UInt32Vector, UInt32VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8Vector,
+        UInt8VectorBuilder, Vector, VectorRef,
+    },
+};
+use paste::paste;
+use snafu::OptionExt;
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to convert datafusion type: {}", from))]
+    Conversion { from: String, backtrace: Backtrace },
+
+    #[snafu(display("Empty column values read"))]
+    EmptyColumnValues { backtrace: Backtrace },
+
+    #[snafu(display("Invalid data type: {}", data_type))]
+    InvalidDataType {
+        data_type: i32,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Invalid timestamp index: {}", index))]
+    InvalidTimestampIndex {
+        index: usize,
+        source: datatypes::error::Error,
+        backtrace: Backtrace,
+    },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl TimestampIndex {
+    pub fn new(value: u64) -> Self {
+        Self { value }
+    }
+}
+
+impl From<&schema::SchemaRef> for Schema {
+    fn from(schema: &schema::SchemaRef) -> Self {
+        let column_schemas = schema
+            .column_schemas()
+            .iter()
+            .map(|column_schema| column_schema.into())
+            .collect();
+
+        Schema {
+            column_schemas,
+            timestamp_index: schema
+                .timestamp_index()
+                .map(|index| TimestampIndex::new(index as u64)),
+        }
+    }
+}
+
+impl TryFrom<Schema> for schema::SchemaRef {
+    type Error = Error;
+
+    fn try_from(schema: Schema) -> Result<Self> {
+        let column_schemas = schema
+            .column_schemas
+            .iter()
+            .map(schema::ColumnSchema::try_from)
+            .collect::<Result<Vec<_>>>()?;
+
+        let schema: schema::SchemaRef = match schema.timestamp_index {
+            Some(index) => Arc::new(
+                schema::Schema::with_timestamp_index(column_schemas, index.value as usize)
+                    .context(InvalidTimestampIndexSnafu {
+                        index: index.value as usize,
+                    })?,
+            ),
+            None => Arc::new(schema::Schema::new(column_schemas)),
+        };
+
+        Ok(schema)
+    }
+}
+
+impl From<&schema::ColumnSchema> for ColumnSchema {
+    fn from(cs: &schema::ColumnSchema) -> Self {
+        Self {
+            name: cs.name.clone(),
+            data_type: DataType::from(&cs.data_type).into(),
+            is_nullable: cs.is_nullable,
+        }
+    }
+}
+
+impl TryFrom<&ColumnSchema> for schema::ColumnSchema {
+    type Error = Error;
+
+    fn try_from(column_schema: &ColumnSchema) -> Result<Self> {
+        if let Some(data_type) = DataType::from_i32(column_schema.data_type) {
+            Ok(schema::ColumnSchema::new(
+                column_schema.name.clone(),
+                data_type.into(),
+                column_schema.is_nullable,
+            ))
+        } else {
+            InvalidDataTypeSnafu {
+                data_type: column_schema.data_type,
+            }
+            .fail()
+        }
+    }
+}
+
+impl From<&ConcreteDataType> for DataType {
+    fn from(data_type: &ConcreteDataType) -> Self {
+        match data_type {
+            ConcreteDataType::Boolean(_) => DataType::Boolean,
+            ConcreteDataType::Int8(_) => DataType::Int8,
+            ConcreteDataType::Int16(_) => DataType::Int16,
+            ConcreteDataType::Int32(_) => DataType::Int32,
+            ConcreteDataType::Int64(_) => DataType::Int64,
+            ConcreteDataType::UInt8(_) => DataType::Uint8,
+            ConcreteDataType::UInt16(_) => DataType::Uint16,
+            ConcreteDataType::UInt32(_) => DataType::Uint32,
+            ConcreteDataType::UInt64(_) => DataType::Uint64,
+            ConcreteDataType::Float32(_) => DataType::Float32,
+            ConcreteDataType::Float64(_) => DataType::Float64,
+            ConcreteDataType::String(_) => DataType::String,
+            ConcreteDataType::Null(_) => DataType::Null,
+            ConcreteDataType::Binary(_) => DataType::Binary,
+            _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
+        }
+    }
+}
+
+impl From<DataType> for ConcreteDataType {
+    fn from(data_type: DataType) -> Self {
+        match data_type {
+            DataType::Boolean => ConcreteDataType::boolean_datatype(),
+            DataType::Int8 => ConcreteDataType::int8_datatype(),
+            DataType::Int16 => ConcreteDataType::int16_datatype(),
+            DataType::Int32 => ConcreteDataType::int32_datatype(),
+            DataType::Int64 => ConcreteDataType::int64_datatype(),
+            DataType::Uint8 => ConcreteDataType::uint8_datatype(),
+            DataType::Uint16 => ConcreteDataType::uint16_datatype(),
+            DataType::Uint32 => ConcreteDataType::uint32_datatype(),
+            DataType::Uint64 => ConcreteDataType::uint64_datatype(),
+            DataType::Float32 => ConcreteDataType::float32_datatype(),
+            DataType::Float64 => ConcreteDataType::float64_datatype(),
+            DataType::String => ConcreteDataType::string_datatype(),
+            DataType::Binary => ConcreteDataType::binary_datatype(),
+            DataType::Null => ConcreteDataType::null_datatype(),
+        }
+    }
+}
+
+#[macro_export]
+macro_rules! gen_columns {
+    ($key: tt, $vec_ty: ty, $vari: ident, $cast: expr) => {
+        paste! {
+            pub fn [<gen_columns_ $key>](vector: &VectorRef) -> Result<Column> {
+                let mut column = Column::default();
+                let mut values = Values::default();
+
+                let vector_ref =
+                    vector
+                        .as_any()
+                        .downcast_ref::<$vec_ty>()
+                        .with_context(|| ConversionSnafu {
+                            from: std::format!("{:?}", vector.as_ref().data_type()),
+                        })?;
+
+                let mut value_null_mask = bit_vec::BitVec::from_elem(vector_ref.len(), false);
+
+                vector_ref
+                    .iter_data()
+                    .enumerate()
+                    .for_each(|(i, value)| match value {
+                        Some($vari) => values.[<$key _values>].push($cast),
+                        None => value_null_mask.set(i, true),
+                    });
+
+                let null_mask = if vector_ref.len() == values.[<$key _values>].len() {
+                    vec![]
+                } else {
+                    value_null_mask.to_bytes()
+                };
+
+                column.values = Some(values);
+                column.value_null_mask = null_mask;
+                column.num_rows = vector_ref.len() as u64;
+
+                Ok(column)
+            }
+        }
+    };
+}
+
+gen_columns!(i8, Int8Vector, v, v as i32);
+gen_columns!(i16, Int16Vector, v, v as i32);
+gen_columns!(i32, Int32Vector, v, v as i32);
+gen_columns!(i64, Int64Vector, v, v as i64);
+gen_columns!(u8, UInt8Vector, v, v as u32);
+gen_columns!(u16, UInt16Vector, v, v as u32);
+gen_columns!(u32, UInt32Vector, v, v as u32);
+gen_columns!(u64, UInt64Vector, v, v as u64);
+gen_columns!(f32, Float32Vector, v, v);
+gen_columns!(f64, Float64Vector, v, v);
+gen_columns!(bool, BooleanVector, v, v);
+gen_columns!(binary, BinaryVector, v, v.to_vec());
+gen_columns!(string, StringVector, v, v.to_string());
+
+#[macro_export]
+macro_rules! gen_put_data {
+    ($key: tt, $builder_type: ty, $vari: ident, $cast: expr) => {
+        paste! {
+            pub fn [<gen_put_data_ $key>](column: Column) -> Result<VectorRef> {
+                let values = column.values.context(EmptyColumnValuesSnafu {})?;
+                let mut vector_iter = values.[<$key _values>].iter();
+                let num_rows = column.num_rows as usize;
+                let mut builder = <$builder_type>::with_capacity(num_rows);
+                if column.value_null_mask.is_empty() {
+                    (0..num_rows)
+                        .for_each(|_| builder.push(vector_iter.next().map(|$vari| $cast)));
+                } else {
+                    bit_vec::BitVec::from_bytes(&column.value_null_mask)
+                        .iter()
+                        .take(num_rows)
+                        .for_each(|is_null| {
+                            if is_null {
+                                builder.push(None);
+                            } else {
+                                builder.push(vector_iter.next().map(|$vari| $cast));
+                            }
+                        });
+                }
+
+                Ok(Arc::new(builder.finish()))
+            }
+        }
+    };
+}
+
+gen_put_data!(i8, Int8VectorBuilder, v, *v as i8);
+gen_put_data!(i16, Int16VectorBuilder, v, *v as i16);
+gen_put_data!(i32, Int32VectorBuilder, v, *v);
+gen_put_data!(i64, Int64VectorBuilder, v, *v);
+gen_put_data!(u8, UInt8VectorBuilder, v, *v as u8);
+gen_put_data!(u16, UInt16VectorBuilder, v, *v as u16);
+gen_put_data!(u32, UInt32VectorBuilder, v, *v as u32);
+gen_put_data!(u64, UInt64VectorBuilder, v, *v as u64);
+gen_put_data!(f32, Float32VectorBuilder, v, *v as f32);
+gen_put_data!(f64, Float64VectorBuilder, v, *v as f64);
+gen_put_data!(bool, BooleanVectorBuilder, v, *v);
+gen_put_data!(binary, BinaryVectorBuilder, v, v.as_slice());
+gen_put_data!(string, StringVectorBuilder, v, v.as_str());
+
+pub fn gen_columns(vector: &VectorRef) -> Result<Column> {
+    match vector.data_type() {
+        ConcreteDataType::Boolean(_) => gen_columns_bool(vector),
+        ConcreteDataType::Int8(_) => gen_columns_i8(vector),
+        ConcreteDataType::Int16(_) => gen_columns_i16(vector),
+        ConcreteDataType::Int32(_) => gen_columns_i32(vector),
+        ConcreteDataType::Int64(_) => gen_columns_i64(vector),
+        ConcreteDataType::UInt8(_) => gen_columns_u8(vector),
+        ConcreteDataType::UInt16(_) => gen_columns_u16(vector),
+        ConcreteDataType::UInt32(_) => gen_columns_u32(vector),
+        ConcreteDataType::UInt64(_) => gen_columns_u64(vector),
+        ConcreteDataType::Float32(_) => gen_columns_f32(vector),
+        ConcreteDataType::Float64(_) => gen_columns_f64(vector),
+        ConcreteDataType::Binary(_) => gen_columns_binary(vector),
+        ConcreteDataType::String(_) => gen_columns_string(vector),
+        _ => {
+            unimplemented!() // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
+        }
+    }
+}
+
+pub fn gen_put_data_vector(data_type: ConcreteDataType, column: Column) -> Result<VectorRef> {
+    match data_type {
+        ConcreteDataType::Boolean(_) => gen_put_data_bool(column),
+        ConcreteDataType::Int8(_) => gen_put_data_i8(column),
+        ConcreteDataType::Int16(_) => gen_put_data_i16(column),
+        ConcreteDataType::Int32(_) => gen_put_data_i32(column),
+        ConcreteDataType::Int64(_) => gen_put_data_i64(column),
+        ConcreteDataType::UInt8(_) => gen_put_data_u8(column),
+        ConcreteDataType::UInt16(_) => gen_put_data_u16(column),
+        ConcreteDataType::UInt32(_) => gen_put_data_u32(column),
+        ConcreteDataType::UInt64(_) => gen_put_data_u64(column),
+        ConcreteDataType::Float32(_) => gen_put_data_f32(column),
+        ConcreteDataType::Float64(_) => gen_put_data_f64(column),
+        ConcreteDataType::Binary(_) => gen_put_data_binary(column),
+        ConcreteDataType::String(_) => gen_put_data_string(column),
+        _ => unimplemented!(), // TODO(jiachun): Maybe support some composite types in the future, such as list, struct, etc.
+ } +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index f3493619c3..2ded3e72d3 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -12,7 +12,7 @@ use crate::background::JobHandle; use crate::error::{self, Result}; use crate::flush::{FlushJob, FlushSchedulerRef, FlushStrategyRef}; use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableSet}; -use crate::proto::WalHeader; +use crate::proto::wal::WalHeader; use crate::region::RegionManifest; use crate::region::SharedDataRef; use crate::sst::AccessLayerRef; diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 01f1c5343c..6341878389 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -13,9 +13,12 @@ use store_api::{ use crate::{ codec::{Decoder, Encoder}, error::{self, Error, Result}, - proto::{self, PayloadType, WalHeader}, + proto::wal::{self, PayloadType, WalHeader}, write_batch::{ - codec::{WriteBatchArrowDecoder, WriteBatchArrowEncoder}, + codec::{ + WriteBatchArrowDecoder, WriteBatchArrowEncoder, WriteBatchProtobufDecoder, + WriteBatchProtobufEncoder, + }, WriteBatch, }, }; @@ -78,7 +81,7 @@ impl Wal { header.payload_type = payload.payload_type(); if let Payload::WriteBatchArrow(batch) = payload { - header.mutation_extras = proto::gen_mutation_extras(batch); + header.mutation_extras = wal::gen_mutation_extras(batch); } let mut buf = vec![]; @@ -95,10 +98,16 @@ impl Wal { .encode(batch, &mut buf) .map_err(BoxedError::new) .context(error::WriteWalSnafu { name: self.name() })?; + } else if let Payload::WriteBatchProto(batch) = payload { + // entry + let encoder = WriteBatchProtobufEncoder {}; + // TODO(jiachun): provide some way to compute data size before encode, so we can preallocate an exactly sized buf. 
+            encoder
+                .encode(batch, &mut buf)
+                .map_err(BoxedError::new)
+                .context(error::WriteWalSnafu { name: self.name() })?;
         }
 
-        // TODO(jiachun): encode protobuf payload
-
         // write bytes to wal
         self.write(seq, &buf).await
     }
@@ -173,7 +182,14 @@ impl<S: LogStore> Wal<S> {
                 Ok((seq_num, header, Some(write_batch)))
             }
             Some(PayloadType::WriteBatchProto) => {
-                todo!("protobuf decoder")
+                let mutation_extras = std::mem::take(&mut header.mutation_extras);
+                let decoder = WriteBatchProtobufDecoder::new(mutation_extras);
+                let write_batch = decoder
+                    .decode(&input[data_pos..])
+                    .map_err(BoxedError::new)
+                    .context(error::ReadWalSnafu { name: self.name() })?;
+
+                Ok((seq_num, header, Some(write_batch)))
             }
             _ => error::WalDataCorruptedSnafu {
                 name: self.name(),
diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs
index 4bc6d32614..b3aa725968 100644
--- a/src/storage/src/write_batch.rs
+++ b/src/storage/src/write_batch.rs
@@ -14,9 +14,12 @@ use datatypes::{
     schema::SchemaRef,
     vectors::{Int64Vector, VectorRef},
 };
+use prost::{DecodeError, EncodeError};
 use snafu::ensure;
 use store_api::storage::{consts, PutOperation, WriteRequest};
 
+use crate::proto;
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Duplicate column {} in same request", name))]
@@ -82,6 +85,18 @@ pub enum Error {
         source: ArrowError,
     },
 
+    #[snafu(display("Failed to encode into protobuf, source: {}", source))]
+    EncodeProtobuf {
+        backtrace: Backtrace,
+        source: EncodeError,
+    },
+
+    #[snafu(display("Failed to decode from protobuf, source: {}", source))]
+    DecodeProtobuf {
+        backtrace: Backtrace,
+        source: DecodeError,
+    },
+
     #[snafu(display("Failed to parse schema, source: {}", source))]
     ParseSchema {
         backtrace: Backtrace,
@@ -102,6 +117,18 @@ pub enum Error {
         backtrace: Backtrace,
         source: datatypes::error::Error,
     },
+
+    #[snafu(display("Failed to convert into protobuf struct, source: {}", source))]
+    ToProtobuf {
+        source: proto::write_batch::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to convert from protobuf struct, source: {}", source))]
+    FromProtobuf {
+        source: proto::write_batch::Error,
+        backtrace: Backtrace,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -402,9 +429,9 @@ impl PutData {
 }
 
 pub mod codec {
+    use std::{io::Cursor, sync::Arc};
 
-    use common_error::prelude::*;
     use datatypes::{
         arrow::{
             chunk::Chunk as ArrowChunk,
             io::ipc::{
                 read::{self, StreamState},
                 write::{StreamWriter, WriteOptions},
             },
         },
         error::Result as DataTypesResult,
-        schema::Schema,
+        schema::{Schema, SchemaRef},
         vectors::Helper,
     };
-    use snafu::ensure;
+    use prost::Message;
+    use snafu::{ensure, OptionExt, ResultExt};
     use store_api::storage::WriteRequest;
 
     use super::{
         DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
-        Error as WriteBatchError, Mutation, ParseSchemaSnafu, Result, WriteBatch,
+        Error as WriteBatchError, FromProtobufSnafu, Mutation, ParseSchemaSnafu, Result,
+        ToProtobufSnafu, WriteBatch,
     };
+    use crate::proto::{
+        wal::{MutationExtra, MutationType},
+        write_batch::{self, gen_columns, gen_put_data_vector},
+    };
+    use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData};
     use crate::{
         arrow_stream::ArrowStreamReader,
         codec::{Decoder, Encoder},
     };
-    use crate::{
-        proto::{MutationExtra, MutationType},
-        write_batch::PutData,
-    };
 
     // TODO(jiachun): The codec logic is too complex, maybe we should use protobuf to
     // serialize/deserialize all our data.
@@ -491,7 +521,7 @@
             let valid_ipc_fields = ipc_fields
                 .iter()
                 .zip(bit_vec::BitVec::from_bytes(column_null_mask))
-                .filter(|(_, mask)| !*mask)
+                .filter(|(_, is_null)| !*is_null)
                 .map(|(ipc_field, _)| ipc_field.clone())
                 .collect::<Vec<_>>();
             writer
@@ -600,7 +630,7 @@
                 bit_vec::BitVec::from_bytes(&ext.column_null_mask)
                     .iter()
                     .zip(column_names.iter())
-                    .filter(|(mask, _)| !*mask)
+                    .filter(|(is_null, _)| !*is_null)
                     .map(|(_, column_name)| column_name.clone())
                     .collect::<Vec<_>>()
             };
@@ -639,8 +669,138 @@
             Ok(write_batch)
         }
     }
-}
 
+    pub struct WriteBatchProtobufEncoder {}
+
+    impl Encoder for WriteBatchProtobufEncoder {
+        type Item = WriteBatch;
+        type Error = WriteBatchError;
+
+        fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
+            let schema = item.schema().into();
+
+            let mutations = item
+                .iter()
+                .map(|mtn| match mtn {
+                    Mutation::Put(put_data) => item
+                        .schema()
+                        .column_schemas()
+                        .iter()
+                        .filter_map(|cs| put_data.column_by_name(&cs.name))
+                        .map(gen_columns)
+                        .collect::<write_batch::Result<Vec<_>>>(),
+                })
+                .collect::<write_batch::Result<Vec<_>>>()
+                .context(ToProtobufSnafu {})?
+                .into_iter()
+                .map(|columns| write_batch::Mutation {
+                    mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
+                        columns,
+                    })),
+                })
+                .collect();
+
+            let write_batch = write_batch::WriteBatch {
+                schema: Some(schema),
+                mutations,
+            };
+
+            write_batch.encode(dst).context(EncodeProtobufSnafu)
+        }
+    }
+
+    pub struct WriteBatchProtobufDecoder {
+        mutation_extras: Vec<MutationExtra>,
+    }
+
+    impl WriteBatchProtobufDecoder {
+        #[allow(dead_code)]
+        pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
+            Self { mutation_extras }
+        }
+    }
+
+    impl Decoder for WriteBatchProtobufDecoder {
+        type Item = WriteBatch;
+        type Error = WriteBatchError;
+
+        fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
+            let write_batch = write_batch::WriteBatch::decode(src).context(DecodeProtobufSnafu)?;
+
+            let schema = write_batch.schema.context(DataCorruptedSnafu {
+                message: "schema required",
+            })?;
+
+            let schema: SchemaRef = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
+
+            ensure!(
+                write_batch.mutations.len() == self.mutation_extras.len(),
+                DataCorruptedSnafu {
+                    message: &format!(
+                        "expected {} mutations, but got {}",
+                        self.mutation_extras.len(),
+                        write_batch.mutations.len()
+                    )
+                }
+            );
+
+            let mutations = self
+                .mutation_extras
+                .iter()
+                .zip(write_batch.mutations.into_iter())
+                .map(|(ext, mtn)| match mtn.mutation {
+                    Some(write_batch::mutation::Mutation::Put(put)) => {
+                        let column_schemas = schema.column_schemas();
+                        let valid_columns = if ext.column_null_mask.is_empty() {
+                            column_schemas
+                                .iter()
+                                .map(|column| (column.name.clone(), column.data_type.clone()))
+                                .collect::<Vec<_>>()
+                        } else {
+                            bit_vec::BitVec::from_bytes(&ext.column_null_mask)
+                                .iter()
+                                .zip(column_schemas.iter())
+                                .filter(|(is_null, _)| !*is_null)
+                                .map(|(_, column)| (column.name.clone(), column.data_type.clone()))
+                                .collect::<Vec<_>>()
+                        };
+
+                        let mut put_data = PutData::with_num_columns(put.columns.len());
+
+                        let res = valid_columns
+                            .into_iter()
+                            .zip(put.columns.into_iter())
+                            .map(|((name, data_type), column)| {
+                                gen_put_data_vector(data_type, column).map(|vector| (name, vector))
+                            })
+                            .collect::<write_batch::Result<Vec<_>>>()
+                            .context(FromProtobufSnafu {})?
+                            .into_iter()
+                            .map(|(name, vector)| put_data.add_column_by_name(&name, vector))
+                            .collect::<Result<Vec<_>>>();
+
+                        res.map(|_| Mutation::Put(put_data))
+                    }
+                    Some(write_batch::mutation::Mutation::Delete(_)) => todo!(),
+                    _ => DataCorruptedSnafu {
+                        message: "invalid mutation type",
+                    }
+                    .fail(),
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let mut write_batch = WriteBatch::new(schema);
+
+            mutations
+                .into_iter()
+                .try_for_each(|mutation| match mutation {
+                    Mutation::Put(put_data) => write_batch.put(put_data),
+                })?;
+
+            Ok(write_batch)
+        }
+    }
+}
 
 #[cfg(test)]
 mod tests {
     use std::iter;
@@ -652,6 +812,7 @@ mod tests {
     use super::*;
     use crate::codec::{Decoder, Encoder};
     use crate::proto;
+    use crate::proto::wal::MutationExtra;
     use crate::test_util::write_batch_util;
@@ -873,29 +1034,37 @@ mod tests {
         )
     }
 
-    #[test]
-    fn test_codec() -> Result<()> {
-        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
-        let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
-        let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
-
-        let mut put_data = PutData::new();
-        put_data.add_key_column("k1", intv.clone()).unwrap();
-        put_data.add_version_column(intv).unwrap();
-        put_data.add_value_column("v1", boolv).unwrap();
-        put_data.add_key_column("ts", tsv).unwrap();
-
+    fn gen_new_batch_and_extras() -> (WriteBatch, Vec<MutationExtra>) {
         let mut batch = new_test_batch();
-        assert!(batch.is_empty());
-        batch.put(put_data).unwrap();
-        assert!(!batch.is_empty());
+        for i in 0..10 {
+            let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
+            let boolv = Arc::new(BooleanVector::from(vec![Some(true), Some(false), None]));
+            let tsv = Arc::new(Int64Vector::from_vec(vec![i, i, i]));
 
-        let encoder = codec::WriteBatchArrowEncoder::new(proto::gen_mutation_extras(&batch));
+            let mut put_data = PutData::new();
+            put_data.add_key_column("k1", intv.clone()).unwrap();
+            put_data.add_version_column(intv).unwrap();
+            put_data.add_value_column("v1", boolv).unwrap();
+            put_data.add_key_column("ts", tsv).unwrap();
+
+            batch.put(put_data).unwrap();
+        }
+
+        let extras = proto::wal::gen_mutation_extras(&batch);
+
+        (batch, extras)
+    }
+
+    #[test]
+    fn test_codec_arrow() -> Result<()> {
+        let (batch, mutation_extras) = gen_new_batch_and_extras();
+
+        let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone());
         let mut dst = vec![];
         let result = encoder.encode(&batch, &mut dst);
        assert!(result.is_ok());
 
-        let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch));
+        let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras);
         let result = decoder.decode(&dst);
         let batch2 = result?;
         assert_eq!(batch.num_rows, batch2.num_rows);
 
         Ok(())
     }
@@ -904,26 +1073,68 @@ mod tests {
 
     #[test]
-    fn test_codec_with_none_column() -> Result<()> {
-        let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
-        let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
+    fn test_codec_protobuf() -> Result<()> {
+        let (batch, mutation_extras) = gen_new_batch_and_extras();
 
-        let mut put_data = PutData::new();
-        put_data.add_key_column("k1", intv.clone()).unwrap();
-        put_data.add_version_column(intv).unwrap();
-        put_data.add_key_column("ts", tsv).unwrap();
-
-        let mut batch = new_test_batch();
-        assert!(batch.is_empty());
-        batch.put(put_data).unwrap();
-        assert!(!batch.is_empty());
-
-        let encoder = codec::WriteBatchArrowEncoder::new(proto::gen_mutation_extras(&batch));
+        let encoder = codec::WriteBatchProtobufEncoder {};
         let mut dst = vec![];
         let result = encoder.encode(&batch, &mut dst);
         assert!(result.is_ok());
 
-        let decoder = codec::WriteBatchArrowDecoder::new(proto::gen_mutation_extras(&batch));
+        let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
         let result = decoder.decode(&dst);
         let batch2 = result?;
         assert_eq!(batch.num_rows, batch2.num_rows);
 
         Ok(())
     }
 
+    fn gen_new_batch_and_extras_with_none_column() -> (WriteBatch, Vec<MutationExtra>) {
+        let mut batch = new_test_batch();
+        for _ in 0..10 {
+            let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
+            let tsv = Arc::new(Int64Vector::from_vec(vec![0, 0, 0]));
+
+            let mut put_data = PutData::new();
+            put_data.add_key_column("k1", intv.clone()).unwrap();
+            put_data.add_version_column(intv).unwrap();
+            put_data.add_key_column("ts", tsv).unwrap();
+
+            batch.put(put_data).unwrap();
+        }
+
+        let extras = proto::wal::gen_mutation_extras(&batch);
+
+        (batch, extras)
+    }
+
+    #[test]
+    fn test_codec_with_none_column_arrow() -> Result<()> {
+        let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column();
+
+        let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone());
+        let mut dst = vec![];
+        let result = encoder.encode(&batch, &mut dst);
+        assert!(result.is_ok());
+
+        let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras);
+        let result = decoder.decode(&dst);
+        let batch2 = result?;
+        assert_eq!(batch.num_rows, batch2.num_rows);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_codec_with_none_column_protobuf() -> Result<()> {
+        let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column();
+
+        let encoder = codec::WriteBatchProtobufEncoder {};
+        let mut dst = vec![];
+        let result = encoder.encode(&batch, &mut dst);
+        assert!(result.is_ok());
+
+        let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
+        let result = decoder.decode(&dst);
+        let batch2 = result?;
+        assert_eq!(batch.num_rows, batch2.num_rows);
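
Note on the null-mask convention: both the Arrow and protobuf codecs in this patch encode missing columns (`column_null_mask`) and missing row values (`value_null_mask`) the same way — a bit is set when the corresponding slot carries no value, and an empty mask means nothing is missing. A minimal standalone sketch of that round trip, assuming only the bit-vec 0.6 API used above (all names here are illustrative, not from the patch):

fn main() {
    // Encoding side: start all-false, flag the missing slots, serialize.
    let mut mask = bit_vec::BitVec::from_elem(4, false);
    mask.set(1, true); // slot 1 carries no value
    mask.set(3, true); // slot 3 carries no value
    let bytes = mask.to_bytes(); // what gets stored in column_null_mask / value_null_mask

    // Decoding side: to_bytes() pads to whole bytes, so take() only the
    // expected number of bits before filtering out the missing slots.
    let present: Vec<usize> = bit_vec::BitVec::from_bytes(&bytes)
        .iter()
        .take(4)
        .enumerate()
        .filter(|(_, is_null)| !*is_null)
        .map(|(i, _)| i)
        .collect();
    assert_eq!(present, vec![0, 2]);
}

This is why the decoder takes `mutation_extras` as a constructor argument: the mask alone does not say how many columns exist, so the schema (and the WAL header's extras) must supply the expected count.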
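End to end, the protobuf path mirrors the Arrow path: the WAL header carries the MutationExtras, the WAL body carries the prost-encoded WriteBatch. A condensed sketch of how the new codec is driven, matching test_codec_protobuf above (the `roundtrip` helper is hypothetical; it assumes the crate-internal paths introduced in this patch and a populated `batch`):

use crate::codec::{Decoder, Encoder};
use crate::proto::wal;
use crate::write_batch::codec::{WriteBatchProtobufDecoder, WriteBatchProtobufEncoder};
use crate::write_batch::{Result, WriteBatch};

fn roundtrip(batch: &WriteBatch) -> Result<()> {
    // The extras record, per mutation, which schema columns were omitted.
    let extras = wal::gen_mutation_extras(batch);

    let mut buf = vec![];
    WriteBatchProtobufEncoder {}.encode(batch, &mut buf)?;

    // The decoder needs the same extras (normally read back from the WAL
    // header) to re-associate each encoded column with its schema position.
    let decoded = WriteBatchProtobufDecoder::new(extras).decode(&buf)?;
    assert_eq!(batch.num_rows, decoded.num_rows);
    Ok(())
}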