mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-23 16:30:39 +00:00
bench: read/write for memtable (#52)
* benchmark * fix style Co-authored-by: 张心怡 <zhangxinyi@zhangxinyideMacBook-Pro.local>
This commit is contained in:
@@ -14,3 +14,12 @@ datatypes = { path = "../datatypes" }
|
||||
snafu = { version = "0.7", features = ["backtraces"] }
|
||||
store-api = { path = "../store-api" }
|
||||
tokio = { version = "1.18", features = ["full"] }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = "0.3"
|
||||
rand = "0.8"
|
||||
atomic_float="0.1"
|
||||
|
||||
[[bench]]
|
||||
name = "bench_main"
|
||||
harness = false
|
||||
|
||||
9
src/storage/benches/bench_main.rs
Normal file
9
src/storage/benches/bench_main.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use criterion::criterion_main;
|
||||
|
||||
mod memtable;
|
||||
|
||||
criterion_main! {
|
||||
memtable::bench_memtable_read::benches,
|
||||
memtable::bench_memtable_write::benches,
|
||||
memtable::bench_memtable_read_write_ratio::benches,
|
||||
}
|
||||
17
src/storage/benches/memtable/bench_memtable_read.rs
Normal file
17
src/storage/benches/memtable/bench_memtable_read.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||
|
||||
use crate::memtable::{generate_kvs, util::bench_context::BenchContext};
|
||||
|
||||
fn bench_memtable_read(c: &mut Criterion) {
|
||||
// the length of string in value is 20
|
||||
let kvs = generate_kvs(10, 10000, 20);
|
||||
let ctx = BenchContext::new();
|
||||
kvs.iter().for_each(|kv| ctx.write(kv));
|
||||
let mut group = c.benchmark_group("memtable_read");
|
||||
group.throughput(Throughput::Elements(10 * 10000));
|
||||
group.bench_function("read", |b| b.iter(|| ctx.read(100)));
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_memtable_read);
|
||||
criterion_main!(benches);
|
||||
130
src/storage/benches/memtable/bench_memtable_read_write_ratio.rs
Normal file
130
src/storage/benches/memtable/bench_memtable_read_write_ratio.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use atomic_float::AtomicF64;
|
||||
use criterion::{
|
||||
criterion_group, criterion_main, BatchSize, Bencher, BenchmarkId, Criterion, Throughput,
|
||||
};
|
||||
use rand::Rng;
|
||||
|
||||
use crate::memtable::{generate_kvs, util::bench_context::BenchContext};
|
||||
|
||||
static READ_NUM: AtomicUsize = AtomicUsize::new(0);
|
||||
static WRITE_NUM: AtomicUsize = AtomicUsize::new(0);
|
||||
static READ_SECS: AtomicF64 = AtomicF64::new(0.0);
|
||||
static WRITE_SECS: AtomicF64 = AtomicF64::new(0.0);
|
||||
|
||||
struct Input {
|
||||
ratio: bool,
|
||||
kv_size: usize,
|
||||
batch_size: usize,
|
||||
}
|
||||
|
||||
fn memtable_round(ctx: &BenchContext, input: &Input) {
|
||||
if input.ratio {
|
||||
let now = Instant::now();
|
||||
let read_count = ctx.read(input.batch_size);
|
||||
let d = now.elapsed();
|
||||
READ_SECS.fetch_add(
|
||||
d.as_secs() as f64 + d.subsec_nanos() as f64 * 1e-9,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
READ_NUM.fetch_add(read_count, Ordering::Relaxed);
|
||||
} else {
|
||||
generate_kvs(input.kv_size, input.batch_size, 20)
|
||||
.iter()
|
||||
.for_each(|kv| {
|
||||
let now = Instant::now();
|
||||
ctx.write(kv);
|
||||
let d = now.elapsed();
|
||||
WRITE_SECS.fetch_add(
|
||||
d.as_secs() as f64 + d.subsec_nanos() as f64 * 1e-9,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
WRITE_NUM.fetch_add(kv.len(), Ordering::Relaxed);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn bench_read_write_ctx_frac(b: &mut Bencher<'_>, frac: &usize) {
|
||||
let frac = *frac;
|
||||
let ctx = Arc::new(BenchContext::default());
|
||||
let thread_ctx = ctx.clone();
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
let thread_stop = stop.clone();
|
||||
|
||||
let handle = thread::spawn(move || {
|
||||
let mut rng = rand::thread_rng();
|
||||
while !thread_stop.load(Ordering::Relaxed) {
|
||||
let f = rng.gen_range(0..=10);
|
||||
let input = Input {
|
||||
ratio: f < frac,
|
||||
kv_size: 100,
|
||||
batch_size: 1000,
|
||||
};
|
||||
memtable_round(&thread_ctx, &input);
|
||||
}
|
||||
});
|
||||
|
||||
let mut rng = rand::thread_rng();
|
||||
b.iter_batched_ref(
|
||||
|| {
|
||||
let f = rng.gen_range(0..=10);
|
||||
Input {
|
||||
ratio: f < frac,
|
||||
kv_size: 100,
|
||||
batch_size: 1000,
|
||||
}
|
||||
},
|
||||
|input| {
|
||||
memtable_round(&ctx, input);
|
||||
},
|
||||
BatchSize::SmallInput,
|
||||
);
|
||||
stop.store(true, Ordering::Relaxed);
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
#[allow(clippy::print_stdout)]
|
||||
fn bench_memtable_read_write_ratio(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("memtable_read_write_ratio");
|
||||
for i in 0..=10 {
|
||||
READ_NUM.store(0, Ordering::Relaxed);
|
||||
WRITE_NUM.store(0, Ordering::Relaxed);
|
||||
READ_SECS.store(0.0, Ordering::Relaxed);
|
||||
WRITE_SECS.store(0.0, Ordering::Relaxed);
|
||||
|
||||
group.bench_with_input(
|
||||
BenchmarkId::from_parameter(format!(
|
||||
"read ratio: {:.2}% , write ratio: {:.2}%",
|
||||
i as f64 / 10_f64 * 100.0,
|
||||
(10 - i) as f64 / 10_f64 * 100.0,
|
||||
)),
|
||||
&i,
|
||||
bench_read_write_ctx_frac,
|
||||
);
|
||||
group.throughput(Throughput::Elements(100 * 1000));
|
||||
|
||||
// the time is a little different the real time
|
||||
let read_num = READ_NUM.load(Ordering::Relaxed);
|
||||
let read_time = READ_SECS.load(Ordering::Relaxed);
|
||||
let read_tps = read_num as f64 / read_time as f64;
|
||||
let write_num = WRITE_NUM.load(Ordering::Relaxed);
|
||||
let write_time = WRITE_SECS.load(Ordering::Relaxed);
|
||||
let write_tps = write_num as f64 / write_time as f64;
|
||||
println!(
|
||||
"\nread numbers: {}, read thrpt: {}\nwrite numbers: {}, write thrpt {}\n",
|
||||
read_num, read_tps, write_num, write_tps
|
||||
);
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_memtable_read_write_ratio);
|
||||
criterion_main!(benches);
|
||||
19
src/storage/benches/memtable/bench_memtable_write.rs
Normal file
19
src/storage/benches/memtable/bench_memtable_write.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
|
||||
|
||||
use crate::memtable::generate_kvs;
|
||||
use crate::memtable::util::bench_context::BenchContext;
|
||||
|
||||
pub fn bench_memtable_write(c: &mut Criterion) {
|
||||
// the length of string in value is 20
|
||||
let kvs = generate_kvs(10, 1000, 20);
|
||||
let mut group = c.benchmark_group("memtable_write");
|
||||
group.throughput(Throughput::Elements(10 * 1000));
|
||||
group.bench_function("write", |b| {
|
||||
let ctx = BenchContext::new();
|
||||
b.iter(|| kvs.iter().for_each(|kv| ctx.write(kv)))
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(benches, bench_memtable_write);
|
||||
criterion_main!(benches);
|
||||
106
src/storage/benches/memtable/mod.rs
Normal file
106
src/storage/benches/memtable/mod.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
pub mod bench_memtable_read;
|
||||
pub mod bench_memtable_read_write_ratio;
|
||||
pub mod bench_memtable_write;
|
||||
pub mod util;
|
||||
|
||||
use std::sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use datatypes::{
|
||||
prelude::ScalarVectorBuilder,
|
||||
vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder},
|
||||
};
|
||||
use rand::{distributions::Alphanumeric, prelude::ThreadRng, Rng};
|
||||
use storage::memtable::KeyValues;
|
||||
use store_api::storage::{SequenceNumber, ValueType};
|
||||
|
||||
static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
fn get_sequence() -> SequenceNumber {
|
||||
NEXT_SEQUENCE.fetch_add(1, Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn random_kv(rng: &mut ThreadRng, value_size: usize) -> ((i64, u64), (Option<u64>, String)) {
|
||||
let key0 = rng.gen_range(0..10000);
|
||||
let key1 = rng.gen::<u64>();
|
||||
let value1 = Some(rng.gen::<u64>());
|
||||
let value2 = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(value_size)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
((key0, key1), (value1, value2))
|
||||
}
|
||||
type KeyTuple = (i64, u64);
|
||||
type ValueTuple = (Option<u64>, String);
|
||||
|
||||
fn random_kvs(len: usize, value_size: usize) -> (Vec<KeyTuple>, Vec<ValueTuple>) {
|
||||
let mut keys = Vec::with_capacity(len);
|
||||
let mut values = Vec::with_capacity(len);
|
||||
for _ in 0..len {
|
||||
let mut rng = rand::thread_rng();
|
||||
let (key, value) = random_kv(&mut rng, value_size);
|
||||
keys.push(key);
|
||||
values.push(value);
|
||||
}
|
||||
(keys, values)
|
||||
}
|
||||
|
||||
fn kvs_with_index(
|
||||
sequence: SequenceNumber,
|
||||
value_type: ValueType,
|
||||
start_index_in_batch: usize,
|
||||
keys: &[(i64, u64)],
|
||||
values: &[(Option<u64>, String)],
|
||||
) -> KeyValues {
|
||||
let mut key_builders = (
|
||||
Int64VectorBuilder::with_capacity(keys.len()),
|
||||
UInt64VectorBuilder::with_capacity(keys.len()),
|
||||
);
|
||||
for key in keys {
|
||||
key_builders.0.push(Some(key.0));
|
||||
key_builders.1.push(Some(key.1));
|
||||
}
|
||||
let row_keys = vec![
|
||||
Arc::new(key_builders.0.finish()) as _,
|
||||
Arc::new(key_builders.1.finish()) as _,
|
||||
];
|
||||
let mut value_builders = (
|
||||
UInt64VectorBuilder::with_capacity(values.len()),
|
||||
StringVectorBuilder::with_capacity(values.len()),
|
||||
);
|
||||
for value in values {
|
||||
value_builders.0.push(value.0);
|
||||
value_builders.1.push(Some(&value.1));
|
||||
}
|
||||
let row_values = vec![
|
||||
Arc::new(value_builders.0.finish()) as _,
|
||||
Arc::new(value_builders.1.finish()) as _,
|
||||
];
|
||||
KeyValues {
|
||||
sequence,
|
||||
value_type,
|
||||
start_index_in_batch,
|
||||
keys: row_keys,
|
||||
values: row_values,
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_kv(kv_size: usize, start_index_in_batch: usize, value_size: usize) -> KeyValues {
|
||||
let (keys, values) = random_kvs(kv_size, value_size);
|
||||
kvs_with_index(
|
||||
get_sequence(),
|
||||
ValueType::Put,
|
||||
start_index_in_batch,
|
||||
&keys,
|
||||
&values,
|
||||
)
|
||||
}
|
||||
|
||||
fn generate_kvs(kv_size: usize, size: usize, value_size: usize) -> Vec<KeyValues> {
|
||||
(0..size)
|
||||
.map(|i| generate_kv(kv_size, i, value_size))
|
||||
.collect()
|
||||
}
|
||||
37
src/storage/benches/memtable/util/bench_context.rs
Normal file
37
src/storage/benches/memtable/util/bench_context.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
use storage::memtable::{IterContext, KeyValues, MemtableRef};
|
||||
use store_api::storage::SequenceNumber;
|
||||
|
||||
use crate::memtable::util::new_memtable;
|
||||
|
||||
pub struct BenchContext {
|
||||
memtable: MemtableRef,
|
||||
}
|
||||
impl Default for BenchContext {
|
||||
fn default() -> Self {
|
||||
BenchContext::new()
|
||||
}
|
||||
}
|
||||
impl BenchContext {
|
||||
pub fn new() -> BenchContext {
|
||||
BenchContext {
|
||||
memtable: new_memtable(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write(&self, kvs: &KeyValues) {
|
||||
self.memtable.write(kvs).unwrap();
|
||||
}
|
||||
|
||||
pub fn read(&self, batch_size: usize) -> usize {
|
||||
let mut read_count = 0;
|
||||
let iter_ctx = IterContext {
|
||||
batch_size,
|
||||
visible_sequence: SequenceNumber::MAX,
|
||||
};
|
||||
let mut iter = self.memtable.iter(iter_ctx).unwrap();
|
||||
while let Ok(Some(_)) = iter.next() {
|
||||
read_count += batch_size;
|
||||
}
|
||||
read_count
|
||||
}
|
||||
}
|
||||
26
src/storage/benches/memtable/util/mod.rs
Normal file
26
src/storage/benches/memtable/util/mod.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
pub mod bench_context;
|
||||
pub mod regiondesc_util;
|
||||
pub mod schema_util;
|
||||
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
use storage::{
|
||||
memtable::{DefaultMemtableBuilder, MemtableBuilder, MemtableRef, MemtableSchema},
|
||||
metadata::RegionMetadata,
|
||||
};
|
||||
|
||||
use crate::memtable::util::regiondesc_util::RegionDescBuilder;
|
||||
|
||||
pub const TIMESTAMP_NAME: &str = "timestamp";
|
||||
|
||||
pub fn schema_for_test() -> MemtableSchema {
|
||||
let desc = RegionDescBuilder::new("bench")
|
||||
.push_value_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.push_value_column(("v2", LogicalTypeId::String, true))
|
||||
.build();
|
||||
let metadata: RegionMetadata = desc.try_into().unwrap();
|
||||
MemtableSchema::new(metadata.columns_row_key)
|
||||
}
|
||||
|
||||
pub fn new_memtable() -> MemtableRef {
|
||||
DefaultMemtableBuilder {}.build(schema_for_test())
|
||||
}
|
||||
58
src/storage/benches/memtable/util/regiondesc_util.rs
Normal file
58
src/storage/benches/memtable/util/regiondesc_util.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use store_api::storage::{
|
||||
ColumnDescriptor, ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId,
|
||||
RegionDescriptor, RowKeyDescriptorBuilder,
|
||||
};
|
||||
|
||||
use super::{schema_util::ColumnDef, TIMESTAMP_NAME};
|
||||
pub struct RegionDescBuilder {
|
||||
name: String,
|
||||
last_column_id: ColumnId,
|
||||
key_builder: RowKeyDescriptorBuilder,
|
||||
default_cf_builder: ColumnFamilyDescriptorBuilder,
|
||||
}
|
||||
|
||||
impl RegionDescBuilder {
|
||||
pub fn new<T: Into<String>>(name: T) -> Self {
|
||||
let key_builder = RowKeyDescriptorBuilder::new(
|
||||
ColumnDescriptorBuilder::new(2, TIMESTAMP_NAME, ConcreteDataType::int64_datatype())
|
||||
.is_nullable(false)
|
||||
.build(),
|
||||
);
|
||||
|
||||
Self {
|
||||
name: name.into(),
|
||||
last_column_id: 2,
|
||||
key_builder,
|
||||
default_cf_builder: ColumnFamilyDescriptorBuilder::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_value_column(mut self, column_def: ColumnDef) -> Self {
|
||||
let column = self.new_column(column_def);
|
||||
self.default_cf_builder = self.default_cf_builder.push_column(column);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RegionDescriptor {
|
||||
RegionDescriptor {
|
||||
id: 0,
|
||||
name: self.name,
|
||||
row_key: self.key_builder.build(),
|
||||
default_cf: self.default_cf_builder.build(),
|
||||
extra_cfs: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn alloc_column_id(&mut self) -> ColumnId {
|
||||
self.last_column_id += 1;
|
||||
self.last_column_id
|
||||
}
|
||||
|
||||
fn new_column(&mut self, column_def: ColumnDef) -> ColumnDescriptor {
|
||||
let datatype = column_def.1.data_type();
|
||||
ColumnDescriptorBuilder::new(self.alloc_column_id(), column_def.0, datatype)
|
||||
.is_nullable(column_def.2)
|
||||
.build()
|
||||
}
|
||||
}
|
||||
3
src/storage/benches/memtable/util/schema_util.rs
Normal file
3
src/storage/benches/memtable/util/schema_util.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
use datatypes::type_id::LogicalTypeId;
|
||||
|
||||
pub type ColumnDef<'a> = (&'a str, LogicalTypeId, bool);
|
||||
@@ -149,7 +149,6 @@ async fn test_simple_put_scan() {
|
||||
let output = tester.full_scan().await;
|
||||
assert_eq!(data, output);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sequence_increase() {
|
||||
let tester = Tester::default();
|
||||
|
||||
Reference in New Issue
Block a user