From 787aab9c009130338bb2365db805961895d672a8 Mon Sep 17 00:00:00 2001
From: egg <92360611+fariygirl@users.noreply.github.com>
Date: Tue, 30 Aug 2022 14:49:31 +0800
Subject: [PATCH] wal_benchmark (#188)

---
Review notes (free-form text in this section is not part of the commit):

 * This copy was restored from a whitespace-collapsed extract. Line breaks
   and indentation were rebuilt from the hunk line counts and rustfmt
   conventions; the type arguments in `Option<usize>`, `Vec<u8>` and
   `Vec<storage::proto::wal::MutationExtra>` were missing and have been
   re-inserted from how the values are used. Confirm against the commit.
 * gen_new_batch_and_extras(n) appends n PutData built from 10-element
   arrays, so the "10000" cases (n = 100) appear to carry 1000 rows, not
   10000. TODO confirm the labels in the result tables and bench names.
 * bench_decode.rs, bench_encode.rs and bench_wal.rs each invoke
   criterion_main!(benches) although bench_main.rs already does so for all
   groups; presumably redundant. Verify before relying on it.

 src/storage/benches/bench_main.rs             |  4 +
 .../benches/memtable/util/schema_util.rs      | 29 +++++-
 src/storage/benches/wal/bench_decode.rs       | 94 +++++++++++++++++++
 src/storage/benches/wal/bench_encode.rs       | 65 +++++++++++++
 src/storage/benches/wal/bench_wal.rs          | 72 ++++++++++++++
 src/storage/benches/wal/mod.rs                |  4 +
 src/storage/benches/wal/util/mod.rs           | 80 ++++++++++++++++
 .../benches/wal/util/write_batch_util.rs      |  9 ++
 src/storage/src/lib.rs                        |  4 +-
 src/storage/src/write_batch.rs                |  2 +-
 10 files changed, 359 insertions(+), 4 deletions(-)
 create mode 100644 src/storage/benches/wal/bench_decode.rs
 create mode 100644 src/storage/benches/wal/bench_encode.rs
 create mode 100644 src/storage/benches/wal/bench_wal.rs
 create mode 100644 src/storage/benches/wal/mod.rs
 create mode 100644 src/storage/benches/wal/util/mod.rs
 create mode 100644 src/storage/benches/wal/util/write_batch_util.rs

diff --git a/src/storage/benches/bench_main.rs b/src/storage/benches/bench_main.rs
index ea6b9b86c4..cdaa4eec90 100644
--- a/src/storage/benches/bench_main.rs
+++ b/src/storage/benches/bench_main.rs
@@ -1,9 +1,13 @@
 use criterion::criterion_main;
 
 mod memtable;
+mod wal;
 
 criterion_main! {
     memtable::bench_memtable_read::benches,
     memtable::bench_memtable_write::benches,
     memtable::bench_memtable_read_write_ratio::benches,
+    wal::bench_wal::benches,
+    wal::bench_decode::benches,
+    wal::bench_encode::benches,
 }
diff --git a/src/storage/benches/memtable/util/schema_util.rs b/src/storage/benches/memtable/util/schema_util.rs
index eb7bfee3d7..ac0602359d 100644
--- a/src/storage/benches/memtable/util/schema_util.rs
+++ b/src/storage/benches/memtable/util/schema_util.rs
@@ -1,3 +1,30 @@
-use datatypes::type_id::LogicalTypeId;
+use std::sync::Arc;
 
+use datatypes::prelude::*;
+use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
+
+/// Column definition: (name, datatype, is_nullable)
 pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
+
+pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> Schema {
+    let column_schemas = column_defs
+        .iter()
+        .map(|column_def| {
+            let datatype = column_def.1.data_type();
+            ColumnSchema::new(column_def.0, datatype, column_def.2)
+        })
+        .collect();
+
+    if let Some(index) = timestamp_index {
+        SchemaBuilder::from(column_schemas)
+            .timestamp_index(index)
+            .build()
+            .unwrap()
+    } else {
+        Schema::new(column_schemas)
+    }
+}
+
+pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> SchemaRef {
+    Arc::new(new_schema(column_defs, timestamp_index))
+}
diff --git a/src/storage/benches/wal/bench_decode.rs b/src/storage/benches/wal/bench_decode.rs
new file mode 100644
index 0000000000..baab556675
--- /dev/null
+++ b/src/storage/benches/wal/bench_decode.rs
@@ -0,0 +1,94 @@
+use criterion::{criterion_group, criterion_main, Criterion};
+use storage::codec::{Decoder, Encoder};
+use storage::write_batch::{codec, WriteBatch};
+
+use super::util::gen_new_batch_and_extras;
+
+tonic::include_proto!("greptime.storage.wal.v1");
+
+/*
+-------------------------------------
+   decode                           |
+-------------------------------------
+rows |  protobuf    |    arrow      |
+------------------------------------
+10   |  8.6485 us   |  8.8028 us    |
+------------------------------------
+100  |  63.850 us   |  46.174 us    |
+------------------------------------
+10000|  654.46 us   |  433.58 us    |
+------------------------------------
+*/
+
+fn encode_arrow(
+    batch: &WriteBatch,
+    mutation_extras: &[storage::proto::wal::MutationExtra],
+    dst: &mut Vec<u8>,
+) {
+    let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
+    let result = encoder.encode(batch, dst);
+    assert!(result.is_ok());
+}
+
+fn encode_protobuf(batch: &WriteBatch, dst: &mut Vec<u8>) {
+    let encoder = codec::WriteBatchProtobufEncoder {};
+    let result = encoder.encode(batch, dst);
+    assert!(result.is_ok());
+}
+
+fn decode_arrow(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) {
+    let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec());
+    let result = decoder.decode(dst);
+    assert!(result.is_ok());
+}
+
+fn decode_protobuf(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) {
+    let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec());
+    let result = decoder.decode(dst);
+    assert!(result.is_ok());
+}
+
+fn bench_wal_decode(c: &mut Criterion) {
+    let (batch_10, extras_10) = gen_new_batch_and_extras(1);
+    let (batch_100, extras_100) = gen_new_batch_and_extras(10);
+    let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
+    let mut dst_protobuf_10 = vec![];
+    let mut dst_protobuf_100 = vec![];
+    let mut dst_protobuf_10000 = vec![];
+
+    let mut dst_arrow_10 = vec![];
+    let mut dst_arrow_100 = vec![];
+    let mut dst_arrow_10000 = vec![];
+
+    encode_protobuf(&batch_10, &mut dst_protobuf_10);
+    encode_protobuf(&batch_100, &mut dst_protobuf_100);
+    encode_protobuf(&batch_10000, &mut dst_protobuf_10000);
+
+    encode_arrow(&batch_10, &extras_10, &mut dst_arrow_10);
+    encode_arrow(&batch_100, &extras_100, &mut dst_arrow_100);
+    encode_arrow(&batch_10000, &extras_10000, &mut dst_arrow_10000);
+
+    let mut group = c.benchmark_group("wal_decode");
+    group.bench_function("protobuf_decode_with_10_num_rows", |b| {
+        b.iter(|| decode_protobuf(&dst_protobuf_10, &extras_10))
+    });
+    group.bench_function("protobuf_decode_with_100_num_rows", |b| {
+        b.iter(|| decode_protobuf(&dst_protobuf_100, &extras_100))
+    });
+    group.bench_function("protobuf_decode_with_10000_num_rows", |b| {
+        b.iter(|| decode_protobuf(&dst_protobuf_10000, &extras_10000))
+    });
+    group.bench_function("arrow_decode_with_10_num_rows", |b| {
+        b.iter(|| decode_arrow(&dst_arrow_10, &extras_10))
+    });
+    group.bench_function("arrow_decode_with_100_num_rows", |b| {
+        b.iter(|| decode_arrow(&dst_arrow_100, &extras_100))
+    });
+    group.bench_function("arrow_decode_with_10000_num_rows", |b| {
+        b.iter(|| decode_arrow(&dst_arrow_10000, &extras_10000))
+    });
+    group.finish();
+}
+
+criterion_group!(benches, bench_wal_decode);
+criterion_main!(benches);
diff --git a/src/storage/benches/wal/bench_encode.rs b/src/storage/benches/wal/bench_encode.rs
new file mode 100644
index 0000000000..707514a48c
--- /dev/null
+++ b/src/storage/benches/wal/bench_encode.rs
@@ -0,0 +1,65 @@
+use criterion::{criterion_group, criterion_main, Criterion};
+use storage::codec::Encoder;
+use storage::write_batch::{codec, WriteBatch};
+
+use super::util::gen_new_batch_and_extras;
+
+tonic::include_proto!("greptime.storage.wal.v1");
+
+/*
+-------------------------------------
+   encode                           |
+-------------------------------------
+rows |  protobuf    |    arrow      |
+------------------------------------
+10   |  4.8732 us   |  5.7388 us    |
+------------------------------------
+100  |  40.928 us   |  24.988 us    |
+------------------------------------
+10000|  425.69 us   |  229.74 us    |
+------------------------------------
+*/
+
+fn encode_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
+    let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
+    let mut dst = vec![];
+    let result = encoder.encode(batch, &mut dst);
+    assert!(result.is_ok());
+}
+
+fn encode_protobuf(batch: &WriteBatch, _mutation_extras: &[storage::proto::wal::MutationExtra]) {
+    let encoder = codec::WriteBatchProtobufEncoder {};
+    let mut dst = vec![];
+    let result = encoder.encode(batch, &mut dst);
+    assert!(result.is_ok());
+}
+
+fn bench_wal_encode(c: &mut Criterion) {
+    let (batch_10, extras_10) = gen_new_batch_and_extras(1);
+    let (batch_100, extras_100) = gen_new_batch_and_extras(10);
+    let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
+
+    let mut group = c.benchmark_group("wal_encode");
+    group.bench_function("protobuf_encode_with_10_num_rows", |b| {
+        b.iter(|| encode_protobuf(&batch_10, &extras_10))
+    });
+    group.bench_function("protobuf_encode_with_100_num_rows", |b| {
+        b.iter(|| encode_protobuf(&batch_100, &extras_100))
+    });
+    group.bench_function("protobuf_encode_with_10000_num_rows", |b| {
+        b.iter(|| encode_protobuf(&batch_10000, &extras_10000))
+    });
+    group.bench_function("arrow_encode_with_10_num_rows", |b| {
+        b.iter(|| encode_arrow(&batch_10, &extras_10))
+    });
+    group.bench_function("arrow_encode_with_100_num_rows", |b| {
+        b.iter(|| encode_arrow(&batch_100, &extras_100))
+    });
+    group.bench_function("arrow_encode_with_10000_num_rows", |b| {
+        b.iter(|| encode_arrow(&batch_10000, &extras_10000))
+    });
+    group.finish();
+}
+
+criterion_group!(benches, bench_wal_encode);
+criterion_main!(benches);
diff --git a/src/storage/benches/wal/bench_wal.rs b/src/storage/benches/wal/bench_wal.rs
new file mode 100644
index 0000000000..01659229ad
--- /dev/null
+++ b/src/storage/benches/wal/bench_wal.rs
@@ -0,0 +1,72 @@
+use criterion::{criterion_group, criterion_main, Criterion};
+use storage::codec::{Decoder, Encoder};
+use storage::write_batch::{codec, WriteBatch};
+
+use super::util::gen_new_batch_and_extras;
+
+tonic::include_proto!("greptime.storage.wal.v1");
+
+/*
+-------------------------------------
+   encode & decode                  |
+-------------------------------------
+rows |  protobuf    |    arrow      |
+------------------------------------
+10   |  13.845 us   |  15.093 us    |
+------------------------------------
+100  |  106.70 us   |  73.895 us    |
+------------------------------------
+10000|  1.0860 ms   |  680.12 us    |
+------------------------------------
+*/
+
+fn codec_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
+    let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
+    let mut dst = vec![];
+    let result = encoder.encode(batch, &mut dst);
+    assert!(result.is_ok());
+
+    let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec());
+    let result = decoder.decode(&dst);
+    assert!(result.is_ok());
+}
+fn codec_protobuf(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
+    let encoder = codec::WriteBatchProtobufEncoder {};
+    let mut dst = vec![];
+    let result = encoder.encode(batch, &mut dst);
+    assert!(result.is_ok());
+
+    let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec());
+    let result = decoder.decode(&dst);
+    assert!(result.is_ok());
+}
+
+fn bench_wal_encode_decode(c: &mut Criterion) {
+    let (batch_10, extras_10) = gen_new_batch_and_extras(1);
+    let (batch_100, extras_100) = gen_new_batch_and_extras(10);
+    let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
+
+    let mut group = c.benchmark_group("wal_encode_decode");
+    group.bench_function("protobuf_encode_decode_with_10_num_rows", |b| {
+        b.iter(|| codec_protobuf(&batch_10, &extras_10))
+    });
+    group.bench_function("protobuf_encode_decode_with_100_num_rows", |b| {
+        b.iter(|| codec_protobuf(&batch_100, &extras_100))
+    });
+    group.bench_function("protobuf_encode_decode_with_10000_num_rows", |b| {
+        b.iter(|| codec_protobuf(&batch_10000, &extras_10000))
+    });
+    group.bench_function("arrow_encode_decode_with_10_num_rows", |b| {
+        b.iter(|| codec_arrow(&batch_10, &extras_10))
+    });
+    group.bench_function("arrow_encode_decode_with_100_num_rows", |b| {
+        b.iter(|| codec_arrow(&batch_100, &extras_100))
+    });
+    group.bench_function("arrow_encode_decode_with_10000_num_rows", |b| {
+        b.iter(|| codec_arrow(&batch_10000, &extras_10000))
+    });
+    group.finish();
+}
+
+criterion_group!(benches, bench_wal_encode_decode);
+criterion_main!(benches);
diff --git a/src/storage/benches/wal/mod.rs b/src/storage/benches/wal/mod.rs
new file mode 100644
index 0000000000..cac6d037f9
--- /dev/null
+++ b/src/storage/benches/wal/mod.rs
@@ -0,0 +1,4 @@
+pub mod bench_decode;
+pub mod bench_encode;
+pub mod bench_wal;
+pub mod util;
diff --git a/src/storage/benches/wal/util/mod.rs b/src/storage/benches/wal/util/mod.rs
new file mode 100644
index 0000000000..e624970a4f
--- /dev/null
+++ b/src/storage/benches/wal/util/mod.rs
@@ -0,0 +1,80 @@
+use std::sync::Arc;
+
+use datatypes::{
+    prelude::ScalarVector,
+    type_id::LogicalTypeId,
+    vectors::{BooleanVector, Float64Vector, Int64Vector, StringVector, UInt64Vector},
+};
+use rand::Rng;
+use storage::{
+    proto,
+    write_batch::{PutData, WriteBatch},
+};
+use store_api::storage::{consts, PutOperation, WriteRequest};
+pub mod write_batch_util;
+pub fn new_test_batch() -> WriteBatch {
+    write_batch_util::new_write_batch(
+        &[
+            ("k1", LogicalTypeId::UInt64, false),
+            (consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
+            ("ts", LogicalTypeId::Int64, false),
+            ("v1", LogicalTypeId::Boolean, true),
+            ("4", LogicalTypeId::Float64, false),
+            ("5", LogicalTypeId::Float64, false),
+            ("6", LogicalTypeId::Float64, false),
+            ("7", LogicalTypeId::Float64, false),
+            ("8", LogicalTypeId::Float64, false),
+            ("9", LogicalTypeId::Float64, false),
+            ("10", LogicalTypeId::String, false),
+        ],
+        Some(2),
+    )
+}
+pub fn gen_new_batch_and_extras(
+    putdate_nums: usize,
+) -> (WriteBatch, Vec<storage::proto::wal::MutationExtra>) {
+    let mut batch = new_test_batch();
+    let mut rng = rand::thread_rng();
+    for _ in 0..putdate_nums {
+        let mut intvs = [0u64; 10];
+        let mut boolvs = [true; 10];
+        let mut tsvs = [0i64; 10];
+        let mut fvs = [0.0_f64; 10];
+        let svs = [
+            "value1_string",
+            "value2_string",
+            "value3_string",
+            "value4_string",
+            "value5_string",
+            "value6_string",
+            "value7_string",
+            "value8_string",
+            "value9_string",
+            "value10_string",
+        ];
+        rng.fill(&mut intvs[..]);
+        rng.fill(&mut boolvs[..]);
+        rng.fill(&mut tsvs[..]);
+        rng.fill(&mut fvs[..]);
+        let intv = Arc::new(UInt64Vector::from_slice(&intvs));
+        let boolv = Arc::new(BooleanVector::from(boolvs.to_vec()));
+        let tsv = Arc::new(Int64Vector::from_slice(&tsvs));
+        let fvs = Arc::new(Float64Vector::from_slice(&fvs));
+        let svs = Arc::new(StringVector::from_slice(&svs));
+        let mut put_data = PutData::default();
+        put_data.add_key_column("k1", intv.clone()).unwrap();
+        put_data.add_version_column(intv).unwrap();
+        put_data.add_value_column("v1", boolv).unwrap();
+        put_data.add_key_column("ts", tsv.clone()).unwrap();
+        put_data.add_key_column("4", fvs.clone()).unwrap();
+        put_data.add_key_column("5", fvs.clone()).unwrap();
+        put_data.add_key_column("6", fvs.clone()).unwrap();
+        put_data.add_key_column("7", fvs.clone()).unwrap();
+        put_data.add_key_column("8", fvs.clone()).unwrap();
+        put_data.add_key_column("9", fvs.clone()).unwrap();
+        put_data.add_key_column("10", svs.clone()).unwrap();
+        batch.put(put_data).unwrap();
+    }
+    let extras = proto::wal::gen_mutation_extras(&batch);
+    (batch, extras)
+}
diff --git a/src/storage/benches/wal/util/write_batch_util.rs b/src/storage/benches/wal/util/write_batch_util.rs
new file mode 100644
index 0000000000..8bcb167e0f
--- /dev/null
+++ b/src/storage/benches/wal/util/write_batch_util.rs
@@ -0,0 +1,9 @@
+use storage::write_batch::WriteBatch;
+
+use crate::memtable::util::schema_util::{self, ColumnDef};
+
+pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> WriteBatch {
+    let schema = schema_util::new_schema_ref(column_defs, timestamp_index);
+
+    WriteBatch::new(schema)
+}
diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs
index 627eb37c17..3901f12140 100644
--- a/src/storage/src/lib.rs
+++ b/src/storage/src/lib.rs
@@ -3,7 +3,7 @@ mod arrow_stream;
 mod background;
 mod bit_vec;
 mod chunk;
-mod codec;
+pub mod codec;
 pub mod config;
 mod engine;
 pub mod error;
@@ -11,7 +11,7 @@ mod flush;
 pub mod manifest;
 pub mod memtable;
 pub mod metadata;
-mod proto;
+pub mod proto;
 mod read;
 mod region;
 pub mod schema;
diff --git a/src/storage/src/write_batch.rs b/src/storage/src/write_batch.rs
index 5176169eba..38c98bae1d 100644
--- a/src/storage/src/write_batch.rs
+++ b/src/storage/src/write_batch.rs
@@ -739,7 +739,7 @@ pub mod codec {
                 message: "schema required",
             })?;
 
-            let schema: SchemaRef = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
+            let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
 
             ensure!(
                 write_batch.mutations.len() == self.mutation_extras.len(),