From 376409b8571ec56ca9a285ee4f8f29021de5949f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 1 Mar 2024 14:22:15 +0800 Subject: [PATCH] feat: employ sparse key encoding for shard lookup (#3410) * feat: employ short key encoding for shard lookup * fix: license * chore: simplify code * refactor: only enable sparse encoding to speed lookup on metric engine * fix: names --- Cargo.lock | 1 + src/mito2/Cargo.toml | 5 ++ src/mito2/benches/bench_merge_tree.rs | 21 ++++++ src/mito2/benches/merge_tree_bench.rs | 36 ++++++++++ src/mito2/src/memtable/merge_tree/data.rs | 4 +- src/mito2/src/memtable/merge_tree/merger.rs | 2 +- .../src/memtable/merge_tree/partition.rs | 39 ++++++++--- src/mito2/src/memtable/merge_tree/shard.rs | 4 +- .../src/memtable/merge_tree/shard_builder.rs | 23 +++++-- src/mito2/src/memtable/merge_tree/tree.rs | 67 +++++++++++++++++-- src/mito2/src/test_util/memtable_util.rs | 6 +- 11 files changed, 180 insertions(+), 28 deletions(-) create mode 100644 src/mito2/benches/bench_merge_tree.rs create mode 100644 src/mito2/benches/merge_tree_bench.rs diff --git a/Cargo.lock b/Cargo.lock index 0839b0595b..a8de2387ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5346,6 +5346,7 @@ dependencies = [ "common-test-util", "common-time", "common-wal", + "criterion", "dashmap", "datafusion", "datafusion-common", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 5ebe42a9da..27605bc3fc 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -73,5 +73,10 @@ uuid.workspace = true [dev-dependencies] common-procedure-test.workspace = true common-test-util.workspace = true +criterion = "0.4" log-store.workspace = true rand.workspace = true + +[[bench]] +name = "bench_merge_tree" +harness = false diff --git a/src/mito2/benches/bench_merge_tree.rs b/src/mito2/benches/bench_merge_tree.rs new file mode 100644 index 0000000000..febc648dae --- /dev/null +++ b/src/mito2/benches/bench_merge_tree.rs @@ -0,0 +1,21 @@ 
+// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod merge_tree_bench; + +use criterion::criterion_main; + +criterion_main! { + merge_tree_bench::benches +} diff --git a/src/mito2/benches/merge_tree_bench.rs b/src/mito2/benches/merge_tree_bench.rs new file mode 100644 index 0000000000..3ad86c2d9e --- /dev/null +++ b/src/mito2/benches/merge_tree_bench.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use criterion::{criterion_group, criterion_main, Criterion}; +use mito2::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtable}; +use mito2::memtable::Memtable; +use mito2::test_util::memtable_util; + +fn bench_merge_tree_memtable(c: &mut Criterion) { + let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true); + let timestamps = (0..100).collect::<Vec<_>>(); + + let memtable = MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default()); + + let _ = c.bench_function("MergeTreeMemtable", |b| { + let kvs = + memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1); + b.iter(|| { + memtable.write(&kvs).unwrap(); + }); + }); +} + +criterion_group!(benches, bench_merge_tree_memtable); +criterion_main!(benches); diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index e3c0e0a8ab..7bafdb904b 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -216,7 +216,7 @@ impl DataBuffer { } /// Writes a row to data buffer. - pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { + pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) { self.ts_builder.push_value_ref(kv.timestamp()); self.pk_index_builder.push(Some(pk_index)); self.sequence_builder.push(Some(kv.sequence())); @@ -953,7 +953,7 @@ impl DataParts { } /// Writes a row into parts. 
- pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) { + pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) { self.active.write_row(pk_index, kv) } diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/merge_tree/merger.rs index 5ba5f5aae4..4441eaa593 100644 --- a/src/mito2/src/memtable/merge_tree/merger.rs +++ b/src/mito2/src/memtable/merge_tree/merger.rs @@ -319,7 +319,7 @@ mod tests { ); for kv in kvs.iter() { - buffer.write_row(pk_index, kv); + buffer.write_row(pk_index, &kv); } *sequence += rows; diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 4d2675917a..424fad6939 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -64,8 +64,10 @@ impl Partition { /// Writes to the partition with a primary key. pub fn write_with_key( &self, - primary_key: &[u8], + primary_key: &mut Vec<u8>, + row_codec: &McmpRowCodec, key_value: KeyValue, + re_encode: bool, metrics: &mut WriteMetrics, ) -> Result<()> { let mut inner = self.inner.write().unwrap(); @@ -76,17 +78,30 @@ impl Partition { // Finds key in shards, now we ensure one key only exists in one shard. if let Some(pk_id) = inner.find_key_in_shards(primary_key) { - // Key already in shards. - inner.write_to_shard(pk_id, key_value); + inner.write_to_shard(pk_id, &key_value); + inner.num_rows += 1; return Ok(()); } - // Write to the shard builder. - inner - .shard_builder - .write_with_key(primary_key, key_value, metrics); - inner.num_rows += 1; + // Key does not yet exist in shard or builder, encode and insert the full primary key. + if re_encode { + // `primary_key` is sparse, re-encode the full primary key. 
+ let sparse_key = primary_key.clone(); + primary_key.clear(); + row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?; + let pk_id = inner + .shard_builder + .write_with_key(primary_key, &key_value, metrics); + inner.pk_to_pk_id.insert(sparse_key, pk_id); + } else { + // `primary_key` is already the full primary key. + let pk_id = inner + .shard_builder + .write_with_key(primary_key, &key_value, metrics); + inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id); + }; + inner.num_rows += 1; Ok(()) } @@ -102,7 +117,7 @@ impl Partition { shard_id: 0, pk_index: 0, }; - inner.shards[0].write_with_pk_id(pk_id, key_value); + inner.shards[0].write_with_pk_id(pk_id, &key_value); inner.num_rows += 1; } @@ -583,7 +598,11 @@ impl Inner { self.pk_to_pk_id.get(primary_key).copied() } - fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) { + fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) { + if pk_id.shard_id == self.shard_builder.current_shard_id() { + self.shard_builder.write_with_pk_id(pk_id, key_value); + return; + } for shard in &mut self.shards { if shard.shard_id == pk_id.shard_id { shard.write_with_pk_id(pk_id, key_value); diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index 30f0a80daa..889c05a582 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -55,7 +55,7 @@ impl Shard { } /// Writes a key value into the shard. 
- pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: KeyValue) { + pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) { debug_assert_eq!(self.shard_id, pk_id.shard_id); self.data_parts.write_row(pk_id.pk_index, key_value); @@ -417,7 +417,7 @@ mod tests { shard_id: shard.shard_id, pk_index: *pk_index, }; - shard.write_with_pk_id(pk_id, kv); + shard.write_with_pk_id(pk_id, &kv); } } assert!(!shard.is_empty()); diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 0ffaa91e0e..07fedc38dd 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -60,11 +60,26 @@ impl ShardBuilder { } } + /// Write a key value with given pk_index (caller must ensure the pk_index exist in dict_builder) + pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) { + assert_eq!(self.current_shard_id, pk_id.shard_id); + self.data_buffer.write_row(pk_id.pk_index, key_value); + } + /// Write a key value with its encoded primary key. - pub fn write_with_key(&mut self, key: &[u8], key_value: KeyValue, metrics: &mut WriteMetrics) { + pub fn write_with_key( + &mut self, + primary_key: &[u8], + key_value: &KeyValue, + metrics: &mut WriteMetrics, + ) -> PkId { // Safety: we check whether the builder need to freeze before. - let pk_index = self.dict_builder.insert_key(key, metrics); + let pk_index = self.dict_builder.insert_key(primary_key, metrics); self.data_buffer.write_row(pk_index, key_value); + PkId { + shard_id: self.current_shard_id, + pk_index, + } } /// Returns true if the builder need to freeze. 
@@ -261,7 +276,7 @@ mod tests { for key_values in &input { for kv in key_values.iter() { let key = encode_key_by_kv(&kv); - shard_builder.write_with_key(&key, kv, &mut metrics); + shard_builder.write_with_key(&key, &kv, &mut metrics); } } let shard = shard_builder @@ -283,7 +298,7 @@ mod tests { for key_values in &input { for kv in key_values.iter() { let key = encode_key_by_kv(&kv); - shard_builder.write_with_key(&key, kv, &mut metrics); + shard_builder.write_with_key(&key, &kv, &mut metrics); } } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 252d361f3b..095010762a 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -22,12 +22,15 @@ use api::v1::OpType; use common_recordbatch::filter::SimpleFilterEvaluator; use common_time::Timestamp; use datafusion_common::ScalarValue; -use snafu::ensure; +use datatypes::prelude::ValueRef; +use memcomparable::Serializer; +use serde::Serialize; +use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::error::{PrimaryKeyLengthMismatchSnafu, Result}; +use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu}; use crate::flush::WriteBufferManagerRef; use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::metrics::WriteMetrics; @@ -54,6 +57,7 @@ pub struct MergeTree { is_partitioned: bool, /// Manager to report size of the tree. 
write_buffer_manager: Option<WriteBufferManagerRef>, + sparse_encoder: Arc<SparseEncoder>, } impl MergeTree { @@ -69,6 +73,15 @@ impl MergeTree { .map(|c| SortField::new(c.column_schema.data_type.clone())) .collect(), ); + let sparse_encoder = SparseEncoder { + fields: metadata + .primary_key_columns() + .map(|c| FieldWithId { + field: SortField::new(c.column_schema.data_type.clone()), + column_id: c.column_id, + }) + .collect(), + }; let is_partitioned = Partition::has_multi_partitions(&metadata); MergeTree { @@ -78,6 +91,7 @@ impl MergeTree { partitions: Default::default(), is_partitioned, write_buffer_manager, + sparse_encoder: Arc::new(sparse_encoder), } } @@ -116,9 +130,15 @@ impl MergeTree { // Encode primary key. pk_buffer.clear(); - self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?; + if self.is_partitioned { + // Use sparse encoder for metric engine. + self.sparse_encoder + .encode_to_vec(kv.primary_keys(), pk_buffer)?; + } else { + self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?; + } - // Write rows with primary keys. + // Write rows with self.write_with_key(pk_buffer, kv, metrics)?; } @@ -252,6 +272,7 @@ impl MergeTree { partitions: RwLock::new(forked), is_partitioned: self.is_partitioned, write_buffer_manager: self.write_buffer_manager.clone(), + sparse_encoder: self.sparse_encoder.clone(), } } @@ -262,14 +283,20 @@ impl MergeTree { fn write_with_key( &self, - primary_key: &[u8], + primary_key: &mut Vec<u8>, key_value: KeyValue, metrics: &mut WriteMetrics, ) -> Result<()> { let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned); let partition = self.get_or_create_partition(partition_key); - partition.write_with_key(primary_key, key_value, metrics) + partition.write_with_key( + primary_key, + &self.row_codec, + key_value, + self.is_partitioned, // If tree is partitioned, re-encode is required to get the full primary key. 
+ metrics, + ) } fn write_no_key(&self, key_value: KeyValue) { @@ -324,6 +351,34 @@ impl MergeTree { } } +struct FieldWithId { + field: SortField, + column_id: ColumnId, +} + +struct SparseEncoder { + fields: Vec<FieldWithId>, +} + +impl SparseEncoder { + fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()> + where + I: Iterator<Item = ValueRef<'a>>, + { + let mut serializer = Serializer::new(buffer); + for (value, field) in row.zip(self.fields.iter()) { + if !value.is_null() { + field + .column_id + .serialize(&mut serializer) + .context(SerializeFieldSnafu)?; + field.field.serialize(&mut serializer, &value)?; + } + } + Ok(()) + } +} + #[derive(Default)] struct TreeIterMetrics { iter_elapsed: Duration, diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 002f5d23bd..4f3d9cfd3d 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -104,7 +104,7 @@ pub(crate) fn metadata_for_test() -> RegionMetadataRef { /// /// If `enable_table_id` is false, the schema is `k0, k1, ts, v0, v1`. /// If `enable_table_id` is true, the schema is `k0, __table_id, ts, v0, v1`. -pub(crate) fn metadata_with_primary_key( +pub fn metadata_with_primary_key( primary_key: Vec<ColumnId>, enable_table_id: bool, ) -> RegionMetadataRef { @@ -158,7 +158,7 @@ fn semantic_type_of_column(column_id: ColumnId, primary_key: &[ColumnId]) -> Sem } /// Builds key values with `len` rows for test. -pub(crate) fn build_key_values( +pub fn build_key_values( schema: &RegionMetadataRef, k0: String, k1: u32, @@ -195,7 +195,7 @@ pub(crate) fn write_rows_to_buffer( ); for kv in kvs.iter() { - buffer.write_row(pk_index, kv); + buffer.write_row(pk_index, &kv); } }