mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat: employ sparse key encoding for shard lookup (#3410)
* feat: employ short key encoding for shard lookup
* fix: license
* chore: simplify code
* refactor: only enable sparse encoding to speed lookup on metric engine
* fix: names
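The gist of the change: instead of always encoding the full memcomparable primary key before looking a series up in a shard, a partitioned tree first builds a cheaper sparse key of (column id, value) pairs covering only the non-null label columns, and only produces the full encoding when the key turns out to be new. The sketch below is a minimal, self-contained illustration of why the sparse form suits metric-engine style rows (many possible labels, few set per series); both encoders here are simplified stand-ins, not GreptimeDB's actual codecs.

type ColumnId = u32;

// Simplified stand-in for the full (dense) primary key encoding: every column
// is serialized in schema order, nulls included.
fn encode_full(row: &[Option<&str>], buf: &mut Vec<u8>) {
    for value in row {
        match value {
            Some(v) => {
                buf.push(1); // presence marker
                buf.extend_from_slice(v.as_bytes());
                buf.push(0); // terminator
            }
            None => buf.push(0), // a null still costs a marker byte
        }
    }
}

// Simplified stand-in for the sparse encoding: only non-null columns are
// written, each tagged with its column id; null columns are skipped entirely.
fn encode_sparse(row: &[Option<&str>], column_ids: &[ColumnId], buf: &mut Vec<u8>) {
    for (value, id) in row.iter().zip(column_ids) {
        if let Some(v) = value {
            buf.extend_from_slice(&id.to_be_bytes());
            buf.extend_from_slice(v.as_bytes());
            buf.push(0);
        }
    }
}

fn main() {
    // A metric-engine style row: 20 possible label columns, 2 actually set.
    let mut row: Vec<Option<&str>> = vec![None; 20];
    row[3] = Some("host-1");
    row[17] = Some("us-east");
    let ids: Vec<ColumnId> = (0..20).collect();

    let (mut full, mut sparse) = (Vec::new(), Vec::new());
    encode_full(&row, &mut full);
    encode_sparse(&row, &ids, &mut sparse);
    // 35 bytes vs 23 bytes here; the gap grows with the number of null labels.
    println!("full: {} bytes, sparse: {} bytes", full.len(), sparse.len());
}

In the actual patch the bigger saving is on the hot write path, which skips the full `row_codec` encoding entirely: the sparse key is what the shard lookup hashes, and the full key is only produced once per new series (see `Partition::write_with_key` below).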

Cargo.lock (generated, 1 line changed)

@@ -5346,6 +5346,7 @@ dependencies = [
  "common-test-util",
  "common-time",
  "common-wal",
+ "criterion",
  "dashmap",
  "datafusion",
  "datafusion-common",

src/mito2/Cargo.toml

@@ -73,5 +73,10 @@ uuid.workspace = true
 [dev-dependencies]
 common-procedure-test.workspace = true
 common-test-util.workspace = true
+criterion = "0.4"
 log-store.workspace = true
 rand.workspace = true
+
+[[bench]]
+name = "bench_merge_tree"
+harness = false

src/mito2/benches/bench_merge_tree.rs (new file, 21 lines)

@@ -0,0 +1,21 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod merge_tree_bench;
+
+use criterion::criterion_main;
+
+criterion_main! {
+    merge_tree_bench::benches
+}

src/mito2/benches/merge_tree_bench.rs (new file, 36 lines)

@@ -0,0 +1,36 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use criterion::{criterion_group, criterion_main, Criterion};
+use mito2::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtable};
+use mito2::memtable::Memtable;
+use mito2::test_util::memtable_util;
+
+fn bench_merge_tree_memtable(c: &mut Criterion) {
+    let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
+    let timestamps = (0..100).collect::<Vec<_>>();
+
+    let memtable = MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());
+
+    let _ = c.bench_function("MergeTreeMemtable", |b| {
+        let kvs =
+            memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
+        b.iter(|| {
+            memtable.write(&kvs).unwrap();
+        });
+    });
+}
+
+criterion_group!(benches, bench_merge_tree_memtable);
+criterion_main!(benches);
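With `harness = false` and the `[[bench]]` target registered in `src/mito2/Cargo.toml` above, Criterion provides the benchmark entry point; it can be run with `cargo bench -p mito2 --bench bench_merge_tree`. This is also why the two `memtable_util` helpers near the end of this diff change from `pub(crate)` to `pub`: the bench is a separate crate target and reaches them through `mito2::test_util::memtable_util`.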

src/mito2/src/memtable/merge_tree/data.rs

@@ -216,7 +216,7 @@ impl DataBuffer {
     }
 
     /// Writes a row to data buffer.
-    pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
+    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
         self.ts_builder.push_value_ref(kv.timestamp());
         self.pk_index_builder.push(Some(pk_index));
         self.sequence_builder.push(Some(kv.sequence()));

@@ -953,7 +953,7 @@ impl DataParts {
     }
 
     /// Writes a row into parts.
-    pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
+    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
        self.active.write_row(pk_index, kv)
     }
 
src/mito2/src/memtable/merge_tree/merger.rs

@@ -319,7 +319,7 @@ mod tests {
     );
 
     for kv in kvs.iter() {
-        buffer.write_row(pk_index, kv);
+        buffer.write_row(pk_index, &kv);
     }
 
     *sequence += rows;

src/mito2/src/memtable/merge_tree/partition.rs

@@ -64,8 +64,10 @@ impl Partition {
     /// Writes to the partition with a primary key.
     pub fn write_with_key(
         &self,
-        primary_key: &[u8],
+        primary_key: &mut Vec<u8>,
+        row_codec: &McmpRowCodec,
         key_value: KeyValue,
+        re_encode: bool,
         metrics: &mut WriteMetrics,
     ) -> Result<()> {
         let mut inner = self.inner.write().unwrap();

@@ -76,17 +78,30 @@ impl Partition {
 
         // Finds key in shards, now we ensure one key only exists in one shard.
         if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
             // Key already in shards.
-            inner.write_to_shard(pk_id, key_value);
+            inner.write_to_shard(pk_id, &key_value);
             inner.num_rows += 1;
             return Ok(());
         }
 
-        // Write to the shard builder.
-        inner
-            .shard_builder
-            .write_with_key(primary_key, key_value, metrics);
-        inner.num_rows += 1;
+        // Key does not yet exist in shard or builder, encode and insert the full primary key.
+        if re_encode {
+            // `primary_key` is sparse, re-encode the full primary key.
+            let sparse_key = primary_key.clone();
+            primary_key.clear();
+            row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
+            let pk_id = inner
+                .shard_builder
+                .write_with_key(primary_key, &key_value, metrics);
+            inner.pk_to_pk_id.insert(sparse_key, pk_id);
+        } else {
+            // `primary_key` is already the full primary key.
+            let pk_id = inner
+                .shard_builder
+                .write_with_key(primary_key, &key_value, metrics);
+            inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
+        };
+
+        inner.num_rows += 1;
         Ok(())
     }
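The hunk above is the heart of the lookup optimization: a hit in `pk_to_pk_id` (keyed by whichever encoding the tree handed in, sparse or full) writes straight to a shard, and only a miss pays for the full re-encode. Below is a distilled, runnable sketch of that fast path, with hypothetical names standing in for the real shard machinery (`PkId` reduced to an integer, `full_encode` standing in for the row codec).

use std::collections::HashMap;

// Hypothetical distillation of Partition::write_with_key's lookup flow.
struct PartitionSketch {
    pk_to_pk_id: HashMap<Vec<u8>, u32>,
    next_id: u32,
}

impl PartitionSketch {
    fn write(&mut self, sparse_key: &mut Vec<u8>, full_encode: impl Fn(&mut Vec<u8>)) -> u32 {
        // Hot path: known series, no full-key encoding at all.
        if let Some(id) = self.pk_to_pk_id.get(sparse_key.as_slice()) {
            return *id;
        }
        // Cold path: remember the sparse key, then re-encode the full key in
        // place so the shard dictionary stores the canonical form.
        let sparse = sparse_key.clone();
        sparse_key.clear();
        full_encode(sparse_key);
        let id = self.next_id;
        self.next_id += 1;
        self.pk_to_pk_id.insert(sparse, id); // the next write of this series is a hit
        id
    }
}

fn main() {
    let mut p = PartitionSketch { pk_to_pk_id: HashMap::new(), next_id: 0 };
    let mut key = vec![1u8, 2, 3]; // pretend sparse key
    let id0 = p.write(&mut key, |buf| buf.extend_from_slice(b"full-key"));
    let mut key2 = vec![1u8, 2, 3]; // same series again
    let id1 = p.write(&mut key2, |_| unreachable!("hit: no re-encode"));
    assert_eq!(id0, id1);
}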
@@ -102,7 +117,7 @@ impl Partition {
             shard_id: 0,
             pk_index: 0,
         };
-        inner.shards[0].write_with_pk_id(pk_id, key_value);
+        inner.shards[0].write_with_pk_id(pk_id, &key_value);
         inner.num_rows += 1;
     }

@@ -583,7 +598,11 @@ impl Inner {
         self.pk_to_pk_id.get(primary_key).copied()
     }
 
-    fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) {
+    fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) {
+        if pk_id.shard_id == self.shard_builder.current_shard_id() {
+            self.shard_builder.write_with_pk_id(pk_id, key_value);
+            return;
+        }
        for shard in &mut self.shards {
            if shard.shard_id == pk_id.shard_id {
                shard.write_with_pk_id(pk_id, key_value);

src/mito2/src/memtable/merge_tree/shard.rs

@@ -55,7 +55,7 @@ impl Shard {
     }
 
     /// Writes a key value into the shard.
-    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: KeyValue) {
+    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
         debug_assert_eq!(self.shard_id, pk_id.shard_id);
 
         self.data_parts.write_row(pk_id.pk_index, key_value);

@@ -417,7 +417,7 @@ mod tests {
                 shard_id: shard.shard_id,
                 pk_index: *pk_index,
             };
-            shard.write_with_pk_id(pk_id, kv);
+            shard.write_with_pk_id(pk_id, &kv);
         }
     }
     assert!(!shard.is_empty());

src/mito2/src/memtable/merge_tree/shard_builder.rs

@@ -60,11 +60,26 @@ impl ShardBuilder {
         }
     }
 
+    /// Write a key value with given pk_index (caller must ensure the pk_index exist in dict_builder)
+    pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
+        assert_eq!(self.current_shard_id, pk_id.shard_id);
+        self.data_buffer.write_row(pk_id.pk_index, key_value);
+    }
+
     /// Write a key value with its encoded primary key.
-    pub fn write_with_key(&mut self, key: &[u8], key_value: KeyValue, metrics: &mut WriteMetrics) {
+    pub fn write_with_key(
+        &mut self,
+        primary_key: &[u8],
+        key_value: &KeyValue,
+        metrics: &mut WriteMetrics,
+    ) -> PkId {
         // Safety: we check whether the builder need to freeze before.
-        let pk_index = self.dict_builder.insert_key(key, metrics);
+        let pk_index = self.dict_builder.insert_key(primary_key, metrics);
         self.data_buffer.write_row(pk_index, key_value);
+        PkId {
+            shard_id: self.current_shard_id,
+            pk_index,
+        }
     }
 
     /// Returns true if the builder need to freeze.

@@ -261,7 +276,7 @@ mod tests {
         for key_values in &input {
             for kv in key_values.iter() {
                 let key = encode_key_by_kv(&kv);
-                shard_builder.write_with_key(&key, kv, &mut metrics);
+                shard_builder.write_with_key(&key, &kv, &mut metrics);
             }
         }
         let shard = shard_builder

@@ -283,7 +298,7 @@ mod tests {
         for key_values in &input {
             for kv in key_values.iter() {
                 let key = encode_key_by_kv(&kv);
-                shard_builder.write_with_key(&key, kv, &mut metrics);
+                shard_builder.write_with_key(&key, &kv, &mut metrics);
             }
         }
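Two `ShardBuilder` changes carry the new scheme: `write_with_key` now returns the `PkId` it assigned, which is what lets `Partition::write_with_key` record the key → `PkId` mapping shown earlier, and the new `write_with_pk_id` lets a repeat write of a known series skip the dictionary insertion entirely. Passing `KeyValue` by reference throughout follows from the same change: the partition now uses one `KeyValue` both to re-encode the full primary key and to perform the write, so it can no longer be moved into the first call.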

src/mito2/src/memtable/merge_tree/tree.rs

@@ -22,12 +22,15 @@ use api::v1::OpType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
 use common_time::Timestamp;
 use datafusion_common::ScalarValue;
-use snafu::ensure;
+use datatypes::prelude::ValueRef;
+use memcomparable::Serializer;
+use serde::Serialize;
+use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 use table::predicate::Predicate;
 
-use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
+use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::merge_tree::metrics::WriteMetrics;

@@ -54,6 +57,7 @@ pub struct MergeTree {
     is_partitioned: bool,
     /// Manager to report size of the tree.
     write_buffer_manager: Option<WriteBufferManagerRef>,
+    sparse_encoder: Arc<SparseEncoder>,
 }
 
 impl MergeTree {

@@ -69,6 +73,15 @@ impl MergeTree {
                 .map(|c| SortField::new(c.column_schema.data_type.clone()))
                 .collect(),
         );
+        let sparse_encoder = SparseEncoder {
+            fields: metadata
+                .primary_key_columns()
+                .map(|c| FieldWithId {
+                    field: SortField::new(c.column_schema.data_type.clone()),
+                    column_id: c.column_id,
+                })
+                .collect(),
+        };
         let is_partitioned = Partition::has_multi_partitions(&metadata);
 
         MergeTree {

@@ -78,6 +91,7 @@ impl MergeTree {
             partitions: Default::default(),
             is_partitioned,
             write_buffer_manager,
+            sparse_encoder: Arc::new(sparse_encoder),
         }
     }

@@ -116,9 +130,15 @@ impl MergeTree {
 
             // Encode primary key.
             pk_buffer.clear();
-            self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
+            if self.is_partitioned {
+                // Use sparse encoder for metric engine.
+                self.sparse_encoder
+                    .encode_to_vec(kv.primary_keys(), pk_buffer)?;
+            } else {
+                self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
+            }
 
             // Write rows with primary keys.
             self.write_with_key(pk_buffer, kv, metrics)?;
         }
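The sparse path is deliberately gated on `is_partitioned`: as the in-line comment and the commit message note, sparse encoding is only enabled for the metric engine, whose trees are partitioned (`Partition::has_multi_partitions`). Ordinary tables keep the full `row_codec` encoding, and because the same flag is passed down as `re_encode`, they never take the re-encode branch in `Partition::write_with_key`.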
@@ -252,6 +272,7 @@ impl MergeTree {
             partitions: RwLock::new(forked),
             is_partitioned: self.is_partitioned,
             write_buffer_manager: self.write_buffer_manager.clone(),
+            sparse_encoder: self.sparse_encoder.clone(),
         }
     }

@@ -262,14 +283,20 @@ impl MergeTree {
 
     fn write_with_key(
         &self,
-        primary_key: &[u8],
+        primary_key: &mut Vec<u8>,
         key_value: KeyValue,
         metrics: &mut WriteMetrics,
     ) -> Result<()> {
         let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
         let partition = self.get_or_create_partition(partition_key);
 
-        partition.write_with_key(primary_key, key_value, metrics)
+        partition.write_with_key(
+            primary_key,
+            &self.row_codec,
+            key_value,
+            self.is_partitioned, // If tree is partitioned, re-encode is required to get the full primary key.
+            metrics,
+        )
     }
 
     fn write_no_key(&self, key_value: KeyValue) {

@@ -324,6 +351,34 @@ impl MergeTree {
     }
 }
 
+struct FieldWithId {
+    field: SortField,
+    column_id: ColumnId,
+}
+
+struct SparseEncoder {
+    fields: Vec<FieldWithId>,
+}
+
+impl SparseEncoder {
+    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
+    where
+        I: Iterator<Item = ValueRef<'a>>,
+    {
+        let mut serializer = Serializer::new(buffer);
+        for (value, field) in row.zip(self.fields.iter()) {
+            if !value.is_null() {
+                field
+                    .column_id
+                    .serialize(&mut serializer)
+                    .context(SerializeFieldSnafu)?;
+                field.field.serialize(&mut serializer, &value)?;
+            }
+        }
+        Ok(())
+    }
+}
+
 #[derive(Default)]
 struct TreeIterMetrics {
     iter_elapsed: Duration,
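For a concrete picture of what `SparseEncoder` produces, the sketch below drives the `memcomparable` crate directly, the same way `encode_to_vec` does. It assumes the `memtable_util` test schema (`k0, __table_id, ts, v0, v1`) with made-up column ids 0 and 2; null primary key columns contribute no bytes at all, and because every part is serialized memcomparably, the buffer is usable directly as a byte-comparable map key.

use memcomparable::Serializer;
use serde::Serialize;

fn main() {
    // (column_id, value) pairs for the non-null primary key columns only;
    // the ids here are hypothetical, not GreptimeDB's actual assignments.
    let mut serializer = Serializer::new(Vec::new());
    0u32.serialize(&mut serializer).unwrap(); // k0's column id
    "hello".serialize(&mut serializer).unwrap(); // k0's value
    2u32.serialize(&mut serializer).unwrap(); // __table_id's column id
    42u32.serialize(&mut serializer).unwrap(); // __table_id's value
    let sparse_key: Vec<u8> = serializer.into_inner();
    // Rows with identical non-null labels serialize to identical buffers,
    // which is exactly what the pk_to_pk_id lookup needs.
    println!("sparse key: {} bytes", sparse_key.len());
}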

src/mito2/src/test_util/memtable_util.rs

@@ -104,7 +104,7 @@ pub(crate) fn metadata_for_test() -> RegionMetadataRef {
 ///
 /// If `enable_table_id` is false, the schema is `k0, k1, ts, v0, v1`.
 /// If `enable_table_id` is true, the schema is `k0, __table_id, ts, v0, v1`.
-pub(crate) fn metadata_with_primary_key(
+pub fn metadata_with_primary_key(
     primary_key: Vec<ColumnId>,
     enable_table_id: bool,
 ) -> RegionMetadataRef {

@@ -158,7 +158,7 @@ fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> SemanticType {
 }
 
 /// Builds key values with `len` rows for test.
-pub(crate) fn build_key_values(
+pub fn build_key_values(
     schema: &RegionMetadataRef,
     k0: String,
     k1: u32,

@@ -195,7 +195,7 @@ pub(crate) fn write_rows_to_buffer(
     );
 
     for kv in kvs.iter() {
-        buffer.write_row(pk_index, kv);
+        buffer.write_row(pk_index, &kv);
     }
 }