From a457c49d99792af8856238335bf8cdfd32606f92 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 24 Oct 2022 14:53:35 +0800 Subject: [PATCH] refactor: Remove column_null_mask in MutationExtra (#314) * refactor: Remove column_null_mask in MutationExtra MutationExtra::column_null_mask is no longer needed as we could ensure there is no missing column in WriteBatch. * feat(storage): Remove MutationExtra Just stores MutationType in the WalHeader, no longer needs MutationExtra --- src/meta-srv/Cargo.toml | 2 +- src/servers/Cargo.toml | 2 +- src/storage/benches/wal/bench_decode.rs | 42 ++- src/storage/benches/wal/bench_encode.rs | 26 +- src/storage/benches/wal/bench_wal.rs | 30 +-- src/storage/benches/wal/util/mod.rs | 8 +- src/storage/proto/wal.proto | 4 +- src/storage/src/arrow_stream.rs | 226 ---------------- src/storage/src/lib.rs | 1 - src/storage/src/proto/wal.rs | 25 +- src/storage/src/wal.rs | 17 +- src/storage/src/write_batch.rs | 337 +++++++++--------------- 12 files changed, 188 insertions(+), 532 deletions(-) delete mode 100644 src/storage/src/arrow_stream.rs diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 3e7014f706..dfdad5c761 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -13,8 +13,8 @@ common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } etcd-client = "0.10" futures = "0.3" -http-body = "0.4" h2 = "0.3" +http-body = "0.4" serde = "1.0" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 8acc19bd2d..e2e12fa6a5 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -40,9 +40,9 @@ axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", br catalog = { path = "../catalog" } common-base = { path = "../common/base" } mysql_async = { git = "https://github.com/Morranto/mysql_async.git", rev = "127b538" } +query = { path = "../query" } rand = "0.8" script = { path = "../script", features = ["python"] } -query = { path = "../query" } table = { path = "../table" } tokio-postgres = "0.7" tokio-test = "0.4" diff --git a/src/storage/benches/wal/bench_decode.rs b/src/storage/benches/wal/bench_decode.rs index 58263aaa3d..5b4abb8815 100644 --- a/src/storage/benches/wal/bench_decode.rs +++ b/src/storage/benches/wal/bench_decode.rs @@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use storage::codec::{Decoder, Encoder}; use storage::write_batch::{codec, WriteBatch}; -use super::util::gen_new_batch_and_extras; +use crate::wal::util::gen_new_batch_and_types; /* ------------------------------------- @@ -18,12 +18,8 @@ rows | protobuf | arrow | ------------------------------------ */ -fn encode_arrow( - batch: &WriteBatch, - mutation_extras: &[storage::proto::wal::MutationExtra], - dst: &mut Vec, -) { - let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec()); +fn encode_arrow(batch: &WriteBatch, dst: &mut Vec) { + let encoder = codec::WriteBatchArrowEncoder::new(); let result = encoder.encode(batch, dst); assert!(result.is_ok()); } @@ -34,22 +30,22 @@ fn encode_protobuf(batch: &WriteBatch, dst: &mut Vec) { assert!(result.is_ok()); } -fn decode_arrow(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) { - let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec()); +fn decode_arrow(dst: &[u8], mutation_types: &[i32]) { + let decoder = 
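// [editor's note, not part of the patch] After this change the arrow decoder
// is driven by plain `MutationType` values (as `i32`) taken from the WAL
// header, instead of the removed `MutationExtra`. Using only names that
// appear in this patch, a decode looks roughly like:
//
//     let types = storage::proto::wal::gen_mutation_types(&batch);
//     let decoder = codec::WriteBatchArrowDecoder::new(types);
//     let decoded = decoder.decode(&encoded)?; // `encoded`: bytes from the matching encoder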
codec::WriteBatchArrowDecoder::new(mutation_types.to_vec()); let result = decoder.decode(dst); assert!(result.is_ok()); } -fn decode_protobuf(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) { - let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec()); +fn decode_protobuf(dst: &[u8], mutation_types: &[i32]) { + let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types.to_vec()); let result = decoder.decode(dst); assert!(result.is_ok()); } fn bench_wal_decode(c: &mut Criterion) { - let (batch_10, extras_10) = gen_new_batch_and_extras(1); - let (batch_100, extras_100) = gen_new_batch_and_extras(10); - let (batch_10000, extras_10000) = gen_new_batch_and_extras(100); + let (batch_10, types_10) = gen_new_batch_and_types(1); + let (batch_100, types_100) = gen_new_batch_and_types(10); + let (batch_10000, types_10000) = gen_new_batch_and_types(100); let mut dst_protobuf_10 = vec![]; let mut dst_protobuf_100 = vec![]; let mut dst_protobuf_10000 = vec![]; @@ -62,28 +58,28 @@ fn bench_wal_decode(c: &mut Criterion) { encode_protobuf(&batch_100, &mut dst_protobuf_100); encode_protobuf(&batch_10000, &mut dst_protobuf_10000); - encode_arrow(&batch_10, &extras_10, &mut dst_arrow_10); - encode_arrow(&batch_100, &extras_100, &mut dst_arrow_100); - encode_arrow(&batch_10000, &extras_10000, &mut dst_arrow_10000); + encode_arrow(&batch_10, &mut dst_arrow_10); + encode_arrow(&batch_100, &mut dst_arrow_100); + encode_arrow(&batch_10000, &mut dst_arrow_10000); let mut group = c.benchmark_group("wal_decode"); group.bench_function("protobuf_decode_with_10_num_rows", |b| { - b.iter(|| decode_protobuf(&dst_protobuf_10, &extras_10)) + b.iter(|| decode_protobuf(&dst_protobuf_10, &types_10)) }); group.bench_function("protobuf_decode_with_100_num_rows", |b| { - b.iter(|| decode_protobuf(&dst_protobuf_100, &extras_100)) + b.iter(|| decode_protobuf(&dst_protobuf_100, &types_100)) }); group.bench_function("protobuf_decode_with_10000_num_rows", |b| { - b.iter(|| decode_protobuf(&dst_protobuf_10000, &extras_10000)) + b.iter(|| decode_protobuf(&dst_protobuf_10000, &types_10000)) }); group.bench_function("arrow_decode_with_10_num_rows", |b| { - b.iter(|| decode_arrow(&dst_arrow_10, &extras_10)) + b.iter(|| decode_arrow(&dst_arrow_10, &types_10)) }); group.bench_function("arrow_decode_with_100_num_rows", |b| { - b.iter(|| decode_arrow(&dst_arrow_100, &extras_100)) + b.iter(|| decode_arrow(&dst_arrow_100, &types_100)) }); group.bench_function("arrow_decode_with_10000_num_rows", |b| { - b.iter(|| decode_arrow(&dst_arrow_10000, &extras_10000)) + b.iter(|| decode_arrow(&dst_arrow_10000, &types_10000)) }); group.finish(); } diff --git a/src/storage/benches/wal/bench_encode.rs b/src/storage/benches/wal/bench_encode.rs index 8c8d2f590c..040b8e326a 100644 --- a/src/storage/benches/wal/bench_encode.rs +++ b/src/storage/benches/wal/bench_encode.rs @@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use storage::codec::Encoder; use storage::write_batch::{codec, WriteBatch}; -use super::util::gen_new_batch_and_extras; +use crate::wal::util::gen_new_batch_and_types; /* ------------------------------------- @@ -18,14 +18,14 @@ rows | protobuf | arrow | ------------------------------------ */ -fn encode_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) { - let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec()); +fn encode_arrow(batch: &WriteBatch) { + let encoder = codec::WriteBatchArrowEncoder::new(); let mut dst = 
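// [editor's note, not part of the patch] `WriteBatchArrowEncoder::new()` is
// now parameter-free. Because a WriteBatch is guaranteed to contain every
// schema column, the encoder no longer needs per-mutation null masks:
//
//     let encoder = codec::WriteBatchArrowEncoder::new();
//     let mut dst = vec![];
//     encoder.encode(&batch, &mut dst)?; // `batch`: any populated WriteBatch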
vec![]; let result = encoder.encode(batch, &mut dst); assert!(result.is_ok()); } -fn encode_protobuf(batch: &WriteBatch, _mutation_extras: &[storage::proto::wal::MutationExtra]) { +fn encode_protobuf(batch: &WriteBatch) { let encoder = codec::WriteBatchProtobufEncoder {}; let mut dst = vec![]; let result = encoder.encode(batch, &mut dst); @@ -33,28 +33,28 @@ fn encode_protobuf(batch: &WriteBatch, _mutation_extras: &[storage::proto::wal:: } fn bench_wal_encode(c: &mut Criterion) { - let (batch_10, extras_10) = gen_new_batch_and_extras(1); - let (batch_100, extras_100) = gen_new_batch_and_extras(10); - let (batch_10000, extras_10000) = gen_new_batch_and_extras(100); + let (batch_10, _) = gen_new_batch_and_types(1); + let (batch_100, _) = gen_new_batch_and_types(10); + let (batch_10000, _) = gen_new_batch_and_types(100); let mut group = c.benchmark_group("wal_encode"); group.bench_function("protobuf_encode_with_10_num_rows", |b| { - b.iter(|| encode_protobuf(&batch_10, &extras_10)) + b.iter(|| encode_protobuf(&batch_10)) }); group.bench_function("protobuf_encode_with_100_num_rows", |b| { - b.iter(|| encode_protobuf(&batch_100, &extras_100)) + b.iter(|| encode_protobuf(&batch_100)) }); group.bench_function("protobuf_encode_with_10000_num_rows", |b| { - b.iter(|| encode_protobuf(&batch_10000, &extras_10000)) + b.iter(|| encode_protobuf(&batch_10000)) }); group.bench_function("arrow_encode_with_10_num_rows", |b| { - b.iter(|| encode_arrow(&batch_10, &extras_10)) + b.iter(|| encode_arrow(&batch_10)) }); group.bench_function("arrow_encode_with_100_num_rows", |b| { - b.iter(|| encode_arrow(&batch_100, &extras_100)) + b.iter(|| encode_arrow(&batch_100)) }); group.bench_function("arrow_encode_with_10000_num_rows", |b| { - b.iter(|| encode_arrow(&batch_10000, &extras_10000)) + b.iter(|| encode_arrow(&batch_10000)) }); group.finish(); } diff --git a/src/storage/benches/wal/bench_wal.rs b/src/storage/benches/wal/bench_wal.rs index ca57e02c69..e0cca5ed86 100644 --- a/src/storage/benches/wal/bench_wal.rs +++ b/src/storage/benches/wal/bench_wal.rs @@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use storage::codec::{Decoder, Encoder}; use storage::write_batch::{codec, WriteBatch}; -use super::util::gen_new_batch_and_extras; +use crate::wal::util::gen_new_batch_and_types; /* ------------------------------------- @@ -18,50 +18,50 @@ rows | protobuf | arrow | ------------------------------------ */ -fn codec_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) { - let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec()); +fn codec_arrow(batch: &WriteBatch, mutation_types: &[i32]) { + let encoder = codec::WriteBatchArrowEncoder::new(); let mut dst = vec![]; let result = encoder.encode(batch, &mut dst); assert!(result.is_ok()); - let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec()); + let decoder = codec::WriteBatchArrowDecoder::new(mutation_types.to_vec()); let result = decoder.decode(&dst); assert!(result.is_ok()); } -fn codec_protobuf(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) { +fn codec_protobuf(batch: &WriteBatch, mutation_types: &[i32]) { let encoder = codec::WriteBatchProtobufEncoder {}; let mut dst = vec![]; let result = encoder.encode(batch, &mut dst); assert!(result.is_ok()); - let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec()); + let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types.to_vec()); let result = decoder.decode(&dst); 
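// [editor's note, not part of the patch] For the protobuf codec the decoder's
// `mutation_types` serve only as a count check (the mutation kind travels
// inside the protobuf message itself); see the `ensure!` in
// WriteBatchProtobufDecoder::decode later in this patch. Sketch:
//
//     let decoder = codec::WriteBatchProtobufDecoder::new(types);
//     let batch2 = decoder.decode(&dst)?; // DataCorrupted if the counts disagree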
assert!(result.is_ok()); } fn bench_wal_encode_decode(c: &mut Criterion) { - let (batch_10, extras_10) = gen_new_batch_and_extras(1); - let (batch_100, extras_100) = gen_new_batch_and_extras(10); - let (batch_10000, extras_10000) = gen_new_batch_and_extras(100); + let (batch_10, types_10) = gen_new_batch_and_types(1); + let (batch_100, types_100) = gen_new_batch_and_types(10); + let (batch_10000, types_10000) = gen_new_batch_and_types(100); let mut group = c.benchmark_group("wal_encode_decode"); group.bench_function("protobuf_encode_decode_with_10_num_rows", |b| { - b.iter(|| codec_protobuf(&batch_10, &extras_10)) + b.iter(|| codec_protobuf(&batch_10, &types_10)) }); group.bench_function("protobuf_encode_decode_with_100_num_rows", |b| { - b.iter(|| codec_protobuf(&batch_100, &extras_100)) + b.iter(|| codec_protobuf(&batch_100, &types_100)) }); group.bench_function("protobuf_encode_decode_with_10000_num_rows", |b| { - b.iter(|| codec_protobuf(&batch_10000, &extras_10000)) + b.iter(|| codec_protobuf(&batch_10000, &types_10000)) }); group.bench_function("arrow_encode_decode_with_10_num_rows", |b| { - b.iter(|| codec_arrow(&batch_10, &extras_10)) + b.iter(|| codec_arrow(&batch_10, &types_10)) }); group.bench_function("arrow_encode_decode_with_100_num_rows", |b| { - b.iter(|| codec_arrow(&batch_100, &extras_100)) + b.iter(|| codec_arrow(&batch_100, &types_100)) }); group.bench_function("arrow_encode_decode_with_10000_num_rows", |b| { - b.iter(|| codec_arrow(&batch_10000, &extras_10000)) + b.iter(|| codec_arrow(&batch_10000, &types_10000)) }); group.finish(); } diff --git a/src/storage/benches/wal/util/mod.rs b/src/storage/benches/wal/util/mod.rs index 64491c85ff..ebff52cceb 100644 --- a/src/storage/benches/wal/util/mod.rs +++ b/src/storage/benches/wal/util/mod.rs @@ -33,9 +33,7 @@ pub fn new_test_batch() -> WriteBatch { ) } -pub fn gen_new_batch_and_extras( - putdate_nums: usize, -) -> (WriteBatch, Vec) { +pub fn gen_new_batch_and_types(putdate_nums: usize) -> (WriteBatch, Vec) { let mut batch = new_test_batch(); let mut rng = rand::thread_rng(); for _ in 0..putdate_nums { @@ -78,6 +76,6 @@ pub fn gen_new_batch_and_extras( put_data.add_key_column("10", svs.clone()).unwrap(); batch.put(put_data).unwrap(); } - let extras = proto::wal::gen_mutation_extras(&batch); - (batch, extras) + let types = proto::wal::gen_mutation_types(&batch); + (batch, types) } diff --git a/src/storage/proto/wal.proto b/src/storage/proto/wal.proto index 8fa4bc530e..c8cfb75d50 100644 --- a/src/storage/proto/wal.proto +++ b/src/storage/proto/wal.proto @@ -5,7 +5,8 @@ package greptime.storage.wal.v1; message WalHeader { PayloadType payload_type = 1; uint64 last_manifest_version = 2; - repeated MutationExtra mutation_extras = 3; + // Type of each mutation in payload, now only arrow payload uses this field. + repeated MutationType mutation_types = 3; } enum PayloadType { @@ -16,7 +17,6 @@ enum PayloadType { message MutationExtra { MutationType mutation_type = 1; - bytes column_null_mask = 2; } enum MutationType { diff --git a/src/storage/src/arrow_stream.rs b/src/storage/src/arrow_stream.rs deleted file mode 100644 index 3c49bd2d5f..0000000000 --- a/src/storage/src/arrow_stream.rs +++ /dev/null @@ -1,226 +0,0 @@ -//! Forked from [arrow2](https://github.com/jorgecarleitao/arrow2/blob/v0.10.1/src/io/ipc/read/stream.rs), -//! and I made a slight change because arrow2 can only use the same schema to read all data chunks, -//! which doesn't solve the none column problem, so I added a `column_null_mask` parameter to the -//! 
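// [editor's note, not part of the patch] The wal.proto change above replaces
// the per-mutation `MutationExtra` message with a bare
// `repeated MutationType mutation_types` field on WalHeader. A header for an
// arrow payload carrying two puts would be built roughly like this (field
// names from wal.proto; the values are illustrative):
//
//     let header = WalHeader {
//         payload_type: PayloadType::WriteBatchArrow.into(),
//         last_manifest_version: 0,
//         mutation_types: vec![MutationType::Put.into(), MutationType::Put.into()],
//     };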
`StreamReader#maybe_next` method to solve the none column problem. -use std::io::Read; - -use arrow_format::{self, ipc::planus::ReadAsRoot}; -use common_base::BitVec; -use datatypes::arrow::{ - datatypes::Schema, - error::{ArrowError, Result}, - io::ipc::{ - read::{read_dictionary, read_record_batch, Dictionaries, StreamMetadata, StreamState}, - IpcSchema, - }, -}; - -const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; - -pub struct ArrowStreamReader { - reader: R, - metadata: StreamMetadata, - dictionaries: Dictionaries, - finished: bool, - data_buffer: Vec, - message_buffer: Vec, -} - -impl ArrowStreamReader { - pub fn new(reader: R, metadata: StreamMetadata) -> Self { - Self { - reader, - metadata, - dictionaries: Default::default(), - finished: false, - data_buffer: vec![], - message_buffer: vec![], - } - } - - /// Return the schema of the stream - pub fn metadata(&self) -> &StreamMetadata { - &self.metadata - } - - /// Check if the stream is finished - pub fn is_finished(&self) -> bool { - self.finished - } - - /// Check if the stream is exactly finished - pub fn check_exactly_finished(&mut self) -> Result { - if self.is_finished() { - return Ok(false); - } - - let _ = self.maybe_next(&[])?; - - Ok(self.is_finished()) - } - - pub fn maybe_next(&mut self, column_null_mask: &[u8]) -> Result> { - if self.finished { - return Ok(None); - } - - let batch = if column_null_mask.is_empty() { - read_next( - &mut self.reader, - &self.metadata, - &mut self.dictionaries, - &mut self.message_buffer, - &mut self.data_buffer, - )? - } else { - read_next( - &mut self.reader, - &valid_metadata(&self.metadata, column_null_mask), - &mut self.dictionaries, - &mut self.message_buffer, - &mut self.data_buffer, - )? - }; - - if batch.is_none() { - self.finished = true; - } - - Ok(batch) - } -} - -fn valid_metadata(metadata: &StreamMetadata, column_null_mask: &[u8]) -> StreamMetadata { - let column_null_mask = BitVec::from_slice(column_null_mask); - - let schema = Schema::from( - metadata - .schema - .fields - .iter() - .zip(&column_null_mask) - .filter(|(_, is_null)| !**is_null) - .map(|(field, _)| field.clone()) - .collect::>(), - ) - .with_metadata(metadata.schema.metadata.clone()); - - let ipc_schema = IpcSchema { - fields: metadata - .ipc_schema - .fields - .iter() - .zip(&column_null_mask) - .filter(|(_, is_null)| !**is_null) - .map(|(ipc_field, _)| ipc_field.clone()) - .collect::>(), - is_little_endian: metadata.ipc_schema.is_little_endian, - }; - - StreamMetadata { - schema, - version: metadata.version, - ipc_schema, - } -} - -fn read_next( - reader: &mut R, - metadata: &StreamMetadata, - dictionaries: &mut Dictionaries, - message_buffer: &mut Vec, - data_buffer: &mut Vec, -) -> Result> { - // determine metadata length - let mut meta_length: [u8; 4] = [0; 4]; - - match reader.read_exact(&mut meta_length) { - Ok(()) => (), - Err(e) => { - return if e.kind() == std::io::ErrorKind::UnexpectedEof { - // Handle EOF without the "0xFFFFFFFF 0x00000000" - // valid according to: - // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format - Ok(Some(StreamState::Waiting)) - } else { - Err(ArrowError::from(e)) - }; - } - } - - let meta_length = { - // If a continuation marker is encountered, skip over it and read - // the size from the next four bytes. 
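// [editor's note, for context on the deleted code below] An Arrow IPC stream
// frames every message as an optional continuation marker followed by a
// little-endian length, i.e.
//
//     [0xFFFFFFFF]? [i32 metadata_len] [metadata] [body]
//
// and a metadata_len of zero marks end-of-stream, which is why this function
// returns Ok(None) in that case.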
- if meta_length == CONTINUATION_MARKER { - reader.read_exact(&mut meta_length)?; - } - i32::from_le_bytes(meta_length) as usize - }; - - if meta_length == 0 { - // the stream has ended, mark the reader as finished - return Ok(None); - } - - message_buffer.clear(); - message_buffer.resize(meta_length, 0); - reader.read_exact(message_buffer)?; - - let message = arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| { - ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) - })?; - let header = message.header()?.ok_or_else(|| { - ArrowError::OutOfSpec( - "IPC: unable to fetch the message header. The file or stream is corrupted.".to_string(), - ) - })?; - - match header { - arrow_format::ipc::MessageHeaderRef::Schema(_) => { - Err(ArrowError::OutOfSpec("A stream ".to_string())) - } - arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - // read the block that makes up the record batch into a buffer - data_buffer.clear(); - data_buffer.resize(message.body_length()? as usize, 0); - reader.read_exact(data_buffer)?; - - let mut reader = std::io::Cursor::new(data_buffer); - - read_record_batch( - batch, - &metadata.schema.fields, - &metadata.ipc_schema, - None, - dictionaries, - metadata.version, - &mut reader, - 0, - ) - .map(|x| Some(StreamState::Some(x))) - } - arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - // read the block that makes up the dictionary batch into a buffer - let mut buf = vec![0; message.body_length()? as usize]; - reader.read_exact(&mut buf)?; - - let mut dict_reader = std::io::Cursor::new(buf); - - read_dictionary( - batch, - &metadata.schema.fields, - &metadata.ipc_schema, - dictionaries, - &mut dict_reader, - 0, - )?; - - // read the next message until we encounter a RecordBatch message - read_next(reader, metadata, dictionaries, message_buffer, data_buffer) - } - t => Err(ArrowError::OutOfSpec(format!( - "Reading types other than record batches not yet supported, unable to read {:?} ", - t - ))), - } -} diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 9c731cc62d..769a191851 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -1,6 +1,5 @@ //! Storage engine implementation. 
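// [editor's note, not part of the patch] With `arrow_stream` removed, decoding
// relies on arrow2's stock StreamReader. The replacement read loop in
// write_batch.rs (later in this patch) is essentially:
//
//     let metadata = read::read_stream_metadata(&mut reader)?;
//     let mut reader = StreamReader::new(reader, metadata);
//     for state in reader.by_ref() {
//         match state? {
//             StreamState::Some(chunk) => chunks.push(chunk),
//             StreamState::Waiting => return StreamWaitingSnafu {}.fail(),
//         }
//     }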
#![feature(map_first_last)] -mod arrow_stream; mod background; mod chunk; pub mod codec; diff --git a/src/storage/src/proto/wal.rs b/src/storage/src/proto/wal.rs index 012ba34c33..557f772525 100644 --- a/src/storage/src/proto/wal.rs +++ b/src/storage/src/proto/wal.rs @@ -1,34 +1,13 @@ #![allow(clippy::all)] tonic::include_proto!("greptime.storage.wal.v1"); -use common_base::BitVec; - use crate::write_batch::{Mutation, WriteBatch}; -pub fn gen_mutation_extras(write_batch: &WriteBatch) -> Vec { - let column_schemas = write_batch.schema().column_schemas(); +pub fn gen_mutation_types(write_batch: &WriteBatch) -> Vec { write_batch .iter() .map(|m| match m { - Mutation::Put(put) => { - if put.num_columns() == column_schemas.len() { - MutationExtra { - mutation_type: MutationType::Put.into(), - column_null_mask: Default::default(), - } - } else { - let mut column_null_mask = BitVec::repeat(false, column_schemas.len()); - for (i, cs) in column_schemas.iter().enumerate() { - if put.column_by_name(&cs.name).is_none() { - column_null_mask.set(i, true); - } - } - MutationExtra { - mutation_type: MutationType::Put.into(), - column_null_mask: column_null_mask.into_vec(), - } - } - } + Mutation::Put(_) => MutationType::Put.into(), }) .collect::>() } diff --git a/src/storage/src/wal.rs b/src/storage/src/wal.rs index 4f2dae1dab..d48101a9f6 100644 --- a/src/storage/src/wal.rs +++ b/src/storage/src/wal.rs @@ -72,7 +72,7 @@ impl Wal { /// v v /// +---------------------+----------------------------------------------------+--------------+-------------+--------------+ /// | | Header | | | | - /// | Header Len(varint) | (last_manifest_version + mutation_extras + ...) | Data Chunk0 | Data Chunk1 | ... | + /// | Header Len(varint) | (last_manifest_version + mutation_types + ...) | Data Chunk0 | Data Chunk1 | ... | /// | | | | | | /// +---------------------+----------------------------------------------------+--------------+-------------+--------------+ /// ``` @@ -84,9 +84,8 @@ impl Wal { payload: Payload<'_>, ) -> Result<(u64, usize)> { header.payload_type = payload.payload_type(); - if let Payload::WriteBatchArrow(batch) = payload { - header.mutation_extras = wal::gen_mutation_extras(batch); + header.mutation_types = wal::gen_mutation_types(batch); } let mut buf = vec![]; @@ -97,7 +96,7 @@ impl Wal { if let Payload::WriteBatchArrow(batch) = payload { // entry - let encoder = WriteBatchArrowEncoder::new(header.mutation_extras); + let encoder = WriteBatchArrowEncoder::new(); // TODO(jiachun): provide some way to compute data size before encode, so we can preallocate an exactly sized buf. 
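// [editor's note, not part of the patch] `wal::gen_mutation_types`, which
// replaces `gen_mutation_extras`, records only each mutation's kind as an
// i32; for a batch of N puts it yields the equivalent of
//
//     vec![i32::from(MutationType::Put); N]
//
// so the header stays cheap to build no matter how many columns a put touches.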
encoder .encode(batch, &mut buf) @@ -185,8 +184,8 @@ impl Wal { match PayloadType::from_i32(header.payload_type) { Some(PayloadType::None) => Ok((seq_num, header, None)), Some(PayloadType::WriteBatchArrow) => { - let mutation_extras = std::mem::take(&mut header.mutation_extras); - let decoder = WriteBatchArrowDecoder::new(mutation_extras); + let mutation_types = std::mem::take(&mut header.mutation_types); + let decoder = WriteBatchArrowDecoder::new(mutation_types); let write_batch = decoder .decode(&input[data_pos..]) .map_err(BoxedError::new) @@ -197,8 +196,8 @@ impl Wal { Ok((seq_num, header, Some(write_batch))) } Some(PayloadType::WriteBatchProto) => { - let mutation_extras = std::mem::take(&mut header.mutation_extras); - let decoder = WriteBatchProtobufDecoder::new(mutation_extras); + let mutation_types = std::mem::take(&mut header.mutation_types); + let decoder = WriteBatchProtobufDecoder::new(mutation_types); let write_batch = decoder .decode(&input[data_pos..]) .map_err(BoxedError::new) @@ -320,7 +319,7 @@ mod tests { let wal_header = WalHeader { payload_type: 1, last_manifest_version: 99999999, - mutation_extras: vec![], + mutation_types: vec![], }; let mut buf: Vec = vec![]; diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs index 0bc64c2b86..020d2b3107 100644 --- a/src/storage/src/write_batch.rs +++ b/src/storage/src/write_batch.rs @@ -105,7 +105,7 @@ pub enum Error { }, #[snafu(display("Failed to decode, in stream waiting state"))] - StreamWaiting, + StreamWaiting { backtrace: Backtrace }, #[snafu(display("Failed to decode, corrupted data {}", message))] DataCorrupted { @@ -477,17 +477,14 @@ pub mod codec { use std::{io::Cursor, sync::Arc}; - use common_base::BitVec; use datatypes::{ arrow::{ chunk::Chunk as ArrowChunk, io::ipc::{ - self, - read::{self, StreamState}, + read::{self, StreamReader, StreamState}, write::{StreamWriter, WriteOptions}, }, }, - error::Result as DataTypesResult, schema::{Schema, SchemaRef}, vectors::Helper, }; @@ -495,32 +492,26 @@ pub mod codec { use snafu::{ensure, OptionExt, ResultExt}; use store_api::storage::WriteRequest; - use super::{ - DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu, - Error as WriteBatchError, FromProtobufSnafu, Mutation, ParseSchemaSnafu, Result, - ToProtobufSnafu, WriteBatch, - }; + use crate::codec::{Decoder, Encoder}; use crate::proto::{ - wal::{MutationExtra, MutationType}, + wal::MutationType, write_batch::{self, gen_columns, gen_put_data_vector}, }; - use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData}; - use crate::{ - arrow_stream::ArrowStreamReader, - codec::{Decoder, Encoder}, + use crate::write_batch::{ + DataCorruptedSnafu, DecodeArrowSnafu, DecodeVectorSnafu, EncodeArrowSnafu, + Error as WriteBatchError, FromProtobufSnafu, MissingColumnSnafu, Mutation, + ParseSchemaSnafu, Result, StreamWaitingSnafu, ToProtobufSnafu, WriteBatch, }; + use crate::write_batch::{DecodeProtobufSnafu, EncodeProtobufSnafu, PutData}; - // TODO(jiachun): The codec logic is too complex, maybe we should use protobuf to - // serialize/deserialize all our data. 
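// [editor's note, not part of the patch] The matching read path (wal.rs above)
// moves the types out of the decoded header and hands them to the payload
// decoder:
//
//     let mutation_types = std::mem::take(&mut header.mutation_types);
//     let decoder = WriteBatchArrowDecoder::new(mutation_types);
//     let write_batch = decoder.decode(&input[data_pos..])?;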
- // And we can make a comparison with protobuf, including performance, storage cost, + // TODO(jiachun): We can make a comparison with protobuf, including performance, storage cost, // CPU consumption, etc - pub struct WriteBatchArrowEncoder { - mutation_extras: Vec, - } + #[derive(Default)] + pub struct WriteBatchArrowEncoder {} impl WriteBatchArrowEncoder { - pub fn new(mutation_extras: Vec) -> Self { - Self { mutation_extras } + pub fn new() -> Self { + Self::default() } } @@ -529,52 +520,36 @@ pub mod codec { type Error = WriteBatchError; fn encode(&self, item: &WriteBatch, dst: &mut Vec) -> Result<()> { - let schema = item.schema().arrow_schema(); - - let column_names = item - .schema() - .column_schemas() - .iter() - .map(|column_schema| column_schema.name.clone()) - .collect::>(); - - let data = item - .iter() - .zip(self.mutation_extras.iter()) - .map(|(mtn, ext)| match mtn { - Mutation::Put(put) => { - let arrays = column_names - .iter() - .filter_map(|column_name| put.column_by_name(column_name)) - .map(|vector| vector.to_arrow_array()) - .collect::>(); - - (arrays, &ext.column_null_mask) - } - }); + let item_schema = item.schema(); + let arrow_schema = item_schema.arrow_schema(); let opts = WriteOptions { compression: None }; let mut writer = StreamWriter::new(dst, opts); - let ipc_fields = ipc::write::default_ipc_fields(&schema.fields); - writer - .start(schema, Some(ipc_fields.clone())) - .context(EncodeArrowSnafu)?; - for (arrays, column_null_mask) in data { - let chunk = ArrowChunk::try_new(arrays).context(EncodeArrowSnafu)?; - if column_null_mask.is_empty() { - writer.write(&chunk, None).context(EncodeArrowSnafu)?; - } else { - let valid_ipc_fields = ipc_fields - .iter() - .zip(BitVec::from_slice(column_null_mask)) - .filter(|(_, is_null)| !*is_null) - .map(|(ipc_field, _)| ipc_field.clone()) - .collect::>(); - writer - .write(&chunk, Some(&valid_ipc_fields)) - .context(EncodeArrowSnafu)?; - } + writer.start(arrow_schema, None).context(EncodeArrowSnafu)?; + + for mutation in item.iter() { + let chunk = match mutation { + Mutation::Put(put) => { + let arrays = item_schema + .column_schemas() + .iter() + .map(|column_schema| { + let vector = put.column_by_name(&column_schema.name).context( + MissingColumnSnafu { + name: &column_schema.name, + }, + )?; + Ok(vector.to_arrow_array()) + }) + .collect::>>()?; + + ArrowChunk::try_new(arrays).context(EncodeArrowSnafu)? 
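// [editor's note, not part of the patch] With column_null_mask gone, the
// encoder requires every schema column to be present in each put (the
// MissingColumn guard above). Per the commit message, WriteBatch itself
// upholds this invariant when a put is accepted, so the guard should only
// trip on a corrupted batch; each mutation is then written as its own IPC
// chunk against the full schema.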
+ } + }; + + writer.write(&chunk, None).context(EncodeArrowSnafu)?; } + writer.finish().context(EncodeArrowSnafu)?; Ok(()) @@ -582,12 +557,12 @@ pub mod codec { } pub struct WriteBatchArrowDecoder { - mutation_extras: Vec, + mutation_types: Vec, } impl WriteBatchArrowDecoder { - pub fn new(mutation_extras: Vec) -> Self { - Self { mutation_extras } + pub fn new(mutation_types: Vec) -> Self { + Self { mutation_types } } } @@ -598,120 +573,67 @@ pub mod codec { fn decode(&self, src: &[u8]) -> Result { let mut reader = Cursor::new(src); let metadata = read::read_stream_metadata(&mut reader).context(DecodeArrowSnafu)?; - let mut reader = ArrowStreamReader::new(reader, metadata); - let schema = reader.metadata().schema.clone(); + let mut reader = StreamReader::new(reader, metadata); + let arrow_schema = reader.metadata().schema.clone(); - let stream_states = self - .mutation_extras - .iter() - .map(|ext| { - reader - .maybe_next(&ext.column_null_mask) - .context(DecodeArrowSnafu) - }) - .collect::>>()?; + let mut chunks = Vec::with_capacity(self.mutation_types.len()); + for stream_state in reader.by_ref() { + let stream_state = stream_state.context(DecodeArrowSnafu)?; + let chunk = match stream_state { + StreamState::Some(chunk) => chunk, + StreamState::Waiting => return StreamWaitingSnafu {}.fail(), + }; + + chunks.push(chunk); + } // check if exactly finished ensure!( - reader.check_exactly_finished().context(DecodeArrowSnafu)?, + reader.is_finished(), DataCorruptedSnafu { message: "Impossible, the num of data chunks is different than expected." } ); - let mut chunks = Vec::with_capacity(self.mutation_extras.len()); - - for state_opt in stream_states.into_iter().flatten() { - match state_opt { - StreamState::Some(chunk) => chunks.push(chunk), - StreamState::Waiting => return Err(WriteBatchError::StreamWaiting), - } - } - - // chunks -> mutations - let chunks = chunks - .iter() - .map(|chunk| chunk.arrays()) - .map(|arrays| { - arrays - .iter() - .map(Helper::try_into_vector) - .collect::>>() - .context(DecodeVectorSnafu) - }) - .collect::>>()?; - ensure!( - chunks.len() == self.mutation_extras.len(), + chunks.len() == self.mutation_types.len(), DataCorruptedSnafu { message: format!( "expected {} mutations, but got {}", - self.mutation_extras.len(), + self.mutation_types.len(), chunks.len() ) } ); - let schema = Schema::try_from(Arc::new(schema)).context(ParseSchemaSnafu)?; + let schema = Arc::new(Schema::try_from(arrow_schema).context(ParseSchemaSnafu)?); + let mut write_batch = WriteBatch::new(schema.clone()); - let column_names = schema - .column_schemas() - .iter() - .map(|column| column.name.clone()) - .collect::>(); - - let mutations = self - .mutation_extras - .iter() - .zip(chunks.iter()) - .map( - |(ext, mtn)| match MutationType::from_i32(ext.mutation_type) { - Some(MutationType::Put) => { - let gen_mutation_put = |valid_columns: &[String]| { - let mut put_data = PutData::with_num_columns(valid_columns.len()); - - let res = valid_columns - .iter() - .zip(mtn) - .map(|(name, vector)| { - put_data.add_column_by_name(name, vector.clone()) - }) - .collect::>>(); - - res.map(|_| Mutation::Put(put_data)) - }; - - if ext.column_null_mask.is_empty() { - gen_mutation_put(&column_names) - } else { - let valid_columns = BitVec::from_slice(&ext.column_null_mask) - .into_iter() - .zip(column_names.iter()) - .filter(|(is_null, _)| !*is_null) - .map(|(_, column_name)| column_name.clone()) - .collect::>(); - - gen_mutation_put(&valid_columns) - } + for (mutation_type, chunk) in 
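// [editor's note, not part of the patch] Chunks are read back in write order,
// so the i-th IPC chunk is paired with the i-th entry of `mutation_types`
// below; the two `ensure!` checks above guarantee the counts line up before
// this loop runs.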
self.mutation_types.iter().zip(chunks.into_iter()) {
+                match MutationType::from_i32(*mutation_type) {
+                    Some(MutationType::Put) => {
+                        let mut put_data = PutData::with_num_columns(schema.num_columns());
+                        for (column_schema, array) in
+                            schema.column_schemas().iter().zip(chunk.arrays().iter())
+                        {
+                            let vector =
+                                Helper::try_into_vector(array).context(DecodeVectorSnafu)?;
+                            put_data.add_column_by_name(&column_schema.name, vector)?;
                         }
-                        Some(MutationType::Delete) => {
-                            todo!("delete mutation")
-                        }
-                        _ => DataCorruptedSnafu {
-                            message: format!("Unexpceted mutation type: {}", ext.mutation_type),
-                        }
-                        .fail(),
-                    },
-                )
-                .collect::<Result<Vec<_>>>()?;
-
-            let mut write_batch = WriteBatch::new(Arc::new(schema));
-
-            mutations
-                .into_iter()
-                .try_for_each(|mutation| match mutation {
-                    Mutation::Put(put_data) => write_batch.put(put_data),
-                })?;
+                        write_batch.put(put_data)?;
+                    }
+                    Some(MutationType::Delete) => {
+                        unimplemented!("delete mutation is not implemented")
+                    }
+                    _ => {
+                        return DataCorruptedSnafu {
+                            message: format!("Unexpected mutation type: {}", mutation_type),
+                        }
+                        .fail()
+                    }
+                }
+            }

             Ok(write_batch)
         }
@@ -733,12 +655,15 @@
             .schema()
             .column_schemas()
             .iter()
-            .filter_map(|cs| put_data.column_by_name(&cs.name))
-            .map(gen_columns)
-            .collect::<DataTypesResult<Vec<_>>>(),
+            .map(|cs| {
+                let vector = put_data
+                    .column_by_name(&cs.name)
+                    .context(MissingColumnSnafu { name: &cs.name })?;
+                gen_columns(vector).context(ToProtobufSnafu)
+            })
+            .collect::<Result<Vec<_>>>(),
             })
-            .collect::<DataTypesResult<Vec<_>>>()
-            .context(ToProtobufSnafu {})?
+            .collect::<Result<Vec<_>>>()?
             .into_iter()
             .map(|columns| write_batch::Mutation {
                 mutation: Some(write_batch::mutation::Mutation::Put(write_batch::Put {
@@ -757,13 +682,13 @@
     }

     pub struct WriteBatchProtobufDecoder {
-        mutation_extras: Vec<MutationExtra>,
+        mutation_types: Vec<i32>,
     }

     impl WriteBatchProtobufDecoder {
         #[allow(dead_code)]
-        pub fn new(mutation_extras: Vec<MutationExtra>) -> Self {
-            Self { mutation_extras }
+        pub fn new(mutation_types: Vec<i32>) -> Self {
+            Self { mutation_types }
         }
     }
@@ -781,47 +706,34 @@
             let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;

             ensure!(
-                write_batch.mutations.len() == self.mutation_extras.len(),
+                write_batch.mutations.len() == self.mutation_types.len(),
                 DataCorruptedSnafu {
                     message: &format!(
                         "expected {} mutations, but got {}",
-                        self.mutation_extras.len(),
+                        self.mutation_types.len(),
                         write_batch.mutations.len()
                     )
                 }
             );

-            let mutations = self
-                .mutation_extras
-                .iter()
-                .zip(write_batch.mutations.into_iter())
-                .map(|(ext, mtn)| match mtn.mutation {
+            let mutations = write_batch
+                .mutations
+                .into_iter()
+                .map(|mtn| match mtn.mutation {
                 Some(write_batch::mutation::Mutation::Put(put)) => {
-                    let column_schemas = schema.column_schemas();
-                    let valid_columns = if ext.column_null_mask.is_empty() {
-                        column_schemas
-                            .iter()
-                            .map(|column| (column.name.clone(), column.data_type.clone()))
-                            .collect::<Vec<_>>()
-                    } else {
-                        BitVec::from_slice(&ext.column_null_mask)
-                            .into_iter()
-                            .zip(column_schemas.iter())
-                            .filter(|(is_null, _)| !*is_null)
-                            .map(|(_, column)| (column.name.clone(), column.data_type.clone()))
-                            .collect::<Vec<_>>()
-                    };
-
                     let mut put_data = PutData::with_num_columns(put.columns.len());

-                    let res = valid_columns
-                        .into_iter()
+                    let res = schema
+                        .column_schemas()
+                        .iter()
+                        .map(|column| (column.name.clone(), column.data_type.clone()))
                         .zip(put.columns.into_iter())
                         .map(|((name, data_type), column)| {
-                            gen_put_data_vector(data_type, column).map(|vector| (name, vector))
+                            gen_put_data_vector(data_type, column)
+                                .map(|vector| (name, 
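// [editor's note, not part of the patch] The protobuf decoder now pairs
// columns positionally, decoding the n-th entry of `put.columns` with the
// n-th column schema. This relies on the encoder emitting columns in schema
// order, which the rewritten encode impl above does (it iterates
// schema().column_schemas()).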
vector)) + .context(FromProtobufSnafu) }) - .collect::>>() - .context(FromProtobufSnafu {})? + .collect::>>()? .into_iter() .map(|(name, vector)| put_data.add_column_by_name(&name, vector)) .collect::>>(); @@ -861,7 +773,6 @@ mod tests { use super::*; use crate::codec::{Decoder, Encoder}; use crate::proto; - use crate::proto::wal::MutationExtra; use crate::test_util::write_batch_util; #[test] @@ -1113,7 +1024,7 @@ mod tests { ) } - fn gen_new_batch_and_extras() -> (WriteBatch, Vec) { + fn gen_new_batch_and_types() -> (WriteBatch, Vec) { let mut batch = new_test_batch(); for i in 0..10 { let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])); @@ -1129,21 +1040,21 @@ mod tests { batch.put(put_data).unwrap(); } - let extras = proto::wal::gen_mutation_extras(&batch); + let types = proto::wal::gen_mutation_types(&batch); - (batch, extras) + (batch, types) } #[test] fn test_codec_arrow() -> Result<()> { - let (batch, mutation_extras) = gen_new_batch_and_extras(); + let (batch, mutation_types) = gen_new_batch_and_types(); - let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone()); + let encoder = codec::WriteBatchArrowEncoder::new(); let mut dst = vec![]; let result = encoder.encode(&batch, &mut dst); assert!(result.is_ok()); - let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras); + let decoder = codec::WriteBatchArrowDecoder::new(mutation_types); let result = decoder.decode(&dst); let batch2 = result?; assert_eq!(batch.num_rows, batch2.num_rows); @@ -1153,14 +1064,14 @@ mod tests { #[test] fn test_codec_protobuf() -> Result<()> { - let (batch, mutation_extras) = gen_new_batch_and_extras(); + let (batch, mutation_types) = gen_new_batch_and_types(); let encoder = codec::WriteBatchProtobufEncoder {}; let mut dst = vec![]; let result = encoder.encode(&batch, &mut dst); assert!(result.is_ok()); - let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras); + let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types); let result = decoder.decode(&dst); let batch2 = result?; assert_eq!(batch.num_rows, batch2.num_rows); @@ -1168,7 +1079,7 @@ mod tests { Ok(()) } - fn gen_new_batch_and_extras_with_none_column() -> (WriteBatch, Vec) { + fn gen_new_batch_and_types_with_none_column() -> (WriteBatch, Vec) { let mut batch = new_test_batch(); for _ in 0..10 { let intv = Arc::new(UInt64Vector::from_slice(&[1, 2, 3])); @@ -1182,21 +1093,21 @@ mod tests { batch.put(put_data).unwrap(); } - let extras = proto::wal::gen_mutation_extras(&batch); + let types = proto::wal::gen_mutation_types(&batch); - (batch, extras) + (batch, types) } #[test] fn test_codec_with_none_column_arrow() -> Result<()> { - let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column(); + let (batch, mutation_types) = gen_new_batch_and_types_with_none_column(); - let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.clone()); + let encoder = codec::WriteBatchArrowEncoder::new(); let mut dst = vec![]; let result = encoder.encode(&batch, &mut dst); assert!(result.is_ok()); - let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras); + let decoder = codec::WriteBatchArrowDecoder::new(mutation_types); let result = decoder.decode(&dst); let batch2 = result?; assert_eq!(batch.num_rows, batch2.num_rows); @@ -1206,13 +1117,13 @@ mod tests { #[test] fn test_codec_with_none_column_protobuf() -> Result<()> { - let (batch, mutation_extras) = gen_new_batch_and_extras_with_none_column(); + let (batch, mutation_types) = gen_new_batch_and_types_with_none_column(); let encoder = 
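// [editor's note, not part of the patch] The "none column" tests keep their
// old names but no longer exercise a codec-level null mask: the put below
// omits a column, and the batch still round-trips through both codecs with a
// plain Vec<i32> of mutation types, presumably because WriteBatch::put now
// fills in or validates the missing column itself, per the commit message's
// guarantee that a WriteBatch has no missing columns.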
codec::WriteBatchProtobufEncoder {}; let mut dst = vec![]; encoder.encode(&batch, &mut dst).unwrap(); - let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras); + let decoder = codec::WriteBatchProtobufDecoder::new(mutation_types); let result = decoder.decode(&dst); let batch2 = result?; assert_eq!(batch.num_rows, batch2.num_rows);