Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-10 07:12:54 +00:00) — commit: wal_benchmark (#188)
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
use criterion::criterion_main;
|
||||
|
||||
mod memtable;
|
||||
mod wal;
|
||||
|
||||
criterion_main! {
|
||||
memtable::bench_memtable_read::benches,
|
||||
memtable::bench_memtable_write::benches,
|
||||
memtable::bench_memtable_read_write_ratio::benches,
|
||||
wal::bench_wal::benches,
|
||||
wal::bench_decode::benches,
|
||||
wal::bench_encode::benches,
|
||||
}
|
||||
|
||||
@@ -1,3 +1,30 @@
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::prelude::*;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder, SchemaRef};
|
||||
|
||||
/// Column definition: (name, datatype, is_nullable)
|
||||
// Tuple layout: column name, logical type id, and whether the column is nullable.
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
|
||||
|
||||
pub fn new_schema(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> Schema {
|
||||
let column_schemas = column_defs
|
||||
.iter()
|
||||
.map(|column_def| {
|
||||
let datatype = column_def.1.data_type();
|
||||
ColumnSchema::new(column_def.0, datatype, column_def.2)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if let Some(index) = timestamp_index {
|
||||
SchemaBuilder::from(column_schemas)
|
||||
.timestamp_index(index)
|
||||
.build()
|
||||
.unwrap()
|
||||
} else {
|
||||
Schema::new(column_schemas)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_schema_ref(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> SchemaRef {
|
||||
Arc::new(new_schema(column_defs, timestamp_index))
|
||||
}
|
||||
|
||||
94
src/storage/benches/wal/bench_decode.rs
Normal file
94
src/storage/benches/wal/bench_decode.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use storage::codec::{Decoder, Encoder};
|
||||
use storage::write_batch::{codec, WriteBatch};
|
||||
|
||||
use super::util::gen_new_batch_and_extras;
|
||||
|
||||
// NOTE(review): this pulls the generated WAL protobuf types directly into the
// bench module, yet the code below only references `storage::proto::wal::*` —
// confirm this include is actually needed and remove it if not.
tonic::include_proto!("greptime.storage.wal.v1");
|
||||
|
||||
/*
|
||||
-------------------------------------
|
||||
decode |
|
||||
-------------------------------------
|
||||
rows | protobuf | arrow |
|
||||
------------------------------------
|
||||
10 | 8.6485 us | 8.8028 us |
|
||||
------------------------------------
|
||||
100 | 63.850 us | 46.174 us |
|
||||
------------------------------------
|
||||
10000| 654.46 us | 433.58 us |
|
||||
------------------------------------
|
||||
*/
|
||||
|
||||
fn encode_arrow(
|
||||
batch: &WriteBatch,
|
||||
mutation_extras: &[storage::proto::wal::MutationExtra],
|
||||
dst: &mut Vec<u8>,
|
||||
) {
|
||||
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
|
||||
let result = encoder.encode(batch, dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn encode_protobuf(batch: &WriteBatch, dst: &mut Vec<u8>) {
|
||||
let encoder = codec::WriteBatchProtobufEncoder {};
|
||||
let result = encoder.encode(batch, dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn decode_arrow(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) {
|
||||
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec());
|
||||
let result = decoder.decode(dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn decode_protobuf(dst: &[u8], mutation_extras: &[storage::proto::wal::MutationExtra]) {
|
||||
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec());
|
||||
let result = decoder.decode(dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn bench_wal_decode(c: &mut Criterion) {
|
||||
let (batch_10, extras_10) = gen_new_batch_and_extras(1);
|
||||
let (batch_100, extras_100) = gen_new_batch_and_extras(10);
|
||||
let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
|
||||
let mut dst_protobuf_10 = vec![];
|
||||
let mut dst_protobuf_100 = vec![];
|
||||
let mut dst_protobuf_10000 = vec![];
|
||||
|
||||
let mut dst_arrow_10 = vec![];
|
||||
let mut dst_arrow_100 = vec![];
|
||||
let mut dst_arrow_10000 = vec![];
|
||||
|
||||
encode_protobuf(&batch_10, &mut dst_protobuf_10);
|
||||
encode_protobuf(&batch_100, &mut dst_protobuf_100);
|
||||
encode_protobuf(&batch_10000, &mut dst_protobuf_10000);
|
||||
|
||||
encode_arrow(&batch_10, &extras_10, &mut dst_arrow_10);
|
||||
encode_arrow(&batch_100, &extras_100, &mut dst_arrow_100);
|
||||
encode_arrow(&batch_10000, &extras_10000, &mut dst_arrow_10000);
|
||||
|
||||
let mut group = c.benchmark_group("wal_decode");
|
||||
group.bench_function("protobuf_decode_with_10_num_rows", |b| {
|
||||
b.iter(|| decode_protobuf(&dst_protobuf_10, &extras_10))
|
||||
});
|
||||
group.bench_function("protobuf_decode_with_100_num_rows", |b| {
|
||||
b.iter(|| decode_protobuf(&dst_protobuf_100, &extras_100))
|
||||
});
|
||||
group.bench_function("protobuf_decode_with_10000_num_rows", |b| {
|
||||
b.iter(|| decode_protobuf(&dst_protobuf_10000, &extras_10000))
|
||||
});
|
||||
group.bench_function("arrow_decode_with_10_num_rows", |b| {
|
||||
b.iter(|| decode_arrow(&dst_arrow_10, &extras_10))
|
||||
});
|
||||
group.bench_function("arrow_decode_with_100_num_rows", |b| {
|
||||
b.iter(|| decode_arrow(&dst_arrow_100, &extras_100))
|
||||
});
|
||||
group.bench_function("arrow_decode_with_10000_num_rows", |b| {
|
||||
b.iter(|| decode_arrow(&dst_arrow_10000, &extras_10000))
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_wal_decode);
|
||||
criterion_main!(benches);
|
||||
65
src/storage/benches/wal/bench_encode.rs
Normal file
65
src/storage/benches/wal/bench_encode.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use storage::codec::Encoder;
|
||||
use storage::write_batch::{codec, WriteBatch};
|
||||
|
||||
use super::util::gen_new_batch_and_extras;
|
||||
|
||||
// NOTE(review): this pulls the generated WAL protobuf types directly into the
// bench module, yet the code below only references `storage::proto::wal::*` —
// confirm this include is actually needed and remove it if not.
tonic::include_proto!("greptime.storage.wal.v1");
|
||||
|
||||
/*
|
||||
-------------------------------------
|
||||
encode |
|
||||
-------------------------------------
|
||||
rows | protobuf | arrow |
|
||||
------------------------------------
|
||||
10 | 4.8732 us | 5.7388 us |
|
||||
------------------------------------
|
||||
100 | 40.928 us | 24.988 us |
|
||||
------------------------------------
|
||||
10000| 425.69 us | 229.74 us |
|
||||
------------------------------------
|
||||
*/
|
||||
|
||||
fn encode_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
|
||||
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
|
||||
let mut dst = vec![];
|
||||
let result = encoder.encode(batch, &mut dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn encode_protobuf(batch: &WriteBatch, _mutation_extras: &[storage::proto::wal::MutationExtra]) {
|
||||
let encoder = codec::WriteBatchProtobufEncoder {};
|
||||
let mut dst = vec![];
|
||||
let result = encoder.encode(batch, &mut dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn bench_wal_encode(c: &mut Criterion) {
|
||||
let (batch_10, extras_10) = gen_new_batch_and_extras(1);
|
||||
let (batch_100, extras_100) = gen_new_batch_and_extras(10);
|
||||
let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
|
||||
|
||||
let mut group = c.benchmark_group("wal_encode");
|
||||
group.bench_function("protobuf_encode_with_10_num_rows", |b| {
|
||||
b.iter(|| encode_protobuf(&batch_10, &extras_10))
|
||||
});
|
||||
group.bench_function("protobuf_encode_with_100_num_rows", |b| {
|
||||
b.iter(|| encode_protobuf(&batch_100, &extras_100))
|
||||
});
|
||||
group.bench_function("protobuf_encode_with_10000_num_rows", |b| {
|
||||
b.iter(|| encode_protobuf(&batch_10000, &extras_10000))
|
||||
});
|
||||
group.bench_function("arrow_encode_with_10_num_rows", |b| {
|
||||
b.iter(|| encode_arrow(&batch_10, &extras_10))
|
||||
});
|
||||
group.bench_function("arrow_encode_with_100_num_rows", |b| {
|
||||
b.iter(|| encode_arrow(&batch_100, &extras_100))
|
||||
});
|
||||
group.bench_function("arrow_encode_with_10000_num_rows", |b| {
|
||||
b.iter(|| encode_arrow(&batch_10000, &extras_10000))
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_wal_encode);
|
||||
criterion_main!(benches);
|
||||
72
src/storage/benches/wal/bench_wal.rs
Normal file
72
src/storage/benches/wal/bench_wal.rs
Normal file
@@ -0,0 +1,72 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use storage::codec::{Decoder, Encoder};
|
||||
use storage::write_batch::{codec, WriteBatch};
|
||||
|
||||
use super::util::gen_new_batch_and_extras;
|
||||
|
||||
// NOTE(review): this pulls the generated WAL protobuf types directly into the
// bench module, yet the code below only references `storage::proto::wal::*` —
// confirm this include is actually needed and remove it if not.
tonic::include_proto!("greptime.storage.wal.v1");
|
||||
|
||||
/*
|
||||
-------------------------------------
|
||||
encode & decode |
|
||||
-------------------------------------
|
||||
rows | protobuf | arrow |
|
||||
------------------------------------
|
||||
10 | 13.845 us | 15.093 us |
|
||||
------------------------------------
|
||||
100 | 106.70 us | 73.895 us |
|
||||
------------------------------------
|
||||
10000| 1.0860 ms | 680.12 us |
|
||||
------------------------------------
|
||||
*/
|
||||
|
||||
fn codec_arrow(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
|
||||
let encoder = codec::WriteBatchArrowEncoder::new(mutation_extras.to_vec());
|
||||
let mut dst = vec![];
|
||||
let result = encoder.encode(batch, &mut dst);
|
||||
assert!(result.is_ok());
|
||||
|
||||
let decoder = codec::WriteBatchArrowDecoder::new(mutation_extras.to_vec());
|
||||
let result = decoder.decode(&dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
fn codec_protobuf(batch: &WriteBatch, mutation_extras: &[storage::proto::wal::MutationExtra]) {
|
||||
let encoder = codec::WriteBatchProtobufEncoder {};
|
||||
let mut dst = vec![];
|
||||
let result = encoder.encode(batch, &mut dst);
|
||||
assert!(result.is_ok());
|
||||
|
||||
let decoder = codec::WriteBatchProtobufDecoder::new(mutation_extras.to_vec());
|
||||
let result = decoder.decode(&dst);
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
fn bench_wal_encode_decode(c: &mut Criterion) {
|
||||
let (batch_10, extras_10) = gen_new_batch_and_extras(1);
|
||||
let (batch_100, extras_100) = gen_new_batch_and_extras(10);
|
||||
let (batch_10000, extras_10000) = gen_new_batch_and_extras(100);
|
||||
|
||||
let mut group = c.benchmark_group("wal_encode_decode");
|
||||
group.bench_function("protobuf_encode_decode_with_10_num_rows", |b| {
|
||||
b.iter(|| codec_protobuf(&batch_10, &extras_10))
|
||||
});
|
||||
group.bench_function("protobuf_encode_decode_with_100_num_rows", |b| {
|
||||
b.iter(|| codec_protobuf(&batch_100, &extras_100))
|
||||
});
|
||||
group.bench_function("protobuf_encode_decode_with_10000_num_rows", |b| {
|
||||
b.iter(|| codec_protobuf(&batch_10000, &extras_10000))
|
||||
});
|
||||
group.bench_function("arrow_encode_decode_with_10_num_rows", |b| {
|
||||
b.iter(|| codec_arrow(&batch_10, &extras_10))
|
||||
});
|
||||
group.bench_function("arrow_encode_decode_with_100_num_rows", |b| {
|
||||
b.iter(|| codec_arrow(&batch_100, &extras_100))
|
||||
});
|
||||
group.bench_function("arrow_encode_decode_with_10000_num_rows", |b| {
|
||||
b.iter(|| codec_arrow(&batch_10000, &extras_10000))
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_wal_encode_decode);
|
||||
criterion_main!(benches);
|
||||
4
src/storage/benches/wal/mod.rs
Normal file
4
src/storage/benches/wal/mod.rs
Normal file
@@ -0,0 +1,4 @@
|
||||
pub mod bench_decode;
|
||||
pub mod bench_encode;
|
||||
pub mod bench_wal;
|
||||
pub mod util;
|
||||
80
src/storage/benches/wal/util/mod.rs
Normal file
80
src/storage/benches/wal/util/mod.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::{
|
||||
prelude::ScalarVector,
|
||||
type_id::LogicalTypeId,
|
||||
vectors::{BooleanVector, Float64Vector, Int64Vector, StringVector, UInt64Vector},
|
||||
};
|
||||
use rand::Rng;
|
||||
use storage::{
|
||||
proto,
|
||||
write_batch::{PutData, WriteBatch},
|
||||
};
|
||||
use store_api::storage::{consts, PutOperation, WriteRequest};
|
||||
pub mod write_batch_util;
|
||||
pub fn new_test_batch() -> WriteBatch {
|
||||
write_batch_util::new_write_batch(
|
||||
&[
|
||||
("k1", LogicalTypeId::UInt64, false),
|
||||
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
|
||||
("ts", LogicalTypeId::Int64, false),
|
||||
("v1", LogicalTypeId::Boolean, true),
|
||||
("4", LogicalTypeId::Float64, false),
|
||||
("5", LogicalTypeId::Float64, false),
|
||||
("6", LogicalTypeId::Float64, false),
|
||||
("7", LogicalTypeId::Float64, false),
|
||||
("8", LogicalTypeId::Float64, false),
|
||||
("9", LogicalTypeId::Float64, false),
|
||||
("10", LogicalTypeId::String, false),
|
||||
],
|
||||
Some(2),
|
||||
)
|
||||
}
|
||||
pub fn gen_new_batch_and_extras(
|
||||
putdate_nums: usize,
|
||||
) -> (WriteBatch, Vec<storage::proto::wal::MutationExtra>) {
|
||||
let mut batch = new_test_batch();
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 0..putdate_nums {
|
||||
let mut intvs = [0u64; 10];
|
||||
let mut boolvs = [true; 10];
|
||||
let mut tsvs = [0i64; 10];
|
||||
let mut fvs = [0.0_f64; 10];
|
||||
let svs = [
|
||||
"value1_string",
|
||||
"value2_string",
|
||||
"value3_string",
|
||||
"value4_string",
|
||||
"value5_string",
|
||||
"value6_string",
|
||||
"value7_string",
|
||||
"value8_string",
|
||||
"value9_string",
|
||||
"value10_string",
|
||||
];
|
||||
rng.fill(&mut intvs[..]);
|
||||
rng.fill(&mut boolvs[..]);
|
||||
rng.fill(&mut tsvs[..]);
|
||||
rng.fill(&mut fvs[..]);
|
||||
let intv = Arc::new(UInt64Vector::from_slice(&intvs));
|
||||
let boolv = Arc::new(BooleanVector::from(boolvs.to_vec()));
|
||||
let tsv = Arc::new(Int64Vector::from_slice(&tsvs));
|
||||
let fvs = Arc::new(Float64Vector::from_slice(&fvs));
|
||||
let svs = Arc::new(StringVector::from_slice(&svs));
|
||||
let mut put_data = PutData::default();
|
||||
put_data.add_key_column("k1", intv.clone()).unwrap();
|
||||
put_data.add_version_column(intv).unwrap();
|
||||
put_data.add_value_column("v1", boolv).unwrap();
|
||||
put_data.add_key_column("ts", tsv.clone()).unwrap();
|
||||
put_data.add_key_column("4", fvs.clone()).unwrap();
|
||||
put_data.add_key_column("5", fvs.clone()).unwrap();
|
||||
put_data.add_key_column("6", fvs.clone()).unwrap();
|
||||
put_data.add_key_column("7", fvs.clone()).unwrap();
|
||||
put_data.add_key_column("8", fvs.clone()).unwrap();
|
||||
put_data.add_key_column("9", fvs.clone()).unwrap();
|
||||
put_data.add_key_column("10", svs.clone()).unwrap();
|
||||
batch.put(put_data).unwrap();
|
||||
}
|
||||
let extras = proto::wal::gen_mutation_extras(&batch);
|
||||
(batch, extras)
|
||||
}
|
||||
9
src/storage/benches/wal/util/write_batch_util.rs
Normal file
9
src/storage/benches/wal/util/write_batch_util.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use storage::write_batch::WriteBatch;
|
||||
|
||||
use crate::memtable::util::schema_util::{self, ColumnDef};
|
||||
|
||||
pub fn new_write_batch(column_defs: &[ColumnDef], timestamp_index: Option<usize>) -> WriteBatch {
|
||||
let schema = schema_util::new_schema_ref(column_defs, timestamp_index);
|
||||
|
||||
WriteBatch::new(schema)
|
||||
}
|
||||
@@ -3,7 +3,7 @@ mod arrow_stream;
|
||||
mod background;
|
||||
mod bit_vec;
|
||||
mod chunk;
|
||||
mod codec;
|
||||
pub mod codec;
|
||||
pub mod config;
|
||||
mod engine;
|
||||
pub mod error;
|
||||
@@ -11,7 +11,7 @@ mod flush;
|
||||
pub mod manifest;
|
||||
pub mod memtable;
|
||||
pub mod metadata;
|
||||
mod proto;
|
||||
pub mod proto;
|
||||
mod read;
|
||||
mod region;
|
||||
pub mod schema;
|
||||
|
||||
@@ -739,7 +739,7 @@ pub mod codec {
|
||||
message: "schema required",
|
||||
})?;
|
||||
|
||||
let schema: SchemaRef = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
|
||||
let schema = SchemaRef::try_from(schema).context(FromProtobufSnafu {})?;
|
||||
|
||||
ensure!(
|
||||
write_batch.mutations.len() == self.mutation_extras.len(),
|
||||
|
||||
Reference in New Issue
Block a user