refactor: Remove column_null_mask in MutationExtra (#314)

* refactor: Remove column_null_mask in MutationExtra

MutationExtra::column_null_mask is no longer needed, since we can now ensure
that a WriteBatch contains no missing columns.
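The invariant behind this removal can be sketched as follows. This is a minimal stand-in with hypothetical, simplified types (the real `WriteBatch` lives in `storage::write_batch`), not the actual implementation:

```rust
use std::collections::HashMap;

// Simplified stand-in for storage::write_batch::WriteBatch (hypothetical
// types): a put that misses any schema column is rejected up front, so the
// WAL never needs a per-mutation column_null_mask to describe absent columns.
struct WriteBatch {
    schema: Vec<String>,                       // column names in schema order
    mutations: Vec<HashMap<String, Vec<u64>>>, // one column -> values map per put
}

impl WriteBatch {
    fn put(&mut self, data: HashMap<String, Vec<u64>>) -> Result<(), String> {
        if let Some(missing) = self.schema.iter().find(|c| !data.contains_key(*c)) {
            return Err(format!("missing column {missing}"));
        }
        self.mutations.push(data);
        Ok(())
    }
}

fn main() {
    let mut batch = WriteBatch {
        schema: vec!["ts".into(), "value".into()],
        mutations: vec![],
    };
    let mut put = HashMap::new();
    put.insert("ts".to_string(), vec![1, 2, 3]);
    // "value" is absent, so the batch rejects the put instead of recording a mask.
    assert!(batch.put(put).is_err());
}
```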

* feat(storage): Remove MutationExtra

Store only the MutationType of each mutation in the WalHeader; MutationExtra is no longer needed.
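In Rust terms, the new header looks roughly like the sketch below. The struct and enum are hand-written mirrors of the generated prost types, for illustration only (the real definitions come from `tonic::include_proto!("greptime.storage.wal.v1")`, and the enum discriminant is an assumption):

```rust
// prost represents proto enum fields as i32 on the wire; the discriminant
// value below is illustrative, not taken from the real proto file.
enum MutationType {
    Put = 0,
}

// After this commit the header carries one MutationType value per mutation,
// replacing `repeated MutationExtra` (mutation_type + column_null_mask).
struct WalHeader {
    payload_type: i32,
    last_manifest_version: u64,
    mutation_types: Vec<i32>,
}

fn main() {
    let header = WalHeader {
        payload_type: 1,
        last_manifest_version: 0,
        mutation_types: vec![MutationType::Put as i32; 3],
    };
    assert_eq!(header.mutation_types.len(), 3);
}
```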
Author: Yingwen
Date: 2022-10-24 14:53:35 +08:00
Committed by: GitHub
Parent: b650656ae3
Commit: a457c49d99

12 changed files with 188 additions and 532 deletions


@@ -13,8 +13,8 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
etcd-client = "0.10"
futures = "0.3"
http-body = "0.4"
h2 = "0.3"
http-body = "0.4"
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }


@@ -40,9 +40,9 @@ axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", br
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
mysql_async = { git = "https://github.com/Morranto/mysql_async.git", rev = "127b538" }
query = { path = "../query" }
rand = "0.8"
script = { path = "../script", features = ["python"] }
query = { path = "../query" }
table = { path = "../table" }
tokio-postgres = "0.7"
tokio-test = "0.4"


@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use storage::codec::{Decoder, Encoder};
use storage::write_batch::{codec, WriteBatch};
use super::util::gen_new_batch_and_extras;
use crate::wal::util::gen_new_batch_and_types;
/*
-------------------------------------
@@ -18,12 +18,8 @@ rows | protobuf | arrow |
------------------------------------
*/
fn encode_arrow(
batch: &WriteBatch,
mutation_extras: &[storage::proto::wal::MutationExtra],
dst: &mut Vec<u8>,
) {
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
fn encode_arrow(batch: &WriteBatch, dst: &mut Vec<u8>) {
let encoder = codec::WriteBatchArrowEncoder::new();
let result = encoder.encode(batch, dst);
assert!(result.is_ok());
}
@@ -34,22 +30,22 @@ fn encode_protobuf(batch: &WriteBatch, dst: &mut Vec<u8>) {
assert!(result.is_ok());
}
fn decode_arrow(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) {
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec());
fn decode_arrow(dst: &[u8], mutation_types: &[i32]) {
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types.to_vec());
let result = decoder.decode(dst);
assert!(result.is_ok());
}
fn decode_protobuf(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) {
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec());
fn decode_protobuf(dst: &[u8], mutation_types: &[i32]) {
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types.to_vec());
let result = decoder.decode(dst);
assert!(result.is_ok());
}
fn bench_wal_decode(c: &mut Criterion) {
let (batch_10, extras_10) = gen_new_batch_and_extras(1);
let (batch_100, extras_100) = gen_new_batch_and_extras(10);
let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
let (batch_10, types_10) = gen_new_batch_and_types(1);
let (batch_100, types_100) = gen_new_batch_and_types(10);
let (batch_10000, types_10000) = gen_new_batch_and_types(100);
let mut dst_protobuf_10 = vec![];
let mut dst_protobuf_100 = vec![];
let mut dst_protobuf_10000 = vec![];
@@ -62,28 +58,28 @@ fn bench_wal_decode(c: &mut Criterion) {
encode_protobuf(&batch_100, &mut dst_protobuf_100);
encode_protobuf(&batch_10000, &mut dst_protobuf_10000);
encode_arrow(&batch_10, &extras_10, &mut dst_arrow_10);
encode_arrow(&batch_100, &extras_100, &mut dst_arrow_100);
encode_arrow(&batch_10000, &extras_10000, &mut dst_arrow_10000);
encode_arrow(&batch_10, &mut dst_arrow_10);
encode_arrow(&batch_100, &mut dst_arrow_100);
encode_arrow(&batch_10000, &mut dst_arrow_10000);
let mut group = c.benchmark_group("wal_decode");
group.bench_function("protobuf_decode_with_10_num_rows", |b| {
b.iter(|| decode_protobuf(&dst_protobuf_10, &extras_10))
b.iter(|| decode_protobuf(&dst_protobuf_10, &types_10))
});
group.bench_function("protobuf_decode_with_100_num_rows", |b| {
b.iter(|| decode_protobuf(&dst_protobuf_100, &extras_100))
b.iter(|| decode_protobuf(&dst_protobuf_100, &types_100))
});
group.bench_function("protobuf_decode_with_10000_num_rows", |b| {
b.iter(|| decode_protobuf(&dst_protobuf_10000, &extras_10000))
b.iter(|| decode_protobuf(&dst_protobuf_10000, &types_10000))
});
group.bench_function("arrow_decode_with_10_num_rows", |b| {
b.iter(|| decode_arrow(&dst_arrow_10, &extras_10))
b.iter(|| decode_arrow(&dst_arrow_10, &types_10))
});
group.bench_function("arrow_decode_with_100_num_rows", |b| {
b.iter(|| decode_arrow(&dst_arrow_100, &extras_100))
b.iter(|| decode_arrow(&dst_arrow_100, &types_100))
});
group.bench_function("arrow_decode_with_10000_num_rows", |b| {
b.iter(|| decode_arrow(&dst_arrow_10000, &extras_10000))
b.iter(|| decode_arrow(&dst_arrow_10000, &types_10000))
});
group.finish();
}


@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use storage::codec::Encoder;
use storage::write_batch::{codec, WriteBatch};
use super::util::gen_new_batch_and_extras;
use crate::wal::util::gen_new_batch_and_types;
/*
-------------------------------------
@@ -18,14 +18,14 @@ rows | protobuf | arrow |
------------------------------------
*/
fn encode_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
fn encode_arrow(batch: &WriteBatch) {
let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
assert!(result.is_ok());
}
fn encode_protobuf(batch: &WriteBatch, _mutation_extras: &[storage::proto::wal::MutationExtra]) {
fn encode_protobuf(batch: &WriteBatch) {
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
@@ -33,28 +33,28 @@ fn encode_protobuf(batch: &WriteBatch, _mutation_extras: &[storage::proto::wal::
}
fn bench_wal_encode(c: &mut Criterion) {
let (batch_10, extras_10) = gen_new_batch_and_extras(1);
let (batch_100, extras_100) = gen_new_batch_and_extras(10);
let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
let (batch_10, _) = gen_new_batch_and_types(1);
let (batch_100, _) = gen_new_batch_and_types(10);
let (batch_10000, _) = gen_new_batch_and_types(100);
let mut group = c.benchmark_group("wal_encode");
group.bench_function("protobuf_encode_with_10_num_rows", |b| {
b.iter(|| encode_protobuf(&batch_10, &extras_10))
b.iter(|| encode_protobuf(&batch_10))
});
group.bench_function("protobuf_encode_with_100_num_rows", |b| {
b.iter(|| encode_protobuf(&batch_100, &extras_100))
b.iter(|| encode_protobuf(&batch_100))
});
group.bench_function("protobuf_encode_with_10000_num_rows", |b| {
b.iter(|| encode_protobuf(&batch_10000, &extras_10000))
b.iter(|| encode_protobuf(&batch_10000))
});
group.bench_function("arrow_encode_with_10_num_rows", |b| {
b.iter(|| encode_arrow(&batch_10, &extras_10))
b.iter(|| encode_arrow(&batch_10))
});
group.bench_function("arrow_encode_with_100_num_rows", |b| {
b.iter(|| encode_arrow(&batch_100, &extras_100))
b.iter(|| encode_arrow(&batch_100))
});
group.bench_function("arrow_encode_with_10000_num_rows", |b| {
b.iter(|| encode_arrow(&batch_10000, &extras_10000))
b.iter(|| encode_arrow(&batch_10000))
});
group.finish();
}


@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use storage::codec::{Decoder, Encoder};
use storage::write_batch::{codec, WriteBatch};
use super::util::gen_new_batch_and_extras;
use crate::wal::util::gen_new_batch_and_types;
/*
-------------------------------------
@@ -18,50 +18,50 @@ rows | protobuf | arrow |
------------------------------------
*/
fn codec_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
fn codec_arrow(batch: &WriteBatch, mutation_types: &[i32]) {
let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types.to_vec());
let result = decoder.decode(&dst);
assert!(result.is_ok());
}
fn codec_protobuf(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
fn codec_protobuf(batch: &WriteBatch, mutation_types: &[i32]) {
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec());
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types.to_vec());
let result = decoder.decode(&dst);
assert!(result.is_ok());
}
fn bench_wal_encode_decode(c: &mut Criterion) {
let (batch_10, extras_10) = gen_new_batch_and_extras(1);
let (batch_100, extras_100) = gen_new_batch_and_extras(10);
let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
let (batch_10, types_10) = gen_new_batch_and_types(1);
let (batch_100, types_100) = gen_new_batch_and_types(10);
let (batch_10000, types_10000) = gen_new_batch_and_types(100);
let mut group = c.benchmark_group("wal_encode_decode");
group.bench_function("protobuf_encode_decode_with_10_num_rows", |b| {
b.iter(|| codec_protobuf(&batch_10, &extras_10))
b.iter(|| codec_protobuf(&batch_10, &types_10))
});
group.bench_function("protobuf_encode_decode_with_100_num_rows", |b| {
b.iter(|| codec_protobuf(&batch_100, &extras_100))
b.iter(|| codec_protobuf(&batch_100, &types_100))
});
group.bench_function("protobuf_encode_decode_with_10000_num_rows", |b| {
b.iter(|| codec_protobuf(&batch_10000, &extras_10000))
b.iter(|| codec_protobuf(&batch_10000, &types_10000))
});
group.bench_function("arrow_encode_decode_with_10_num_rows", |b| {
b.iter(|| codec_arrow(&batch_10, &extras_10))
b.iter(|| codec_arrow(&batch_10, &types_10))
});
group.bench_function("arrow_encode_decode_with_100_num_rows", |b| {
b.iter(|| codec_arrow(&batch_100, &extras_100))
b.iter(|| codec_arrow(&batch_100, &types_100))
});
group.bench_function("arrow_encode_decode_with_10000_num_rows", |b| {
b.iter(|| codec_arrow(&batch_10000, &extras_10000))
b.iter(|| codec_arrow(&batch_10000, &types_10000))
});
group.finish();
}


@@ -33,9 +33,7 @@ pub fn new_test_batch() -> WriteBatch {
)
}
pub fn gen_new_batch_and_extras(
putdate_nums: usize,
) -> (WriteBatch, Vec<storage::proto::wal::MutationExtra>) {
pub fn gen_new_batch_and_types(putdate_nums: usize) -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
let mut rng = rand::thread_rng();
for _ in 0..putdate_nums {
@@ -78,6 +76,6 @@ pub fn gen_new_batch_and_extras(
put_data.add_key_column("10", svs.clone()).unwrap();
batch.put(put_data).unwrap();
}
let extras = proto::wal::gen_mutation_extras(&batch);
(batch, extras)
let types = proto::wal::gen_mutation_types(&batch);
(batch, types)
}


@@ -5,7 +5,8 @@ package greptime.storage.wal.v1;
message WalHeader {
PayloadType payload_type = 1;
uint64 last_manifest_version = 2;
repeated MutationExtra mutation_extras = 3;
// Type of each mutation in the payload; currently only the arrow payload uses this field.
repeated MutationType mutation_types = 3;
}
enum PayloadType {
@@ -16,7 +17,6 @@ enum PayloadType {
message MutationExtra {
MutationType mutation_type = 1;
bytes column_null_mask = 2;
}
enum MutationType {


@@ -1,226 +0,0 @@
//! Forked from [arrow2](https://github.com/jorgecarleitao/arrow2/blob/v0.10.1/src/io/ipc/read/stream.rs),
//! with a slight change: arrow2 can only use one schema to read all data chunks, which doesn't
//! handle chunks with absent ("none") columns, so a `column_null_mask` parameter was added to
//! `ArrowStreamReader::maybe_next` to filter the schema per chunk.
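The mask-driven filtering described above amounts to dropping every field whose bit is set. A self-contained sketch, assuming the LSB-first bit order of `common_base::BitVec` (an assumption, since its backing order isn't shown here):

```rust
// Keep only the fields whose bit in the byte mask is unset, mirroring what
// `valid_metadata` below does with BitVec::from_slice.
fn filter_by_mask(fields: &[String], mask: &[u8]) -> Vec<String> {
    fields
        .iter()
        .enumerate()
        .filter(|&(i, _)| mask.get(i / 8).map_or(true, |byte| byte & (1 << (i % 8)) == 0))
        .map(|(_, field)| field.clone())
        .collect()
}

fn main() {
    let fields = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    // Bit 1 set => column "b" was absent from the put, so it is filtered out.
    assert_eq!(filter_by_mask(&fields, &[0b0000_0010]), vec!["a", "c"]);
}
```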
use std::io::Read;
use arrow_format::{self, ipc::planus::ReadAsRoot};
use common_base::BitVec;
use datatypes::arrow::{
datatypes::Schema,
error::{ArrowError, Result},
io::ipc::{
read::{read_dictionary, read_record_batch, Dictionaries, StreamMetadata, StreamState},
IpcSchema,
},
};
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
pub struct ArrowStreamReader<R: Read> {
reader: R,
metadata: StreamMetadata,
dictionaries: Dictionaries,
finished: bool,
data_buffer: Vec<u8>,
message_buffer: Vec<u8>,
}
impl<R: Read> ArrowStreamReader<R> {
pub fn new(reader: R, metadata: StreamMetadata) -> Self {
Self {
reader,
metadata,
dictionaries: Default::default(),
finished: false,
data_buffer: vec![],
message_buffer: vec![],
}
}
/// Return the schema of the stream
pub fn metadata(&self) -> &StreamMetadata {
&self.metadata
}
/// Check if the stream is finished
pub fn is_finished(&self) -> bool {
self.finished
}
/// Check if the stream is exactly finished
pub fn check_exactly_finished(&mut self) -> Result<bool> {
if self.is_finished() {
return Ok(false);
}
let _ = self.maybe_next(&[])?;
Ok(self.is_finished())
}
pub fn maybe_next(&mut self, column_null_mask: &[u8]) -> Result<Option<StreamState>> {
if self.finished {
return Ok(None);
}
let batch = if column_null_mask.is_empty() {
read_next(
&mut self.reader,
&self.metadata,
&mut self.dictionaries,
&mut self.message_buffer,
&mut self.data_buffer,
)?
} else {
read_next(
&mut self.reader,
&valid_metadata(&self.metadata, column_null_mask),
&mut self.dictionaries,
&mut self.message_buffer,
&mut self.data_buffer,
)?
};
if batch.is_none() {
self.finished = true;
}
Ok(batch)
}
}
fn valid_metadata(metadata: &StreamMetadata, column_null_mask: &[u8]) -> StreamMetadata {
let column_null_mask = BitVec::from_slice(column_null_mask);
let schema = Schema::from(
metadata
.schema
.fields
.iter()
.zip(&column_null_mask)
.filter(|(_, is_null)| !**is_null)
.map(|(field, _)| field.clone())
.collect::<Vec<_>>(),
)
.with_metadata(metadata.schema.metadata.clone());
let ipc_schema = IpcSchema {
fields: metadata
.ipc_schema
.fields
.iter()
.zip(&column_null_mask)
.filter(|(_, is_null)| !**is_null)
.map(|(ipc_field, _)| ipc_field.clone())
.collect::<Vec<_>>(),
is_little_endian: metadata.ipc_schema.is_little_endian,
};
StreamMetadata {
schema,
version: metadata.version,
ipc_schema,
}
}
fn read_next<R: Read>(
reader: &mut R,
metadata: &StreamMetadata,
dictionaries: &mut Dictionaries,
message_buffer: &mut Vec<u8>,
data_buffer: &mut Vec<u8>,
) -> Result<Option<StreamState>> {
// determine metadata length
let mut meta_length: [u8; 4] = [0; 4];
match reader.read_exact(&mut meta_length) {
Ok(()) => (),
Err(e) => {
return if e.kind() == std::io::ErrorKind::UnexpectedEof {
// Handle EOF without the "0xFFFFFFFF 0x00000000"
// valid according to:
// https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format
Ok(Some(StreamState::Waiting))
} else {
Err(ArrowError::from(e))
};
}
}
let meta_length = {
// If a continuation marker is encountered, skip over it and read
// the size from the next four bytes.
if meta_length == CONTINUATION_MARKER {
reader.read_exact(&mut meta_length)?;
}
i32::from_le_bytes(meta_length) as usize
};
if meta_length == 0 {
// the stream has ended, mark the reader as finished
return Ok(None);
}
message_buffer.clear();
message_buffer.resize(meta_length, 0);
reader.read_exact(message_buffer)?;
let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let header = message.header()?.ok_or_else(|| {
ArrowError::OutOfSpec(
"IPC: unable to fetch the message header. The file or stream is corrupted.".to_string(),
)
})?;
match header {
arrow_format::ipc::MessageHeaderRef::Schema(_) => {
Err(ArrowError::OutOfSpec("A stream ".to_string()))
}
arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
// read the block that makes up the record batch into a buffer
data_buffer.clear();
data_buffer.resize(message.body_length()? as usize, 0);
reader.read_exact(data_buffer)?;
let mut reader = std::io::Cursor::new(data_buffer);
read_record_batch(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
None,
dictionaries,
metadata.version,
&mut reader,
0,
)
.map(|x| Some(StreamState::Some(x)))
}
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
// read the block that makes up the dictionary batch into a buffer
let mut buf = vec![0; message.body_length()? as usize];
reader.read_exact(&mut buf)?;
let mut dict_reader = std::io::Cursor::new(buf);
read_dictionary(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
dictionaries,
&mut dict_reader,
0,
)?;
// read the next message until we encounter a RecordBatch message
read_next(reader, metadata, dictionaries, message_buffer, data_buffer)
}
t => Err(ArrowError::OutOfSpec(format!(
"Reading types other than record batches not yet supported, unable to read {:?} ",
t
))),
}
}


@@ -1,6 +1,5 @@
//! Storage engine implementation.
#![feature(map_first_last)]
mod arrow_stream;
mod background;
mod chunk;
pub mod codec;


@@ -1,34 +1,13 @@
#![allow(clippy::all)]
tonic::include_proto!("greptime.storage.wal.v1");
use common_base::BitVec;
use crate::write_batch::{Mutation, WriteBatch};
pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec<MutationExtra> {
let column_schemas = write_batch.schema().column_schemas();
pub fn gen_mutation_types(write_batch: &WriteBatch) -> Vec<i32> {
write_batch
.iter()
.map(|m| match m {
Mutation::Put(put) => {
if put.num_columns() == column_schemas.len() {
MutationExtra {
mutation_type: MutationType::Put.into(),
column_null_mask: Default::default(),
}
} else {
let mut column_null_mask = BitVec::repeat(false, column_schemas.len());
for (i, cs) in column_schemas.iter().enumerate() {
if put.column_by_name(&cs.name).is_none() {
column_null_mask.set(i, true);
}
}
MutationExtra {
mutation_type: MutationType::Put.into(),
column_null_mask: column_null_mask.into_vec(),
}
}
}
Mutation::Put(_) => MutationType::Put.into(),
})
.collect::<Vec<_>>()
}


@@ -72,7 +72,7 @@ impl<S: LogStore> Wal<S> {
/// v v
/// +---------------------+----------------------------------------------------+--------------+-------------+--------------+
/// | | Header | | | |
/// | Header Len(varint) | (last_manifest_version + mutation_extras + ...) | Data Chunk0 | Data Chunk1 | ... |
/// | Header Len(varint) | (last_manifest_version + mutation_types + ...) | Data Chunk0 | Data Chunk1 | ... |
/// | | | | | |
/// +---------------------+----------------------------------------------------+--------------+-------------+--------------+
/// ```
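The framing in the layout above (a varint header length, then the protobuf-encoded header, then the data chunks) can be sketched independently of the log store. `encode_varint` and `frame_entry` below are local helpers for illustration, not APIs of this crate:

```rust
// LEB128-style varint, as used for the header length prefix.
fn encode_varint(mut value: u64, buf: &mut Vec<u8>) {
    loop {
        let byte = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            buf.push(byte);
            break;
        }
        buf.push(byte | 0x80);
    }
}

// Frame one WAL entry: header length (varint), header bytes, then data chunks.
fn frame_entry(header_bytes: &[u8], data_chunks: &[&[u8]]) -> Vec<u8> {
    let mut buf = Vec::new();
    encode_varint(header_bytes.len() as u64, &mut buf);
    buf.extend_from_slice(header_bytes);
    for chunk in data_chunks {
        buf.extend_from_slice(chunk);
    }
    buf
}

fn main() {
    let entry = frame_entry(&[0xAA; 3], &[&[1, 2], &[3]]);
    // 3-byte header => single varint byte 3, then header bytes, then chunks.
    assert_eq!(entry, vec![3, 0xAA, 0xAA, 0xAA, 1, 2, 3]);
}
```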
@@ -84,9 +84,8 @@ impl<S: LogStore> Wal<S> {
payload: Payload<'_>,
) -> Result<(u64, usize)> {
header.payload_type = payload.payload_type();
if let Payload::WriteBatchArrow(batch) = payload {
header.mutation_extras = wal::gen_mutation_extras(batch);
header.mutation_types = wal::gen_mutation_types(batch);
}
let mut buf = vec![];
@@ -97,7 +96,7 @@ impl<S: LogStore> Wal<S> {
if let Payload::WriteBatchArrow(batch) = payload {
// entry
let encoder = WriteBatchArrowEncoder::new(header.mutation_extras);
let encoder = WriteBatchArrowEncoder::new();
// TODO(jiachun): provide some way to compute data size before encode, so we can preallocate an exactly sized buf.
encoder
.encode(batch, &mut buf)
@@ -185,8 +184,8 @@ impl<S: LogStore> Wal<S> {
match PayloadType::from_i32(header.payload_type) {
Some(PayloadType::None) => Ok((seq_num, header, None)),
Some(PayloadType::WriteBatchArrow) => {
let mutation_extras = std::mem::take(&mut header.mutation_extras);
let decoder = WriteBatchArrowDecoder::new(mutation_extras);
let mutation_types = std::mem::take(&mut header.mutation_types);
let decoder = WriteBatchArrowDecoder::new(mutation_types);
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
@@ -197,8 +196,8 @@ impl<S: LogStore> Wal<S> {
Ok((seq_num, header, Some(write_batch)))
}
Some(PayloadType::WriteBatchProto) => {
let mutation_extras = std::mem::take(&mut header.mutation_extras);
let decoder = WriteBatchProtobufDecoder::new(mutation_extras);
let mutation_types = std::mem::take(&mut header.mutation_types);
let decoder = WriteBatchProtobufDecoder::new(mutation_types);
let write_batch = decoder
.decode(&input[data_pos..])
.map_err(BoxedError::new)
@@ -320,7 +319,7 @@ mod tests {
let wal_header = WalHeader {
payload_type: 1,
last_manifest_version: 99999999,
mutation_extras: vec![],
mutation_types: vec![],
};
let mut buf: Vec<u8> = vec![];


@@ -105,7 +105,7 @@ pub enum Error {
},
#[snafu(display("Failed to decode, in stream waiting state"))]
StreamWaiting,
StreamWaiting { backtrace: Backtrace },
#[snafu(display("Failed to decode, corrupted data {}", message))]
DataCorrupted {
@@ -477,17 +477,14 @@ pub mod codec {
use std::{io::Cursor, sync::Arc};
use common_base::BitVec;
use datatypes::{
arrow::{
chunk::Chunk as ArrowChunk,
io::ipc::{
self,
read::{self, StreamState},
read::{self, StreamReader, StreamState},
write::{StreamWriter, WriteOptions},
},
},
error::Result as DataTypesResult,
schema::{Schema, SchemaRef},
vectors::Helper,
};
@@ -495,32 +492,26 @@ pub mod codec {
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::WriteRequest;
use super::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
Error as WriteBatchError, FromProtobufSnafu, Mutation, ParseSchemaSnafu, Result,
ToProtobufSnafu, WriteBatch,
};
use crate::codec::{Decoder, Encoder};
use crate::proto::{
wal::{MutationExtra, MutationType},
wal::MutationType,
write_batch::{self, gen_columns, gen_put_data_vector},
};
use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData};
use crate::{
arrow_stream::ArrowStreamReader,
codec::{Decoder, Encoder},
use crate::write_batch::{
DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu,
Error as WriteBatchError, FromProtobufSnafu, MissingColumnSnafu, Mutation,
ParseSchemaSnafu, Result, StreamWaitingSnafu, ToProtobufSnafu, WriteBatch,
};
use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData};
// TODO(jiachun): The codec logic is too complex, maybe we should use protobuf to
// serialize/deserialize all our data.
// And we can make a comparison with protobuf, including performance, storage cost,
// TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost,
// CPU consumption, etc
pub struct WriteBatchArrowEncoder {
mutation_extras: Vec<MutationExtra>,
}
#[derive(Default)]
pub struct WriteBatchArrowEncoder {}
impl WriteBatchArrowEncoder {
pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
Self { mutation_extras }
pub fn new() -> Self {
Self::default()
}
}
@@ -529,52 +520,36 @@ pub mod codec {
type Error = WriteBatchError;
fn encode(&self, item: &WriteBatch, dst: &mut Vec<u8>) -> Result<()> {
let schema = item.schema().arrow_schema();
let column_names = item
.schema()
.column_schemas()
.iter()
.map(|column_schema| column_schema.name.clone())
.collect::<Vec<_>>();
let data = item
.iter()
.zip(self.mutation_extras.iter())
.map(|(mtn, ext)| match mtn {
Mutation::Put(put) => {
let arrays = column_names
.iter()
.filter_map(|column_name| put.column_by_name(column_name))
.map(|vector| vector.to_arrow_array())
.collect::<Vec<_>>();
(arrays, &ext.column_null_mask)
}
});
let item_schema = item.schema();
let arrow_schema = item_schema.arrow_schema();
let opts = WriteOptions { compression: None };
let mut writer = StreamWriter::new(dst, opts);
let ipc_fields = ipc::write::default_ipc_fields(&schema.fields);
writer
.start(schema, Some(ipc_fields.clone()))
.context(EncodeArrowSnafu)?;
for (arrays, column_null_mask) in data {
let chunk = ArrowChunk::try_new(arrays).context(EncodeArrowSnafu)?;
if column_null_mask.is_empty() {
writer.write(&chunk, None).context(EncodeArrowSnafu)?;
} else {
let valid_ipc_fields = ipc_fields
.iter()
.zip(BitVec::from_slice(column_null_mask))
.filter(|(_, is_null)| !*is_null)
.map(|(ipc_field, _)| ipc_field.clone())
.collect::<Vec<_>>();
writer
.write(&chunk, Some(&valid_ipc_fields))
.context(EncodeArrowSnafu)?;
}
writer.start(arrow_schema, None).context(EncodeArrowSnafu)?;
for mutation in item.iter() {
let chunk = match mutation {
Mutation::Put(put) => {
let arrays = item_schema
.column_schemas()
.iter()
.map(|column_schema| {
let vector = put.column_by_name(&column_schema.name).context(
MissingColumnSnafu {
name: &column_schema.name,
},
)?;
Ok(vector.to_arrow_array())
})
.collect::<Result<Vec<_>>>()?;
ArrowChunk::try_new(arrays).context(EncodeArrowSnafu)?
}
};
writer.write(&chunk, None).context(EncodeArrowSnafu)?;
}
writer.finish().context(EncodeArrowSnafu)?;
Ok(())
@@ -582,12 +557,12 @@ pub mod codec {
}
pub struct WriteBatchArrowDecoder {
mutation_extras: Vec<MutationExtra>,
mutation_types: Vec<i32>,
}
impl WriteBatchArrowDecoder {
pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
Self { mutation_extras }
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
@@ -598,120 +573,67 @@ pub mod codec {
fn decode(&self, src: &[u8]) -> Result<WriteBatch> {
let mut reader = Cursor::new(src);
let metadata = read::read_stream_metadata(&mut reader).context(DecodeArrowSnafu)?;
let mut reader = ArrowStreamReader::new(reader, metadata);
let schema = reader.metadata().schema.clone();
let mut reader = StreamReader::new(reader, metadata);
let arrow_schema = reader.metadata().schema.clone();
let stream_states = self
.mutation_extras
.iter()
.map(|ext| {
reader
.maybe_next(&ext.column_null_mask)
.context(DecodeArrowSnafu)
})
.collect::<Result<Vec<_>>>()?;
let mut chunks = Vec::with_capacity(self.mutation_types.len());
for stream_state in reader.by_ref() {
let stream_state = stream_state.context(DecodeArrowSnafu)?;
let chunk = match stream_state {
StreamState::Some(chunk) => chunk,
StreamState::Waiting => return StreamWaitingSnafu {}.fail(),
};
chunks.push(chunk);
}
// check if exactly finished
ensure!(
reader.check_exactly_finished().context(DecodeArrowSnafu)?,
reader.is_finished(),
DataCorruptedSnafu {
message: "Impossible, the num of data chunks is different than expected."
}
);
let mut chunks = Vec::with_capacity(self.mutation_extras.len());
for state_opt in stream_states.into_iter().flatten() {
match state_opt {
StreamState::Some(chunk) => chunks.push(chunk),
StreamState::Waiting => return Err(WriteBatchError::StreamWaiting),
}
}
// chunks -> mutations
let chunks = chunks
.iter()
.map(|chunk| chunk.arrays())
.map(|arrays| {
arrays
.iter()
.map(Helper::try_into_vector)
.collect::<DataTypesResult<Vec<_>>>()
.context(DecodeVectorSnafu)
})
.collect::<Result<Vec<_>>>()?;
ensure!(
chunks.len() == self.mutation_extras.len(),
chunks.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: format!(
"expected {} mutations, but got {}",
self.mutation_extras.len(),
self.mutation_types.len(),
chunks.len()
)
}
);
let schema = Schema::try_from(Arc::new(schema)).context(ParseSchemaSnafu)?;
let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?);
let mut write_batch = WriteBatch::new(schema.clone());
let column_names = schema
.column_schemas()
.iter()
.map(|column| column.name.clone())
.collect::<Vec<_>>();
let mutations = self
.mutation_extras
.iter()
.zip(chunks.iter())
.map(
|(ext, mtn)| match MutationType::from_i32(ext.mutation_type) {
Some(MutationType::Put) => {
let gen_mutation_put = |valid_columns: &[String]| {
let mut put_data = PutData::with_num_columns(valid_columns.len());
let res = valid_columns
.iter()
.zip(mtn)
.map(|(name, vector)| {
put_data.add_column_by_name(name, vector.clone())
})
.collect::<Result<Vec<_>>>();
res.map(|_| Mutation::Put(put_data))
};
if ext.column_null_mask.is_empty() {
gen_mutation_put(&column_names)
} else {
let valid_columns = BitVec::from_slice(&ext.column_null_mask)
.into_iter()
.zip(column_names.iter())
.filter(|(is_null, _)| !*is_null)
.map(|(_, column_name)| column_name.clone())
.collect::<Vec<_>>();
gen_mutation_put(&valid_columns)
}
for (mutation_type, chunk) in self.mutation_types.iter().zip(chunks.into_iter()) {
match MutationType::from_i32(*mutation_type) {
Some(MutationType::Put) => {
let mut put_data = PutData::with_num_columns(schema.num_columns());
for (column_schema, array) in
schema.column_schemas().iter().zip(chunk.arrays().iter())
{
let vector =
Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
put_data.add_column_by_name(&column_schema.name, vector)?;
}
Some(MutationType::Delete) => {
todo!("delete mutation")
}
_ => DataCorruptedSnafu {
message: format!("Unexpceted mutation type: {}", ext.mutation_type),
}
.fail(),
},
)
.collect::<Result<Vec<_>>>()?;
let mut write_batch = WriteBatch::new(Arc::new(schema));
mutations
.into_iter()
.try_for_each(|mutation| match mutation {
Mutation::Put(put_data) => write_batch.put(put_data),
})?;
write_batch.put(put_data)?;
}
Some(MutationType::Delete) => {
unimplemented!("delete mutation is not implemented")
}
_ => {
return DataCorruptedSnafu {
message: format!("Unexpceted mutation type: {}", mutation_type),
}
.fail()
}
}
}
Ok(write_batch)
}
@@ -733,12 +655,15 @@ pub mod codec {
.schema()
.column_schemas()
.iter()
.filter_map(|cs| put_data.column_by_name(&cs.name))
.map(gen_columns)
.collect::<write_batch::Result<Vec<_>>>(),
.map(|cs| {
let vector = put_data
.column_by_name(&cs.name)
.context(MissingColumnSnafu { name: &cs.name })?;
gen_columns(vector).context(ToProtobufSnafu)
})
.collect::<Result<Vec<_>>>(),
})
.collect::<write_batch::Result<Vec<_>>>()
.context(ToProtobufSnafu {})?
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|columns| write_batch::Mutation {
mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
@@ -757,13 +682,13 @@ pub mod codec {
}
pub struct WriteBatchProtobufDecoder {
mutation_extras: Vec<MutationExtra>,
mutation_types: Vec<i32>,
}
impl WriteBatchProtobufDecoder {
#[allow(dead_code)]
pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
Self { mutation_extras }
pub fn new(mutation_types: Vec<i32>) -> Self {
Self { mutation_types }
}
}
@@ -781,47 +706,34 @@ pub mod codec {
let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
ensure!(
write_batch.mutations.len() == self.mutation_extras.len(),
write_batch.mutations.len() == self.mutation_types.len(),
DataCorruptedSnafu {
message: &format!(
"expected {} mutations, but got {}",
self.mutation_extras.len(),
self.mutation_types.len(),
write_batch.mutations.len()
)
}
);
let mutations = self
.mutation_extras
.iter()
.zip(write_batch.mutations.into_iter())
.map(|(ext, mtn)| match mtn.mutation {
let mutations = write_batch
.mutations
.into_iter()
.map(|mtn| match mtn.mutation {
Some(write_batch::mutation::Mutation::Put(put)) => {
let column_schemas = schema.column_schemas();
let valid_columns = if ext.column_null_mask.is_empty() {
column_schemas
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.collect::<Vec<_>>()
} else {
BitVec::from_slice(&ext.column_null_mask)
.into_iter()
.zip(column_schemas.iter())
.filter(|(is_null, _)| !*is_null)
.map(|(_, column)| (column.name.clone(), column.data_type.clone()))
.collect::<Vec<_>>()
};
let mut put_data = PutData::with_num_columns(put.columns.len());
let res = valid_columns
.into_iter()
let res = schema
.column_schemas()
.iter()
.map(|column| (column.name.clone(), column.data_type.clone()))
.zip(put.columns.into_iter())
.map(|((name, data_type), column)| {
gen_put_data_vector(data_type, column).map(|vector| (name, vector))
gen_put_data_vector(data_type, column)
.map(|vector| (name, vector))
.context(FromProtobufSnafu)
})
.collect::<write_batch::Result<Vec<_>>>()
.context(FromProtobufSnafu {})?
.collect::<Result<Vec<_>>>()?
.into_iter()
.map(|(name, vector)| put_data.add_column_by_name(&name, vector))
.collect::<Result<Vec<_>>>();
@@ -861,7 +773,6 @@ mod tests {
use super::*;
use crate::codec::{Decoder, Encoder};
use crate::proto;
use crate::proto::wal::MutationExtra;
use crate::test_util::write_batch_util;
#[test]
@@ -1113,7 +1024,7 @@ mod tests {
)
}
fn gen_new_batch_and_extras() -> (WriteBatch, Vec<MutationExtra>) {
fn gen_new_batch_and_types() -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
for i in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
@@ -1129,21 +1040,21 @@ mod tests {
batch.put(put_data).unwrap();
}
let extras = proto::wal::gen_mutation_extras(&batch);
let types = proto::wal::gen_mutation_types(&batch);
(batch, extras)
(batch, types)
}
#[test]
fn test_codec_arrow() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras();
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone());
let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras);
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
@@ -1153,14 +1064,14 @@ mod tests {
#[test]
fn test_codec_protobuf() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras();
let (batch, mutation_types) = gen_new_batch_and_types();
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
@@ -1168,7 +1079,7 @@ mod tests {
Ok(())
}
fn gen_new_batch_and_extras_with_none_column() -> (WriteBatch, Vec<MutationExtra>) {
fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec<i32>) {
let mut batch = new_test_batch();
for _ in 0..10 {
let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3]));
@@ -1182,21 +1093,21 @@ mod tests {
batch.put(put_data).unwrap();
}
let extras = proto::wal::gen_mutation_extras(&batch);
let types = proto::wal::gen_mutation_types(&batch);
(batch, extras)
(batch, types)
}
#[test]
fn test_codec_with_none_column_arrow() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column();
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone());
let encoder = codec::WriteBatchArrowEncoder::new();
let mut dst = vec![];
let result = encoder.encode(&batch, &mut dst);
assert!(result.is_ok());
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras);
let decoder = codec::WriteBatchArrowDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);
@@ -1206,13 +1117,13 @@ mod tests {
#[test]
fn test_codec_with_none_column_protobuf() -> Result<()> {
let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column();
let (batch, mutation_types) = gen_new_batch_and_types_with_none_column();
let encoder = codec::WriteBatchProtobufEncoder {};
let mut dst = vec![];
encoder.encode(&batch, &mut dst).unwrap();
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras);
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types);
let result = decoder.decode(&dst);
let batch2 = result?;
assert_eq!(batch.num_rows, batch2.num_rows);