mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
fix: Fix broken wal and memtable benchmarks (#320)
This commit is contained in:
@@ -114,14 +114,24 @@ fn bench_memtable_read_write_ratio(c: &mut Criterion) {
|
||||
// the time is a little different the real time
|
||||
let read_num = READ_NUM.load(Ordering::Relaxed);
|
||||
let read_time = READ_SECS.load(Ordering::Relaxed);
|
||||
let read_tps = read_num as f64 / read_time as f64;
|
||||
let read_tps = if read_time != 0.0 {
|
||||
read_num as f64 / read_time as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
let write_num = WRITE_NUM.load(Ordering::Relaxed);
|
||||
let write_time = WRITE_SECS.load(Ordering::Relaxed);
|
||||
let write_tps = write_num as f64 / write_time as f64;
|
||||
println!(
|
||||
"\nread numbers: {}, read thrpt: {}\nwrite numbers: {}, write thrpt {}\n",
|
||||
read_num, read_tps, write_num, write_tps
|
||||
);
|
||||
let write_tps = if write_time != 0.0 {
|
||||
write_num as f64 / write_time as f64
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
if read_num != 0 || write_num != 0 {
|
||||
println!(
|
||||
"\nread numbers: {}, read thrpt: {}\nwrite numbers: {}, write thrpt {}\n",
|
||||
read_num, read_tps, write_num, write_tps
|
||||
);
|
||||
}
|
||||
}
|
||||
group.finish();
|
||||
}
|
||||
|
||||
@@ -8,9 +8,10 @@ use std::sync::{
|
||||
Arc,
|
||||
};
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::{
|
||||
prelude::ScalarVectorBuilder,
|
||||
vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder},
|
||||
vectors::{StringVectorBuilder, TimestampVectorBuilder, UInt64VectorBuilder},
|
||||
};
|
||||
use rand::{distributions::Alphanumeric, prelude::ThreadRng, Rng};
|
||||
use storage::memtable::KeyValues;
|
||||
@@ -56,11 +57,11 @@ fn kvs_with_index(
|
||||
values: &[(Option<u64>, String)],
|
||||
) -> KeyValues {
|
||||
let mut key_builders = (
|
||||
Int64VectorBuilder::with_capacity(keys.len()),
|
||||
TimestampVectorBuilder::with_capacity(keys.len()),
|
||||
UInt64VectorBuilder::with_capacity(keys.len()),
|
||||
);
|
||||
for key in keys {
|
||||
key_builders.0.push(Some(key.0));
|
||||
key_builders.0.push(Some(Timestamp::from_millis(key.0)));
|
||||
key_builders.1.push(Some(key.1));
|
||||
}
|
||||
let row_keys = vec![
|
||||
|
||||
@@ -15,6 +15,7 @@ pub const TIMESTAMP_NAME: &str = "timestamp";
|
||||
|
||||
pub fn schema_for_test() -> RegionSchemaRef {
|
||||
let desc = RegionDescBuilder::new("bench")
|
||||
.enable_version_column(true)
|
||||
.push_value_column(("v1", LogicalTypeId::UInt64, true))
|
||||
.push_value_column(("v2", LogicalTypeId::String, true))
|
||||
.build();
|
||||
|
||||
@@ -5,6 +5,7 @@ use store_api::storage::{
|
||||
};
|
||||
|
||||
use super::{schema_util::ColumnDef, TIMESTAMP_NAME};
|
||||
|
||||
pub struct RegionDescBuilder {
|
||||
name: String,
|
||||
last_column_id: ColumnId,
|
||||
@@ -15,20 +16,29 @@ pub struct RegionDescBuilder {
|
||||
impl RegionDescBuilder {
|
||||
pub fn new<T: Into<String>>(name: T) -> Self {
|
||||
let key_builder = RowKeyDescriptorBuilder::new(
|
||||
ColumnDescriptorBuilder::new(2, TIMESTAMP_NAME, ConcreteDataType::int64_datatype())
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
ColumnDescriptorBuilder::new(
|
||||
1,
|
||||
TIMESTAMP_NAME,
|
||||
ConcreteDataType::timestamp_millis_datatype(),
|
||||
)
|
||||
.is_nullable(false)
|
||||
.build()
|
||||
.unwrap(),
|
||||
);
|
||||
|
||||
Self {
|
||||
name: name.into(),
|
||||
last_column_id: 2,
|
||||
last_column_id: 1,
|
||||
key_builder,
|
||||
default_cf_builder: ColumnFamilyDescriptorBuilder::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn enable_version_column(mut self, enable: bool) -> Self {
|
||||
self.key_builder = self.key_builder.enable_version_column(enable);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn push_value_column(mut self, column_def: ColumnDef) -> Self {
|
||||
let column = self.new_column(column_def);
|
||||
self.default_cf_builder = self.default_cf_builder.push_column(column);
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
pub mod write_batch_util;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::{
|
||||
prelude::ScalarVector,
|
||||
type_id::LogicalTypeId,
|
||||
vectors::{BooleanVector, Float64Vector, Int64Vector, StringVector, UInt64Vector},
|
||||
vectors::{BooleanVector, Float64Vector, StringVector, TimestampVector, UInt64Vector},
|
||||
};
|
||||
use rand::Rng;
|
||||
use storage::{
|
||||
@@ -11,13 +13,13 @@ use storage::{
|
||||
write_batch::{PutData, WriteBatch},
|
||||
};
|
||||
use store_api::storage::{consts, PutOperation, WriteRequest};
|
||||
pub mod write_batch_util;
|
||||
|
||||
pub fn new_test_batch() -> WriteBatch {
|
||||
write_batch_util::new_write_batch(
|
||||
&[
|
||||
("k1", LogicalTypeId::UInt64, false),
|
||||
(consts::VERSION_COLUMN_NAME, LogicalTypeId::UInt64, false),
|
||||
("ts", LogicalTypeId::Int64, false),
|
||||
("ts", LogicalTypeId::Timestamp, false),
|
||||
("v1", LogicalTypeId::Boolean, true),
|
||||
("4", LogicalTypeId::Float64, false),
|
||||
("5", LogicalTypeId::Float64, false),
|
||||
@@ -30,6 +32,7 @@ pub fn new_test_batch() -> WriteBatch {
|
||||
Some(2),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn gen_new_batch_and_extras(
|
||||
putdate_nums: usize,
|
||||
) -> (WriteBatch, Vec<storage::proto::wal::MutationExtra>) {
|
||||
@@ -58,7 +61,7 @@ pub fn gen_new_batch_and_extras(
|
||||
rng.fill(&mut fvs[..]);
|
||||
let intv = Arc::new(UInt64Vector::from_slice(&intvs));
|
||||
let boolv = Arc::new(BooleanVector::from(boolvs.to_vec()));
|
||||
let tsv = Arc::new(Int64Vector::from_slice(&tsvs));
|
||||
let tsv = Arc::new(TimestampVector::from_values(tsvs));
|
||||
let fvs = Arc::new(Float64Vector::from_slice(&fvs));
|
||||
let svs = Arc::new(StringVector::from_slice(&svs));
|
||||
let mut put_data = PutData::default();
|
||||
|
||||
@@ -166,15 +166,16 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
fn validate_input_and_memtable_schemas(batch: &WriteBatch, memtables: &MemtableSet) {
|
||||
let batch_schema = batch.schema();
|
||||
for (_, memtable) in memtables.iter() {
|
||||
let memtable_schema = memtable.schema();
|
||||
let user_schema = memtable_schema.user_schema();
|
||||
debug_assert_eq!(batch_schema.version(), user_schema.version());
|
||||
// Only validate column schemas.
|
||||
debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas());
|
||||
if cfg!(debug_assertions) {
|
||||
let batch_schema = batch.schema();
|
||||
for (_, memtable) in memtables.iter() {
|
||||
let memtable_schema = memtable.schema();
|
||||
let user_schema = memtable_schema.user_schema();
|
||||
debug_assert_eq!(batch_schema.version(), user_schema.version());
|
||||
// Only validate column schemas.
|
||||
debug_assert_eq!(batch_schema.column_schemas(), user_schema.column_schemas());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user