refactor!: Renames the new memtable to PartitionTreeMemtable (#3547)

* refactor: rename mod merge_tree to partition_tree

* refactor: rename merge_tree

* refactor: change merge tree comment

* refactor: rename merge tree struct

* refactor: memtable options
This commit is contained in:
Yingwen
2024-03-20 14:40:41 +08:00
committed by GitHub
parent bbcdb28b7c
commit 39b69f1e3b
25 changed files with 198 additions and 191 deletions

View File

@@ -140,9 +140,9 @@ intermediate_path = ""
[region_engine.mito.memtable]
# Memtable type.
# - "experimental": experimental memtable
# - "partition_tree": partition tree memtable
# - "time_series": time-series memtable (deprecated)
type = "experimental"
type = "partition_tree"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.

View File

@@ -246,9 +246,9 @@ intermediate_path = ""
[region_engine.mito.memtable]
# Memtable type.
# - "experimental": experimental memtable
# - "partition_tree": partition tree memtable
# - "time_series": time-series memtable (deprecated)
type = "experimental"
type = "partition_tree"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.

View File

@@ -45,5 +45,5 @@ pub fn set_index_options_for_data_region(options: &mut HashMap<String, String>)
/// Set memtable options for the data region.
pub fn set_memtable_options_for_data_region(options: &mut HashMap<String, String>) {
options.insert("memtable.type".to_string(), "experimental".to_string());
options.insert("memtable.type".to_string(), "partition_tree".to_string());
}

View File

@@ -21,7 +21,7 @@ use datafusion_common::Column;
use datafusion_expr::{lit, Expr};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtable};
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{KeyValues, Memtable};
use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
@@ -41,9 +41,9 @@ fn write_rows(c: &mut Criterion) {
// Note that this test only generates one time series.
let mut group = c.benchmark_group("write");
group.bench_function("merge_tree", |b| {
group.bench_function("partition_tree", |b| {
let memtable =
MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());
PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
b.iter(|| {
@@ -63,14 +63,14 @@ fn write_rows(c: &mut Criterion) {
/// Scans all rows.
fn full_scan(c: &mut Criterion) {
let metadata = Arc::new(cpu_metadata());
let config = MergeTreeConfig::default();
let config = PartitionTreeConfig::default();
let start_sec = 1710043200;
let generator = CpuDataGenerator::new(metadata.clone(), 4000, start_sec, start_sec + 3600 * 2);
let mut group = c.benchmark_group("full_scan");
group.sample_size(10);
group.bench_function("merge_tree", |b| {
let memtable = MergeTreeMemtable::new(1, metadata.clone(), None, &config);
group.bench_function("partition_tree", |b| {
let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
@@ -100,14 +100,14 @@ fn full_scan(c: &mut Criterion) {
/// Filters 1 host.
fn filter_1_host(c: &mut Criterion) {
let metadata = Arc::new(cpu_metadata());
let config = MergeTreeConfig::default();
let config = PartitionTreeConfig::default();
let start_sec = 1710043200;
let generator = CpuDataGenerator::new(metadata.clone(), 4000, start_sec, start_sec + 3600 * 2);
let mut group = c.benchmark_group("filter_1_host");
group.sample_size(10);
group.bench_function("merge_tree", |b| {
let memtable = MergeTreeMemtable::new(1, metadata.clone(), None, &config);
group.bench_function("partition_tree", |b| {
let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}

View File

@@ -328,14 +328,14 @@ mod tests {
fn test_deserialize_config() {
let s = r#"
[memtable]
type = "experimental"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
let config: MitoConfig = toml::from_str(s).unwrap();
let MemtableConfig::Experimental(config) = &config.memtable else {
let MemtableConfig::PartitionTree(config) = &config.memtable else {
unreachable!()
};
assert_eq!(1024, config.data_freeze_threshold);

View File

@@ -209,8 +209,8 @@ async fn test_engine_create_with_memtable_opts() {
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("memtable.type", "experimental")
.insert_option("memtable.experimental.index_max_keys_per_shard", "2")
.insert_option("memtable.type", "partition_tree")
.insert_option("memtable.partition_tree.index_max_keys_per_shard", "2")
.build();
let column_schemas = rows_schema(&request);
engine
@@ -218,7 +218,7 @@ async fn test_engine_create_with_memtable_opts() {
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let Some(MemtableOptions::Experimental(memtable_opts)) = &region.version().options.memtable
let Some(MemtableOptions::PartitionTree(memtable_opts)) = &region.version().options.memtable
else {
unreachable!();
};

View File

@@ -28,14 +28,14 @@ use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtableBuilder};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::region::options::MemtableOptions;
pub mod key_values;
pub mod merge_tree;
pub mod partition_tree;
pub mod time_partition;
pub mod time_series;
pub(crate) mod version;
@@ -49,7 +49,7 @@ pub type MemtableId = u32;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
Experimental(MergeTreeConfig),
PartitionTree(PartitionTreeConfig),
TimeSeries,
}
@@ -234,15 +234,17 @@ impl MemtableBuilderProvider {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
)),
Some(MemtableOptions::Experimental(opts)) => Arc::new(MergeTreeMemtableBuilder::new(
MergeTreeConfig {
index_max_keys_per_shard: opts.index_max_keys_per_shard,
data_freeze_threshold: opts.data_freeze_threshold,
fork_dictionary_bytes: opts.fork_dictionary_bytes,
..Default::default()
},
self.write_buffer_manager.clone(),
)),
Some(MemtableOptions::PartitionTree(opts)) => {
Arc::new(PartitionTreeMemtableBuilder::new(
PartitionTreeConfig {
index_max_keys_per_shard: opts.index_max_keys_per_shard,
data_freeze_threshold: opts.data_freeze_threshold,
fork_dictionary_bytes: opts.fork_dictionary_bytes,
..Default::default()
},
self.write_buffer_manager.clone(),
))
}
None => self.default_memtable_builder.clone(),
}
}
@@ -258,20 +260,20 @@ mod tests {
#[test]
fn test_deserialize_memtable_config() {
let s = r#"
type = "experimental"
type = "partition_tree"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
let config: MemtableConfig = toml::from_str(s).unwrap();
let MemtableConfig::Experimental(merge_tree) = config else {
let MemtableConfig::PartitionTree(memtable_config) = config else {
unreachable!()
};
assert!(merge_tree.dedup);
assert_eq!(8192, merge_tree.index_max_keys_per_shard);
assert_eq!(1024, merge_tree.data_freeze_threshold);
assert_eq!(ReadableSize::mb(512), merge_tree.fork_dictionary_bytes);
assert!(memtable_config.dedup);
assert_eq!(8192, memtable_config.index_max_keys_per_shard);
assert_eq!(1024, memtable_config.data_freeze_threshold);
assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
}
#[test]

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Memtable implementation based on a merge tree.
//! Memtable implementation based on a partition tree.
pub(crate) mod data;
mod dedup;
@@ -37,8 +37,8 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::tree::MergeTree;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRef, MemtableStats,
@@ -61,13 +61,13 @@ struct PkId {
pk_index: PkIndex,
}
// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple merge
// tree memtable then we will use a lot memory. We should find a better way to control the
// TODO(yingwen): `fork_dictionary_bytes` is a per-region option; with multiple partition tree
// memtables we will use a lot of memory. We should find a better way to control the
// dictionary size.
/// Config for the merge tree memtable.
/// Config for the partition tree memtable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MergeTreeConfig {
pub struct PartitionTreeConfig {
/// Max keys in an index shard.
pub index_max_keys_per_shard: usize,
/// Number of rows to freeze a data part.
@@ -82,7 +82,7 @@ pub struct MergeTreeConfig {
pub fork_dictionary_bytes: ReadableSize,
}
impl Default for MergeTreeConfig {
impl Default for PartitionTreeConfig {
fn default() -> Self {
let mut fork_dictionary_bytes = ReadableSize::gb(1);
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
@@ -102,24 +102,24 @@ impl Default for MergeTreeConfig {
}
}
/// Memtable based on a merge tree.
pub struct MergeTreeMemtable {
/// Memtable based on a partition tree.
pub struct PartitionTreeMemtable {
id: MemtableId,
tree: MergeTree,
tree: PartitionTree,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
}
impl fmt::Debug for MergeTreeMemtable {
impl fmt::Debug for PartitionTreeMemtable {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MergeTreeMemtable")
f.debug_struct("PartitionTreeMemtable")
.field("id", &self.id)
.finish()
}
}
impl Memtable for MergeTreeMemtable {
impl Memtable for PartitionTreeMemtable {
fn id(&self) -> MemtableId {
self.id
}
@@ -197,29 +197,29 @@ impl Memtable for MergeTreeMemtable {
fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
let tree = self.tree.fork(metadata.clone());
let memtable = MergeTreeMemtable::with_tree(id, tree);
let memtable = PartitionTreeMemtable::with_tree(id, tree);
Arc::new(memtable)
}
}
impl MergeTreeMemtable {
impl PartitionTreeMemtable {
/// Returns a new memtable.
pub fn new(
id: MemtableId,
metadata: RegionMetadataRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
config: &MergeTreeConfig,
config: &PartitionTreeConfig,
) -> Self {
Self::with_tree(
id,
MergeTree::new(metadata, config, write_buffer_manager.clone()),
PartitionTree::new(metadata, config, write_buffer_manager.clone()),
)
}
/// Creates a mutable memtable from the tree.
///
/// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
fn with_tree(id: MemtableId, tree: MergeTree) -> Self {
fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
Self {
@@ -278,17 +278,17 @@ impl MergeTreeMemtable {
}
}
/// Builder to build a [MergeTreeMemtable].
/// Builder to build a [PartitionTreeMemtable].
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
config: MergeTreeConfig,
pub struct PartitionTreeMemtableBuilder {
config: PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
}
impl MergeTreeMemtableBuilder {
impl PartitionTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(
config: MergeTreeConfig,
config: PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
@@ -298,9 +298,9 @@ impl MergeTreeMemtableBuilder {
}
}
impl MemtableBuilder for MergeTreeMemtableBuilder {
impl MemtableBuilder for PartitionTreeMemtableBuilder {
fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
Arc::new(MergeTreeMemtable::new(
Arc::new(PartitionTreeMemtable::new(
id,
metadata.clone(),
self.write_buffer_manager.clone(),
@@ -335,7 +335,8 @@ mod tests {
let timestamps = (0..100).collect::<Vec<_>>();
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
let memtable =
PartitionTreeMemtable::new(1, metadata, None, &PartitionTreeConfig::default());
memtable.write(&kvs).unwrap();
let expected_ts = kvs
@@ -371,7 +372,7 @@ mod tests {
memtable_util::metadata_with_primary_key(vec![], false)
};
let memtable =
MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());
PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
let kvs = memtable_util::build_key_values(
&metadata,
@@ -430,8 +431,8 @@ mod tests {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable =
MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);
let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
.build(1, &metadata);
let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
@@ -466,11 +467,11 @@ mod tests {
fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
let memtable = MergeTreeMemtable::new(
let memtable = PartitionTreeMemtable::new(
1,
metadata.clone(),
None,
&MergeTreeConfig {
&PartitionTreeConfig {
index_max_keys_per_shard: max_keys,
data_freeze_threshold: freeze_threshold,
..Default::default()
@@ -515,8 +516,8 @@ mod tests {
fn test_memtable_filter() {
let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(
MergeTreeConfig {
let memtable = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig {
index_max_keys_per_shard: 40,
..Default::default()
},
@@ -551,14 +552,14 @@ mod tests {
#[test]
fn test_deserialize_config() {
let config = MergeTreeConfig {
let config = PartitionTreeConfig {
dedup: false,
..Default::default()
};
// Creates a json with dedup = false.
let json = serde_json::to_string(&config).unwrap();
let config: MergeTreeConfig = serde_json::from_str(&json).unwrap();
let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
assert!(config.dedup);
assert_eq!(MergeTreeConfig::default(), config);
assert_eq!(PartitionTreeConfig::default(), config);
}
}

View File

@@ -45,9 +45,11 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use crate::error;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::merge_tree::PkIndex;
use crate::metrics::{MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, MERGE_TREE_READ_STAGE_ELAPSED};
use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::partition_tree::PkIndex;
use crate::metrics::{
PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
};
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
@@ -255,7 +257,7 @@ impl DataBuffer {
/// Builds a lazily initialized data buffer reader from [DataBuffer]
pub fn read(&self) -> Result<DataBufferReaderBuilder> {
let _timer = MERGE_TREE_READ_STAGE_ELAPSED
let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_buffer"])
.start_timer();
@@ -523,7 +525,7 @@ pub(crate) struct DataBufferReader {
impl Drop for DataBufferReader {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_buffer"])
.observe(self.elapsed_time.as_secs_f64())
}
@@ -780,7 +782,7 @@ impl<'a> DataPartEncoder<'a> {
let mut bytes = Vec::with_capacity(1024);
let rb = {
let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
.with_label_values(&["drain_data_buffer_to_batch"])
.start_timer();
drain_data_buffer_to_record_batches(
@@ -793,7 +795,7 @@ impl<'a> DataPartEncoder<'a> {
};
{
let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
.with_label_values(&["encode"])
.start_timer();
let mut writer =
@@ -837,7 +839,7 @@ pub struct DataPartReader {
impl Drop for DataPartReader {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_part"])
.observe(self.elapsed.as_secs_f64());
}
@@ -973,7 +975,7 @@ impl DataParts {
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
pub fn read(&self) -> Result<DataPartsReaderBuilder> {
let _timer = MERGE_TREE_READ_STAGE_ELAPSED
let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["build_data_parts_reader"])
.start_timer();
@@ -1030,7 +1032,7 @@ pub struct DataPartsReader {
impl Drop for DataPartsReader {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_parts"])
.observe(self.elapsed.as_secs_f64())
}

View File

@@ -15,9 +15,9 @@
use std::ops::Range;
use crate::error::Result;
use crate::memtable::merge_tree::data::DataBatch;
use crate::memtable::merge_tree::shard::DataBatchSource;
use crate::memtable::merge_tree::PkId;
use crate::memtable::partition_tree::data::DataBatch;
use crate::memtable::partition_tree::shard::DataBatchSource;
use crate::memtable::partition_tree::PkId;
/// A reader that dedup sorted batches from a merger.
pub struct DedupReader<T> {
@@ -112,7 +112,7 @@ mod tests {
use store_api::metadata::RegionMetadataRef;
use super::*;
use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DataPartsReader};
use crate::memtable::partition_tree::data::{DataBuffer, DataParts, DataPartsReader};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};

View File

@@ -19,8 +19,8 @@ use std::sync::Arc;
use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::PkIndex;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::PkIndex;
use crate::metrics::MEMTABLE_DICT_BYTES;
/// Maximum keys in a [DictBlock].

View File

@@ -18,8 +18,8 @@ use std::fmt::Debug;
use std::ops::Range;
use crate::error::Result;
use crate::memtable::merge_tree::data::{DataBatch, DataBufferReader, DataPartReader};
use crate::memtable::merge_tree::PkIndex;
use crate::memtable::partition_tree::data::{DataBatch, DataBufferReader, DataPartReader};
use crate::memtable::partition_tree::PkIndex;
/// Nodes of merger's heap.
pub trait Node: Ord {
@@ -297,7 +297,7 @@ mod tests {
use store_api::metadata::RegionMetadataRef;
use super::*;
use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBuffer};
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBuffer};
use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
fn write_rows_to_buffer(

View File

@@ -14,7 +14,7 @@
//! Internal metrics of the memtable.
/// Metrics of writing the merge tree.
/// Metrics of writing the partition tree.
pub struct WriteMetrics {
/// Size allocated by keys.
pub key_bytes: usize,

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Partition of a merge tree.
//! Partition of a partition tree.
//!
//! We only support partitioning the tree by pre-defined internal columns.
@@ -28,15 +28,15 @@ use store_api::storage::ColumnId;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::merge_tree::dedup::DedupReader;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::shard::{
use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
use crate::memtable::partition_tree::dedup::DedupReader;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::shard::{
BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
};
use crate::memtable::merge_tree::shard_builder::ShardBuilder;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
use crate::memtable::partition_tree::shard_builder::ShardBuilder;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{McmpRowCodec, RowCodec};
@@ -54,7 +54,7 @@ pub type PartitionRef = Arc<Partition>;
impl Partition {
/// Creates a new partition.
pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
Partition {
inner: RwLock::new(Inner::new(metadata, config)),
dedup: config.dedup,
@@ -193,7 +193,7 @@ impl Partition {
/// Forks the partition.
///
/// Must freeze the partition before fork.
pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition {
pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
let (shards, shard_builder) = {
let inner = self.inner.read().unwrap();
debug_assert!(inner.shard_builder.is_empty());
@@ -437,11 +437,11 @@ pub(crate) struct ReadPartitionContext {
impl Drop for ReadPartitionContext {
fn drop(&mut self) {
let partition_read_source = self.metrics.read_source.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_read_source"])
.observe(partition_read_source);
let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_data_batch_to_batch"])
.observe(partition_data_batch_to_batch);
@@ -558,7 +558,7 @@ struct Inner {
}
impl Inner {
fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
(

View File

@@ -21,15 +21,15 @@ use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{
use crate::memtable::partition_tree::data::{
DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
};
use crate::memtable::merge_tree::dict::KeyDictRef;
use crate::memtable::merge_tree::merger::{Merger, Node};
use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
use crate::memtable::merge_tree::shard_builder::ShardBuilderReader;
use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
use crate::memtable::partition_tree::dict::KeyDictRef;
use crate::memtable::partition_tree::merger::{Merger, Node};
use crate::memtable::partition_tree::partition::PrimaryKeyFilter;
use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
/// Shard stores data related to the same key dictionary.
pub struct Shard {
@@ -257,7 +257,7 @@ impl ShardReader {
impl Drop for ShardReader {
fn drop(&mut self) {
let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["shard_prune_pk"])
.observe(shard_prune_pk);
if self.keys_before_pruning > 0 {
@@ -427,10 +427,10 @@ mod tests {
use std::sync::Arc;
use super::*;
use crate::memtable::merge_tree::data::timestamp_array_to_i64_slice;
use crate::memtable::merge_tree::dict::KeyDictBuilder;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::PkIndex;
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
use crate::memtable::partition_tree::dict::KeyDictBuilder;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::PkIndex;
use crate::memtable::KeyValues;
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, encode_keys, metadata_for_test,

View File

@@ -22,15 +22,15 @@ use store_api::metadata::RegionMetadataRef;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{
use crate::memtable::partition_tree::data::{
DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
};
use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
use crate::memtable::merge_tree::shard::Shard;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex, ShardId};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::partition::PrimaryKeyFilter;
use crate::memtable::partition_tree::shard::Shard;
use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
/// Builder to write keys and data to a shard whose key dictionary
/// is still active.
@@ -50,7 +50,7 @@ impl ShardBuilder {
/// Returns a new builder.
pub fn new(
metadata: RegionMetadataRef,
config: &MergeTreeConfig,
config: &PartitionTreeConfig,
shard_id: ShardId,
) -> ShardBuilder {
ShardBuilder {
@@ -150,14 +150,14 @@ impl ShardBuilder {
/// Scans the shard builder.
pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReaderBuilder> {
let dict_reader = {
let _timer = MERGE_TREE_READ_STAGE_ELAPSED
let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["shard_builder_read_pk"])
.start_timer();
self.dict_builder.read()
};
{
let _timer = MERGE_TREE_READ_STAGE_ELAPSED
let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["sort_pk"])
.start_timer();
dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
@@ -296,7 +296,7 @@ impl ShardBuilderReader {
impl Drop for ShardBuilderReader {
fn drop(&mut self) {
let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["shard_builder_prune_pk"])
.observe(shard_builder_prune_pk);
if self.keys_before_pruning > 0 {
@@ -315,8 +315,8 @@ impl Drop for ShardBuilderReader {
mod tests {
use super::*;
use crate::memtable::merge_tree::data::timestamp_array_to_i64_slice;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::KeyValues;
use crate::test_util::memtable_util::{
build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
@@ -355,7 +355,7 @@ mod tests {
fn test_write_shard_builder() {
let metadata = metadata_for_test();
let input = input_with_key(&metadata);
let config = MergeTreeConfig::default();
let config = PartitionTreeConfig::default();
let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
let mut metrics = WriteMetrics::default();
assert!(shard_builder
@@ -382,7 +382,7 @@ mod tests {
fn test_write_read_shard_builder() {
let metadata = metadata_for_test();
let input = input_with_key(&metadata);
let config = MergeTreeConfig::default();
let config = PartitionTreeConfig::default();
let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
let mut metrics = WriteMetrics::default();

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementation of the merge tree.
//! Implementation of the partition tree.
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
@@ -33,20 +33,20 @@ use table::predicate::Predicate;
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
use crate::memtable::merge_tree::partition::{
use crate::memtable::partition_tree::metrics::WriteMetrics;
use crate::memtable::partition_tree::partition::{
Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
};
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::partition_tree::PartitionTreeConfig;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// The merge tree.
pub struct MergeTree {
/// The partition tree.
pub struct PartitionTree {
/// Config of the tree.
config: MergeTreeConfig,
config: PartitionTreeConfig,
/// Metadata of the region.
pub(crate) metadata: RegionMetadataRef,
/// Primary key codec.
@@ -60,13 +60,13 @@ pub struct MergeTree {
sparse_encoder: Arc<SparseEncoder>,
}
impl MergeTree {
/// Creates a new merge tree.
impl PartitionTree {
/// Creates a new partition tree.
pub fn new(
metadata: RegionMetadataRef,
config: &MergeTreeConfig,
config: &PartitionTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> MergeTree {
) -> PartitionTree {
let row_codec = McmpRowCodec::new(
metadata
.primary_key_columns()
@@ -84,7 +84,7 @@ impl MergeTree {
};
let is_partitioned = Partition::has_multi_partitions(&metadata);
MergeTree {
PartitionTree {
config: config.clone(),
metadata,
row_codec: Arc::new(row_codec),
@@ -260,12 +260,12 @@ impl MergeTree {
/// Forks an immutable tree. Returns a mutable tree that inherits the index
/// of this tree.
pub fn fork(&self, metadata: RegionMetadataRef) -> MergeTree {
pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
if self.metadata.schema_version != metadata.schema_version
|| self.metadata.column_metadatas != metadata.column_metadatas
{
// The schema has changed, we can't reuse the tree.
return MergeTree::new(metadata, &self.config, self.write_buffer_manager.clone());
return PartitionTree::new(metadata, &self.config, self.write_buffer_manager.clone());
}
let mut total_shared_size = 0;
@@ -313,7 +313,7 @@ impl MergeTree {
forked.insert(part_key, Arc::new(forked_part));
}
MergeTree {
PartitionTree {
config: self.config.clone(),
metadata,
row_codec: self.row_codec.clone(),
@@ -446,9 +446,9 @@ struct TreeIter {
impl Drop for TreeIter {
fn drop(&mut self) {
READ_ROWS_TOTAL
.with_label_values(&["merge_tree_memtable"])
.with_label_values(&["partition_tree_memtable"])
.inc_by(self.metrics.rows_fetched as u64);
MERGE_TREE_READ_STAGE_ELAPSED
PARTITION_TREE_READ_STAGE_ELAPSED
.with_label_values(&["fetch_next_partition"])
.observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();

View File

@@ -415,13 +415,13 @@ struct PartitionToWrite<'a> {
#[cfg(test)]
mod tests {
use super::*;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
#[test]
fn test_no_duration() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
assert_eq!(1, partitions.num_partitions());
assert!(partitions.is_empty());
@@ -449,7 +449,7 @@ mod tests {
#[test]
fn test_write_single_part() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
assert_eq!(0, partitions.num_partitions());
@@ -496,7 +496,7 @@ mod tests {
#[test]
fn test_write_multi_parts() {
let metadata = memtable_util::metadata_for_test();
let builder = Arc::new(MergeTreeMemtableBuilder::default());
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions =
TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
assert_eq!(0, partitions.num_partitions());

View File

@@ -278,23 +278,23 @@ lazy_static! {
.with_label_values(&["flush", "intermediate"]);
// ------- End of index metrics.
/// Merge tree memtable data buffer freeze metrics
pub static ref MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_merge_tree_buffer_freeze_stage_elapsed",
"mito merge tree data buffer freeze stage elapsed",
/// Partition tree memtable data buffer freeze metrics
pub static ref PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_partition_tree_buffer_freeze_stage_elapsed",
"mito partition tree data buffer freeze stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
)
.unwrap();
/// Merge tree memtable read path metrics
pub static ref MERGE_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_merge_tree_read_stage_elapsed",
"mito merge tree read stage elapsed",
/// Partition tree memtable read path metrics
pub static ref PARTITION_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_partition_tree_read_stage_elapsed",
"mito partition tree read stage elapsed",
&[STAGE_LABEL],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
)
.unwrap();
// ------- End of merge tree memtable metrics.
// ------- End of partition tree memtable metrics.
}

View File

@@ -29,7 +29,7 @@ use snafu::{ensure, ResultExt};
use store_api::storage::ColumnId;
use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
use crate::memtable::merge_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
@@ -225,17 +225,17 @@ impl Default for InvertedIndexOptions {
#[serde(tag = "memtable.type", rename_all = "snake_case")]
pub enum MemtableOptions {
TimeSeries,
#[serde(with = "prefix_experimental")]
Experimental(ExperimentalOptions),
#[serde(with = "prefix_partition_tree")]
PartitionTree(PartitionTreeOptions),
}
with_prefix!(prefix_experimental "memtable.experimental.");
with_prefix!(prefix_partition_tree "memtable.partition_tree.");
/// Experimental memtable options.
/// Partition tree memtable options.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(default)]
pub struct ExperimentalOptions {
pub struct PartitionTreeOptions {
/// Max keys in an index shard.
#[serde_as(as = "DisplayFromStr")]
pub index_max_keys_per_shard: usize,
@@ -246,7 +246,7 @@ pub struct ExperimentalOptions {
pub fork_dictionary_bytes: ReadableSize,
}
impl Default for ExperimentalOptions {
impl Default for PartitionTreeOptions {
fn default() -> Self {
Self {
index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
@@ -442,10 +442,12 @@ mod tests {
};
assert_eq!(expect, options);
let map = make_map(&[("memtable.type", "experimental")]);
let map = make_map(&[("memtable.type", "partition_tree")]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
memtable: Some(MemtableOptions::Experimental(ExperimentalOptions::default())),
memtable: Some(MemtableOptions::PartitionTree(
PartitionTreeOptions::default(),
)),
..Default::default()
};
assert_eq!(expect, options);
@@ -476,10 +478,10 @@ mod tests {
WAL_OPTIONS_KEY,
&serde_json::to_string(&wal_options).unwrap(),
),
("memtable.type", "experimental"),
("memtable.experimental.index_max_keys_per_shard", "2048"),
("memtable.experimental.data_freeze_threshold", "2048"),
("memtable.experimental.fork_dictionary_bytes", "128M"),
("memtable.type", "partition_tree"),
("memtable.partition_tree.index_max_keys_per_shard", "2048"),
("memtable.partition_tree.data_freeze_threshold", "2048"),
("memtable.partition_tree.fork_dictionary_bytes", "128M"),
]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
@@ -497,7 +499,7 @@ mod tests {
segment_row_count: 512,
},
},
memtable: Some(MemtableOptions::Experimental(ExperimentalOptions {
memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
index_max_keys_per_shard: 2048,
data_freeze_threshold: 2048,
fork_dictionary_bytes: ReadableSize::mb(128),

View File

@@ -30,7 +30,7 @@ use table::predicate::Predicate;
use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
MemtableStats,

View File

@@ -48,7 +48,7 @@ use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::manifest::action::RegionEdit;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::{MemtableBuilderProvider, MemtableConfig};
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
@@ -340,8 +340,8 @@ impl<S: LogStore> WorkerStarter<S> {
let running = Arc::new(AtomicBool::new(true));
let default_memtable_builder = match &self.config.memtable {
MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new(
merge_tree.clone(),
MemtableConfig::PartitionTree(config) => Arc::new(PartitionTreeMemtableBuilder::new(
config.clone(),
Some(self.write_buffer_manager.clone()),
)) as _,
MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some(

View File

@@ -30,9 +30,9 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
"index.inverted_index.segment_row_count",
WAL_OPTIONS_KEY,
"memtable.type",
"memtable.experimental.index_max_keys_per_shard",
"memtable.experimental.data_freeze_threshold",
"memtable.experimental.fork_dictionary_bytes",
"memtable.partition_tree.index_max_keys_per_shard",
"memtable.partition_tree.data_freeze_threshold",
"memtable.partition_tree.fork_dictionary_bytes",
]
.contains(&key)
}
@@ -62,13 +62,13 @@ mod tests {
assert!(is_mito_engine_option_key("wal_options"));
assert!(is_mito_engine_option_key("memtable.type"));
assert!(is_mito_engine_option_key(
"memtable.experimental.index_max_keys_per_shard"
"memtable.partition_tree.index_max_keys_per_shard"
));
assert!(is_mito_engine_option_key(
"memtable.experimental.data_freeze_threshold"
"memtable.partition_tree.data_freeze_threshold"
));
assert!(is_mito_engine_option_key(
"memtable.experimental.fork_dictionary_bytes"
"memtable.partition_tree.fork_dictionary_bytes"
));
assert!(!is_mito_engine_option_key("foo"));
}

View File

@@ -74,7 +74,7 @@ with(
'index.inverted_index.ignore_column_ids'='1,2,3',
'index.inverted_index.segment_row_count'='512',
'wal_options'='{"wal.provider":"raft_engine"}',
'memtable.type' = 'experimental',
'memtable.type' = 'partition_tree',
);
Affected Rows: 0

View File

@@ -64,7 +64,7 @@ with(
'index.inverted_index.ignore_column_ids'='1,2,3',
'index.inverted_index.segment_row_count'='512',
'wal_options'='{"wal.provider":"raft_engine"}',
'memtable.type' = 'experimental',
'memtable.type' = 'partition_tree',
);
drop table test_mito_options;