diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 41f939d158..2b6eb58354 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -140,9 +140,9 @@ intermediate_path = ""
 [region_engine.mito.memtable]
 # Memtable type.
-# - "experimental": experimental memtable
+# - "partition_tree": partition tree memtable
 # - "time_series": time-series memtable (deprecated)
-type = "experimental"
+type = "partition_tree"
 # The max number of keys in one shard.
 index_max_keys_per_shard = 8192
 # The max rows of data inside the actively writing buffer in one shard.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 4483f481b8..f654241e17 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -246,9 +246,9 @@ intermediate_path = ""
 [region_engine.mito.memtable]
 # Memtable type.
-# - "experimental": experimental memtable
+# - "partition_tree": partition tree memtable
 # - "time_series": time-series memtable (deprecated)
-type = "experimental"
+type = "partition_tree"
 # The max number of keys in one shard.
 index_max_keys_per_shard = 8192
 # The max rows of data inside the actively writing buffer in one shard.
diff --git a/src/metric-engine/src/engine/options.rs b/src/metric-engine/src/engine/options.rs
index 034caac6d1..56981329db 100644
--- a/src/metric-engine/src/engine/options.rs
+++ b/src/metric-engine/src/engine/options.rs
@@ -45,5 +45,5 @@ pub fn set_index_options_for_data_region(options: &mut HashMap<String, String>)
 /// Set memtable options for the data region.
 pub fn set_memtable_options_for_data_region(options: &mut HashMap<String, String>) {
-    options.insert("memtable.type".to_string(), "experimental".to_string());
+    options.insert("memtable.type".to_string(), "partition_tree".to_string());
 }
diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index e20c44b424..4c2c127e24 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -21,7 +21,7 @@ use datafusion_common::Column;
 use datafusion_expr::{lit, Expr};
 use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
-use mito2::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtable};
+use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
 use mito2::memtable::time_series::TimeSeriesMemtable;
 use mito2::memtable::{KeyValues, Memtable};
 use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema};
@@ -41,9 +41,9 @@ fn write_rows(c: &mut Criterion) {
     // Note that this test only generate one time series.
     let mut group = c.benchmark_group("write");
-    group.bench_function("merge_tree", |b| {
+    group.bench_function("partition_tree", |b| {
         let memtable =
-            MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());
+            PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
         let kvs =
             memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
         b.iter(|| {
@@ -63,14 +63,14 @@ fn write_rows(c: &mut Criterion) {
 /// Scans all rows.
 fn full_scan(c: &mut Criterion) {
     let metadata = Arc::new(cpu_metadata());
-    let config = MergeTreeConfig::default();
+    let config = PartitionTreeConfig::default();
     let start_sec = 1710043200;
     let generator = CpuDataGenerator::new(metadata.clone(), 4000, start_sec, start_sec + 3600 * 2);
 
     let mut group = c.benchmark_group("full_scan");
     group.sample_size(10);
-    group.bench_function("merge_tree", |b| {
-        let memtable = MergeTreeMemtable::new(1, metadata.clone(), None, &config);
+    group.bench_function("partition_tree", |b| {
+        let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
         for kvs in generator.iter() {
             memtable.write(&kvs).unwrap();
         }
@@ -100,14 +100,14 @@ fn full_scan(c: &mut Criterion) {
 /// Filters 1 host.
 fn filter_1_host(c: &mut Criterion) {
     let metadata = Arc::new(cpu_metadata());
-    let config = MergeTreeConfig::default();
+    let config = PartitionTreeConfig::default();
     let start_sec = 1710043200;
     let generator = CpuDataGenerator::new(metadata.clone(), 4000, start_sec, start_sec + 3600 * 2);
 
     let mut group = c.benchmark_group("filter_1_host");
     group.sample_size(10);
-    group.bench_function("merge_tree", |b| {
-        let memtable = MergeTreeMemtable::new(1, metadata.clone(), None, &config);
+    group.bench_function("partition_tree", |b| {
+        let memtable = PartitionTreeMemtable::new(1, metadata.clone(), None, &config);
         for kvs in generator.iter() {
             memtable.write(&kvs).unwrap();
         }
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index ed0b1b2145..60d079c5ab 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -328,14 +328,14 @@ mod tests {
     fn test_deserialize_config() {
         let s = r#"
 [memtable]
-type = "experimental"
+type = "partition_tree"
 index_max_keys_per_shard = 8192
 data_freeze_threshold = 1024
 dedup = true
 fork_dictionary_bytes = "512MiB"
 "#;
         let config: MitoConfig = toml::from_str(s).unwrap();
-        let MemtableConfig::Experimental(config) = &config.memtable else {
+        let MemtableConfig::PartitionTree(config) = &config.memtable else {
             unreachable!()
         };
         assert_eq!(1024, config.data_freeze_threshold);
diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs
index 5e2d804123..602eea30bd 100644
--- a/src/mito2/src/engine/create_test.rs
+++ b/src/mito2/src/engine/create_test.rs
@@ -209,8 +209,8 @@ async fn test_engine_create_with_memtable_opts() {
     let region_id = RegionId::new(1, 1);
     let request = CreateRequestBuilder::new()
-        .insert_option("memtable.type", "experimental")
-        .insert_option("memtable.experimental.index_max_keys_per_shard", "2")
+        .insert_option("memtable.type", "partition_tree")
+        .insert_option("memtable.partition_tree.index_max_keys_per_shard", "2")
         .build();
     let column_schemas = rows_schema(&request);
     engine
@@ -218,7 +218,7 @@
         .await
         .unwrap();
     let region = engine.get_region(region_id).unwrap();
-    let Some(MemtableOptions::Experimental(memtable_opts)) = &region.version().options.memtable
+    let Some(MemtableOptions::PartitionTree(memtable_opts)) = &region.version().options.memtable
     else {
         unreachable!();
     };
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 9ec4231d7c..d92857dc97 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -28,14 +28,14 @@ use crate::error::Result;
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
 pub use crate::memtable::key_values::KeyValues;
-use crate::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtableBuilder};
+use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
 use crate::metrics::WRITE_BUFFER_BYTES;
 use crate::read::Batch;
 use crate::region::options::MemtableOptions;
 
 pub mod key_values;
-pub mod merge_tree;
+pub mod partition_tree;
 pub mod time_partition;
 pub mod time_series;
 pub(crate) mod version;
@@ -49,7 +49,7 @@ pub type MemtableId = u32;
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(tag = "type", rename_all = "snake_case")]
 pub enum MemtableConfig {
-    Experimental(MergeTreeConfig),
+    PartitionTree(PartitionTreeConfig),
     TimeSeries,
 }
@@ -234,15 +234,17 @@ impl MemtableBuilderProvider {
             Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
                 self.write_buffer_manager.clone(),
             )),
-            Some(MemtableOptions::Experimental(opts)) => Arc::new(MergeTreeMemtableBuilder::new(
-                MergeTreeConfig {
-                    index_max_keys_per_shard: opts.index_max_keys_per_shard,
-                    data_freeze_threshold: opts.data_freeze_threshold,
-                    fork_dictionary_bytes: opts.fork_dictionary_bytes,
-                    ..Default::default()
-                },
-                self.write_buffer_manager.clone(),
-            )),
+            Some(MemtableOptions::PartitionTree(opts)) => {
+                Arc::new(PartitionTreeMemtableBuilder::new(
+                    PartitionTreeConfig {
+                        index_max_keys_per_shard: opts.index_max_keys_per_shard,
+                        data_freeze_threshold: opts.data_freeze_threshold,
+                        fork_dictionary_bytes: opts.fork_dictionary_bytes,
+                        ..Default::default()
+                    },
+                    self.write_buffer_manager.clone(),
+                ))
+            }
             None => self.default_memtable_builder.clone(),
         }
     }
@@ -258,20 +260,20 @@ mod tests {
     #[test]
     fn test_deserialize_memtable_config() {
         let s = r#"
-type = "experimental"
+type = "partition_tree"
 index_max_keys_per_shard = 8192
 data_freeze_threshold = 1024
 dedup = true
 fork_dictionary_bytes = "512MiB"
 "#;
         let config: MemtableConfig = toml::from_str(s).unwrap();
-        let MemtableConfig::Experimental(merge_tree) = config else {
+        let MemtableConfig::PartitionTree(memtable_config) = config else {
             unreachable!()
         };
-        assert!(merge_tree.dedup);
-        assert_eq!(8192, merge_tree.index_max_keys_per_shard);
-        assert_eq!(1024, merge_tree.data_freeze_threshold);
-        assert_eq!(ReadableSize::mb(512), merge_tree.fork_dictionary_bytes);
+        assert!(memtable_config.dedup);
+        assert_eq!(8192, memtable_config.index_max_keys_per_shard);
+        assert_eq!(1024, memtable_config.data_freeze_threshold);
+        assert_eq!(ReadableSize::mb(512), memtable_config.fork_dictionary_bytes);
     }
 
     #[test]
diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/partition_tree.rs
similarity index 89%
rename from src/mito2/src/memtable/merge_tree.rs
rename to src/mito2/src/memtable/partition_tree.rs
index a449c1a4c6..a6a5f9dd44 100644
--- a/src/mito2/src/memtable/merge_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Memtable implementation based on a merge tree.
+//! Memtable implementation based on a partition tree.
 
 pub(crate) mod data;
 mod dedup;
@@ -37,8 +37,8 @@ use table::predicate::Predicate;
 use crate::error::Result;
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::metrics::WriteMetrics;
-use crate::memtable::merge_tree::tree::MergeTree;
+use crate::memtable::partition_tree::metrics::WriteMetrics;
+use crate::memtable::partition_tree::tree::PartitionTree;
 use crate::memtable::{
     AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
     MemtableRef, MemtableStats,
@@ -61,13 +61,13 @@ struct PkId {
     pk_index: PkIndex,
 }
 
-// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple merge
-// tree memtable then we will use a lot memory. We should find a better way to control the
+// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple partition tree
+// memtable then we will use a lot memory. We should find a better way to control the
 // dictionary size.
-/// Config for the merge tree memtable.
+/// Config for the partition tree memtable.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(default)]
-pub struct MergeTreeConfig {
+pub struct PartitionTreeConfig {
     /// Max keys in an index shard.
     pub index_max_keys_per_shard: usize,
     /// Number of rows to freeze a data part.
@@ -82,7 +82,7 @@ pub struct MergeTreeConfig {
     pub fork_dictionary_bytes: ReadableSize,
 }
 
-impl Default for MergeTreeConfig {
+impl Default for PartitionTreeConfig {
     fn default() -> Self {
         let mut fork_dictionary_bytes = ReadableSize::gb(1);
         if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
@@ -102,24 +102,24 @@ impl Default for MergeTreeConfig {
     }
 }
 
-/// Memtable based on a merge tree.
-pub struct MergeTreeMemtable {
+/// Memtable based on a partition tree.
+pub struct PartitionTreeMemtable {
     id: MemtableId,
-    tree: MergeTree,
+    tree: PartitionTree,
     alloc_tracker: AllocTracker,
     max_timestamp: AtomicI64,
     min_timestamp: AtomicI64,
 }
 
-impl fmt::Debug for MergeTreeMemtable {
+impl fmt::Debug for PartitionTreeMemtable {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("MergeTreeMemtable")
+        f.debug_struct("PartitionTreeMemtable")
             .field("id", &self.id)
             .finish()
     }
 }
 
-impl Memtable for MergeTreeMemtable {
+impl Memtable for PartitionTreeMemtable {
     fn id(&self) -> MemtableId {
         self.id
     }
@@ -197,29 +197,29 @@ impl Memtable for MergeTreeMemtable {
     fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
         let tree = self.tree.fork(metadata.clone());
 
-        let memtable = MergeTreeMemtable::with_tree(id, tree);
+        let memtable = PartitionTreeMemtable::with_tree(id, tree);
         Arc::new(memtable)
     }
 }
 
-impl MergeTreeMemtable {
+impl PartitionTreeMemtable {
     /// Returns a new memtable.
     pub fn new(
         id: MemtableId,
         metadata: RegionMetadataRef,
         write_buffer_manager: Option<WriteBufferManagerRef>,
-        config: &MergeTreeConfig,
+        config: &PartitionTreeConfig,
     ) -> Self {
         Self::with_tree(
             id,
-            MergeTree::new(metadata, config, write_buffer_manager.clone()),
+            PartitionTree::new(metadata, config, write_buffer_manager.clone()),
         )
     }
 
     /// Creates a mutable memtable from the tree.
     ///
     /// It also adds the bytes used by shared parts (e.g. index) to the memory usage.
-    fn with_tree(id: MemtableId, tree: MergeTree) -> Self {
+    fn with_tree(id: MemtableId, tree: PartitionTree) -> Self {
         let alloc_tracker = AllocTracker::new(tree.write_buffer_manager());
 
         Self {
@@ -278,17 +278,17 @@ impl MergeTreeMemtable {
     }
 }
 
-/// Builder to build a [MergeTreeMemtable].
+/// Builder to build a [PartitionTreeMemtable].
 #[derive(Debug, Default)]
-pub struct MergeTreeMemtableBuilder {
-    config: MergeTreeConfig,
+pub struct PartitionTreeMemtableBuilder {
+    config: PartitionTreeConfig,
     write_buffer_manager: Option<WriteBufferManagerRef>,
 }
 
-impl MergeTreeMemtableBuilder {
+impl PartitionTreeMemtableBuilder {
     /// Creates a new builder with specific `write_buffer_manager`.
     pub fn new(
-        config: MergeTreeConfig,
+        config: PartitionTreeConfig,
         write_buffer_manager: Option<WriteBufferManagerRef>,
     ) -> Self {
         Self {
@@ -298,9 +298,9 @@ impl MergeTreeMemtableBuilder {
     }
 }
 
-impl MemtableBuilder for MergeTreeMemtableBuilder {
+impl MemtableBuilder for PartitionTreeMemtableBuilder {
     fn build(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef {
-        Arc::new(MergeTreeMemtable::new(
+        Arc::new(PartitionTreeMemtable::new(
             id,
             metadata.clone(),
             self.write_buffer_manager.clone(),
@@ -335,7 +335,8 @@ mod tests {
         let timestamps = (0..100).collect::<Vec<_>>();
         let kvs =
             memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
-        let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
+        let memtable =
+            PartitionTreeMemtable::new(1, metadata, None, &PartitionTreeConfig::default());
 
         memtable.write(&kvs).unwrap();
 
         let expected_ts = kvs
@@ -371,7 +372,7 @@
             memtable_util::metadata_with_primary_key(vec![], false)
         };
         let memtable =
-            MergeTreeMemtable::new(1, metadata.clone(), None, &MergeTreeConfig::default());
+            PartitionTreeMemtable::new(1, metadata.clone(), None, &PartitionTreeConfig::default());
 
         let kvs = memtable_util::build_key_values(
             &metadata,
@@ -430,8 +431,8 @@
             memtable_util::metadata_with_primary_key(vec![], false)
         };
         // Try to build a memtable via the builder.
-        let memtable =
-            MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);
+        let memtable = PartitionTreeMemtableBuilder::new(PartitionTreeConfig::default(), None)
+            .build(1, &metadata);
 
         let expect = (0..100).collect::<Vec<_>>();
         let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
@@ -466,11 +467,11 @@
     fn write_iter_multi_keys(max_keys: usize, freeze_threshold: usize) {
         let metadata = memtable_util::metadata_with_primary_key(vec![1, 0], true);
-        let memtable = MergeTreeMemtable::new(
+        let memtable = PartitionTreeMemtable::new(
             1,
             metadata.clone(),
             None,
-            &MergeTreeConfig {
+            &PartitionTreeConfig {
                 index_max_keys_per_shard: max_keys,
                 data_freeze_threshold: freeze_threshold,
                 ..Default::default()
             },
@@ -515,8 +516,8 @@
     fn test_memtable_filter() {
         let metadata = memtable_util::metadata_with_primary_key(vec![0, 1], false);
         // Try to build a memtable via the builder.
-        let memtable = MergeTreeMemtableBuilder::new(
-            MergeTreeConfig {
+        let memtable = PartitionTreeMemtableBuilder::new(
+            PartitionTreeConfig {
                 index_max_keys_per_shard: 40,
                 ..Default::default()
             },
@@ -551,14 +552,14 @@
     #[test]
     fn test_deserialize_config() {
-        let config = MergeTreeConfig {
+        let config = PartitionTreeConfig {
             dedup: false,
             ..Default::default()
         };
         // Creates a json with dedup = false.
         let json = serde_json::to_string(&config).unwrap();
-        let config: MergeTreeConfig = serde_json::from_str(&json).unwrap();
+        let config: PartitionTreeConfig = serde_json::from_str(&json).unwrap();
         assert!(config.dedup);
-        assert_eq!(MergeTreeConfig::default(), config);
+        assert_eq!(PartitionTreeConfig::default(), config);
     }
 }
diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/partition_tree/data.rs
similarity index 98%
rename from src/mito2/src/memtable/merge_tree/data.rs
rename to src/mito2/src/memtable/partition_tree/data.rs
index 3f6557ee03..8651eceb4b 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/partition_tree/data.rs
@@ -45,9 +45,11 @@ use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
 use crate::error;
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
-use crate::memtable::merge_tree::PkIndex;
-use crate::metrics::{MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, MERGE_TREE_READ_STAGE_ELAPSED};
+use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
+use crate::memtable::partition_tree::PkIndex;
+use crate::metrics::{
+    PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
+};
 
 const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
@@ -255,7 +257,7 @@ impl DataBuffer {
     /// Builds a lazily initialized data buffer reader from [DataBuffer]
     pub fn read(&self) -> Result {
-        let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["read_data_buffer"])
             .start_timer();
@@ -523,7 +525,7 @@ pub(crate) struct DataBufferReader {
 impl Drop for DataBufferReader {
     fn drop(&mut self) {
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["read_data_buffer"])
             .observe(self.elapsed_time.as_secs_f64())
     }
 }
@@ -780,7 +782,7 @@ impl<'a> DataPartEncoder<'a> {
         let mut bytes = Vec::with_capacity(1024);
 
         let rb = {
-            let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
+            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                 .with_label_values(&["drain_data_buffer_to_batch"])
                 .start_timer();
             drain_data_buffer_to_record_batches(
@@ -793,7 +795,7 @@
         };
 
         {
-            let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
+            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                 .with_label_values(&["encode"])
                 .start_timer();
             let mut writer =
@@ -837,7 +839,7 @@ pub struct DataPartReader {
 impl Drop for DataPartReader {
     fn drop(&mut self) {
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["read_data_part"])
             .observe(self.elapsed.as_secs_f64());
     }
 }
@@ -973,7 +975,7 @@ impl DataParts {
     /// The returned iterator yields a record batch of one primary key at a time.
     /// The order of yielding primary keys is determined by provided weights.
     pub fn read(&self) -> Result {
-        let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["build_data_parts_reader"])
             .start_timer();
@@ -1030,7 +1032,7 @@ pub struct DataPartsReader {
 impl Drop for DataPartsReader {
     fn drop(&mut self) {
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
             .with_label_values(&["read_data_parts"])
             .observe(self.elapsed.as_secs_f64())
     }
diff --git a/src/mito2/src/memtable/merge_tree/dedup.rs b/src/mito2/src/memtable/partition_tree/dedup.rs
similarity index 96%
rename from src/mito2/src/memtable/merge_tree/dedup.rs
rename to src/mito2/src/memtable/partition_tree/dedup.rs
index 0a68f9f564..59f481266b 100644
--- a/src/mito2/src/memtable/merge_tree/dedup.rs
+++ b/src/mito2/src/memtable/partition_tree/dedup.rs
@@ -15,9 +15,9 @@ use std::ops::Range;
 
 use crate::error::Result;
-use crate::memtable::merge_tree::data::DataBatch;
-use crate::memtable::merge_tree::shard::DataBatchSource;
-use crate::memtable::merge_tree::PkId;
+use crate::memtable::partition_tree::data::DataBatch;
+use crate::memtable::partition_tree::shard::DataBatchSource;
+use crate::memtable::partition_tree::PkId;
 
 /// A reader that dedup sorted batches from a merger.
 pub struct DedupReader {
@@ -112,7 +112,7 @@ mod tests {
     use store_api::metadata::RegionMetadataRef;
 
     use super::*;
-    use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DataPartsReader};
+    use crate::memtable::partition_tree::data::{DataBuffer, DataParts, DataPartsReader};
     use crate::test_util::memtable_util::{
         extract_data_batch, metadata_for_test, write_rows_to_buffer,
     };
diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs
similarity index 99%
rename from src/mito2/src/memtable/merge_tree/dict.rs
rename to src/mito2/src/memtable/partition_tree/dict.rs
index 126b8bdb13..ea7874352b 100644
--- a/src/mito2/src/memtable/merge_tree/dict.rs
+++ b/src/mito2/src/memtable/partition_tree/dict.rs
@@ -19,8 +19,8 @@ use std::sync::Arc;
 
 use datatypes::arrow::array::{Array, ArrayBuilder, BinaryArray, BinaryBuilder};
 
-use crate::memtable::merge_tree::metrics::WriteMetrics;
-use crate::memtable::merge_tree::PkIndex;
+use crate::memtable::partition_tree::metrics::WriteMetrics;
+use crate::memtable::partition_tree::PkIndex;
 use crate::metrics::MEMTABLE_DICT_BYTES;
 
 /// Maximum keys in a [DictBlock].
diff --git a/src/mito2/src/memtable/merge_tree/merger.rs b/src/mito2/src/memtable/partition_tree/merger.rs
similarity index 98%
rename from src/mito2/src/memtable/merge_tree/merger.rs
rename to src/mito2/src/memtable/partition_tree/merger.rs
index 4441eaa593..71dcacc08e 100644
--- a/src/mito2/src/memtable/merge_tree/merger.rs
+++ b/src/mito2/src/memtable/partition_tree/merger.rs
@@ -18,8 +18,8 @@ use std::fmt::Debug;
 use std::ops::Range;
 
 use crate::error::Result;
-use crate::memtable::merge_tree::data::{DataBatch, DataBufferReader, DataPartReader};
-use crate::memtable::merge_tree::PkIndex;
+use crate::memtable::partition_tree::data::{DataBatch, DataBufferReader, DataPartReader};
+use crate::memtable::partition_tree::PkIndex;
 
 /// Nodes of merger's heap.
 pub trait Node: Ord {
@@ -297,7 +297,7 @@ mod tests {
     use store_api::metadata::RegionMetadataRef;
 
     use super::*;
-    use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBuffer};
+    use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBuffer};
     use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
 
     fn write_rows_to_buffer(
diff --git a/src/mito2/src/memtable/merge_tree/metrics.rs b/src/mito2/src/memtable/partition_tree/metrics.rs
similarity index 96%
rename from src/mito2/src/memtable/merge_tree/metrics.rs
rename to src/mito2/src/memtable/partition_tree/metrics.rs
index 7a2e37359a..584ff5dae7 100644
--- a/src/mito2/src/memtable/merge_tree/metrics.rs
+++ b/src/mito2/src/memtable/partition_tree/metrics.rs
@@ -14,7 +14,7 @@
 
 //! Internal metrics of the memtable.
 
-/// Metrics of writing the merge tree.
+/// Metrics of writing the partition tree.
 pub struct WriteMetrics {
     /// Size allocated by keys.
     pub key_bytes: usize,
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/partition_tree/partition.rs
similarity index 96%
rename from src/mito2/src/memtable/merge_tree/partition.rs
rename to src/mito2/src/memtable/partition_tree/partition.rs
index f031de57eb..3f38206dd8 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/partition_tree/partition.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Partition of a merge tree.
+//! Partition of a partition tree.
 //!
 //! We only support partitioning the tree by pre-defined internal columns.
 
@@ -28,15 +28,15 @@ use store_api::storage::ColumnId;
 
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
-use crate::memtable::merge_tree::dedup::DedupReader;
-use crate::memtable::merge_tree::metrics::WriteMetrics;
-use crate::memtable::merge_tree::shard::{
-    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
-};
-use crate::memtable::merge_tree::shard_builder::ShardBuilder;
-use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
-use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
+use crate::memtable::partition_tree::data::{DataBatch, DataParts, DATA_INIT_CAP};
+use crate::memtable::partition_tree::dedup::DedupReader;
+use crate::memtable::partition_tree::metrics::WriteMetrics;
+use crate::memtable::partition_tree::shard::{
+    BoxedDataBatchSource, Shard, ShardMerger, ShardNode, ShardSource,
+};
+use crate::memtable::partition_tree::shard_builder::ShardBuilder;
+use crate::memtable::partition_tree::{PartitionTreeConfig, PkId};
+use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
 use crate::read::{Batch, BatchBuilder};
 use crate::row_converter::{McmpRowCodec, RowCodec};
@@ -54,7 +54,7 @@ pub type PartitionRef = Arc<Partition>;
 impl Partition {
     /// Creates a new partition.
-    pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
+    pub fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
         Partition {
             inner: RwLock::new(Inner::new(metadata, config)),
             dedup: config.dedup,
@@ -193,7 +193,7 @@ impl Partition {
     /// Forks the partition.
     ///
     /// Must freeze the partition before fork.
-    pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition {
+    pub fn fork(&self, metadata: &RegionMetadataRef, config: &PartitionTreeConfig) -> Partition {
         let (shards, shard_builder) = {
             let inner = self.inner.read().unwrap();
             debug_assert!(inner.shard_builder.is_empty());
@@ -437,11 +437,11 @@ pub(crate) struct ReadPartitionContext {
 impl Drop for ReadPartitionContext {
     fn drop(&mut self) {
         let partition_read_source = self.metrics.read_source.as_secs_f64();
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_read_source"])
            .observe(partition_read_source);
         let partition_data_batch_to_batch = self.metrics.data_batch_to_batch.as_secs_f64();
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["partition_data_batch_to_batch"])
            .observe(partition_data_batch_to_batch);
 
@@ -558,7 +558,7 @@ struct Inner {
 }
 
 impl Inner {
-    fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
+    fn new(metadata: RegionMetadataRef, config: &PartitionTreeConfig) -> Self {
         let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
             let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
             (
diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/partition_tree/shard.rs
similarity index 95%
rename from src/mito2/src/memtable/merge_tree/shard.rs
rename to src/mito2/src/memtable/partition_tree/shard.rs
index 2ac1ee90bc..33ca0e414b 100644
--- a/src/mito2/src/memtable/merge_tree/shard.rs
+++ b/src/mito2/src/memtable/partition_tree/shard.rs
@@ -21,15 +21,15 @@ use store_api::metadata::RegionMetadataRef;
 
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::data::{
-    DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
-};
-use crate::memtable::merge_tree::dict::KeyDictRef;
-use crate::memtable::merge_tree::merger::{Merger, Node};
-use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
-use crate::memtable::merge_tree::shard_builder::ShardBuilderReader;
-use crate::memtable::merge_tree::{PkId, PkIndex, ShardId};
-use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
+use crate::memtable::partition_tree::data::{
+    DataBatch, DataParts, DataPartsReader, DataPartsReaderBuilder, DATA_INIT_CAP,
+};
+use crate::memtable::partition_tree::dict::KeyDictRef;
+use crate::memtable::partition_tree::merger::{Merger, Node};
+use crate::memtable::partition_tree::partition::PrimaryKeyFilter;
+use crate::memtable::partition_tree::shard_builder::ShardBuilderReader;
+use crate::memtable::partition_tree::{PkId, PkIndex, ShardId};
+use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
 
 /// Shard stores data related to the same key dictionary.
 pub struct Shard {
@@ -257,7 +257,7 @@ impl ShardReader {
 impl Drop for ShardReader {
     fn drop(&mut self) {
         let shard_prune_pk = self.prune_pk_cost.as_secs_f64();
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["shard_prune_pk"])
            .observe(shard_prune_pk);
         if self.keys_before_pruning > 0 {
@@ -427,10 +427,10 @@ mod tests {
     use std::sync::Arc;
 
     use super::*;
-    use crate::memtable::merge_tree::data::timestamp_array_to_i64_slice;
-    use crate::memtable::merge_tree::dict::KeyDictBuilder;
-    use crate::memtable::merge_tree::metrics::WriteMetrics;
-    use crate::memtable::merge_tree::PkIndex;
+    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
+    use crate::memtable::partition_tree::dict::KeyDictBuilder;
+    use crate::memtable::partition_tree::metrics::WriteMetrics;
+    use crate::memtable::partition_tree::PkIndex;
     use crate::memtable::KeyValues;
     use crate::test_util::memtable_util::{
         build_key_values_with_ts_seq_values, encode_keys, metadata_for_test,
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/partition_tree/shard_builder.rs
similarity index 93%
rename from src/mito2/src/memtable/merge_tree/shard_builder.rs
rename to src/mito2/src/memtable/partition_tree/shard_builder.rs
index 01cb2de25a..12739d16d3 100644
--- a/src/mito2/src/memtable/merge_tree/shard_builder.rs
+++ b/src/mito2/src/memtable/partition_tree/shard_builder.rs
@@ -22,15 +22,15 @@ use store_api::metadata::RegionMetadataRef;
 
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::data::{
-    DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
-};
-use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
-use crate::memtable::merge_tree::metrics::WriteMetrics;
-use crate::memtable::merge_tree::partition::PrimaryKeyFilter;
-use crate::memtable::merge_tree::shard::Shard;
-use crate::memtable::merge_tree::{MergeTreeConfig, PkId, PkIndex, ShardId};
-use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
+use crate::memtable::partition_tree::data::{
+    DataBatch, DataBuffer, DataBufferReader, DataBufferReaderBuilder, DataParts, DATA_INIT_CAP,
+};
+use crate::memtable::partition_tree::dict::{DictBuilderReader, KeyDictBuilder};
+use crate::memtable::partition_tree::metrics::WriteMetrics;
+use crate::memtable::partition_tree::partition::PrimaryKeyFilter;
+use crate::memtable::partition_tree::shard::Shard;
+use crate::memtable::partition_tree::{PartitionTreeConfig, PkId, PkIndex, ShardId};
+use crate::metrics::PARTITION_TREE_READ_STAGE_ELAPSED;
 
 /// Builder to write keys and data to a shard that the key dictionary
 /// is still active.
@@ -50,7 +50,7 @@ impl ShardBuilder {
     /// Returns a new builder.
     pub fn new(
         metadata: RegionMetadataRef,
-        config: &MergeTreeConfig,
+        config: &PartitionTreeConfig,
         shard_id: ShardId,
     ) -> ShardBuilder {
         ShardBuilder {
@@ -150,14 +150,14 @@ impl ShardBuilder {
     /// Scans the shard builder.
     pub fn read(&self, pk_weights_buffer: &mut Vec) -> Result {
         let dict_reader = {
-            let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
                 .with_label_values(&["shard_builder_read_pk"])
                 .start_timer();
             self.dict_builder.read()
         };
 
         {
-            let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+            let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
                 .with_label_values(&["sort_pk"])
                 .start_timer();
             dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
@@ -296,7 +296,7 @@ impl ShardBuilderReader {
 impl Drop for ShardBuilderReader {
     fn drop(&mut self) {
         let shard_builder_prune_pk = self.prune_pk_cost.as_secs_f64();
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["shard_builder_prune_pk"])
            .observe(shard_builder_prune_pk);
         if self.keys_before_pruning > 0 {
@@ -315,8 +315,8 @@ impl Drop for ShardBuilderReader {
 mod tests {
     use super::*;
-    use crate::memtable::merge_tree::data::timestamp_array_to_i64_slice;
-    use crate::memtable::merge_tree::metrics::WriteMetrics;
+    use crate::memtable::partition_tree::data::timestamp_array_to_i64_slice;
+    use crate::memtable::partition_tree::metrics::WriteMetrics;
     use crate::memtable::KeyValues;
     use crate::test_util::memtable_util::{
         build_key_values_with_ts_seq_values, encode_key_by_kv, metadata_for_test,
@@ -355,7 +355,7 @@
     fn test_write_shard_builder() {
         let metadata = metadata_for_test();
         let input = input_with_key(&metadata);
-        let config = MergeTreeConfig::default();
+        let config = PartitionTreeConfig::default();
         let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
         let mut metrics = WriteMetrics::default();
         assert!(shard_builder
@@ -382,7 +382,7 @@
     fn test_write_read_shard_builder() {
         let metadata = metadata_for_test();
         let input = input_with_key(&metadata);
-        let config = MergeTreeConfig::default();
+        let config = PartitionTreeConfig::default();
         let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1);
         let mut metrics = WriteMetrics::default();
 
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/partition_tree/tree.rs
similarity index 95%
rename from src/mito2/src/memtable/merge_tree/tree.rs
rename to src/mito2/src/memtable/partition_tree/tree.rs
index a059643dd4..ca0b478c87 100644
--- a/src/mito2/src/memtable/merge_tree/tree.rs
+++ b/src/mito2/src/memtable/partition_tree/tree.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Implementation of the merge tree.
+//! Implementation of the partition tree.
 use std::collections::{BTreeMap, HashSet, VecDeque};
 use std::sync::{Arc, RwLock};
@@ -33,20 +33,20 @@ use table::predicate::Predicate;
 use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::metrics::WriteMetrics;
-use crate::memtable::merge_tree::partition::{
-    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
-};
-use crate::memtable::merge_tree::MergeTreeConfig;
+use crate::memtable::partition_tree::metrics::WriteMetrics;
+use crate::memtable::partition_tree::partition::{
+    Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext,
+};
+use crate::memtable::partition_tree::PartitionTreeConfig;
 use crate::memtable::{BoxedBatchIterator, KeyValues};
-use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
+use crate::metrics::{PARTITION_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 
-/// The merge tree.
-pub struct MergeTree {
+/// The partition tree.
+pub struct PartitionTree {
     /// Config of the tree.
-    config: MergeTreeConfig,
+    config: PartitionTreeConfig,
     /// Metadata of the region.
     pub(crate) metadata: RegionMetadataRef,
     /// Primary key codec.
@@ -60,13 +60,13 @@ pub struct MergeTree {
     sparse_encoder: Arc,
 }
 
-impl MergeTree {
-    /// Creates a new merge tree.
+impl PartitionTree {
+    /// Creates a new partition tree.
     pub fn new(
         metadata: RegionMetadataRef,
-        config: &MergeTreeConfig,
+        config: &PartitionTreeConfig,
         write_buffer_manager: Option<WriteBufferManagerRef>,
-    ) -> MergeTree {
+    ) -> PartitionTree {
         let row_codec = McmpRowCodec::new(
             metadata
                 .primary_key_columns()
@@ -84,7 +84,7 @@
         };
         let is_partitioned = Partition::has_multi_partitions(&metadata);
 
-        MergeTree {
+        PartitionTree {
             config: config.clone(),
             metadata,
             row_codec: Arc::new(row_codec),
@@ -260,12 +260,12 @@ impl MergeTree {
     /// Forks an immutable tree. Returns a mutable tree that inherits the index
     /// of this tree.
-    pub fn fork(&self, metadata: RegionMetadataRef) -> MergeTree {
+    pub fn fork(&self, metadata: RegionMetadataRef) -> PartitionTree {
         if self.metadata.schema_version != metadata.schema_version
             || self.metadata.column_metadatas != metadata.column_metadatas
         {
             // The schema has changed, we can't reuse the tree.
-            return MergeTree::new(metadata, &self.config, self.write_buffer_manager.clone());
+            return PartitionTree::new(metadata, &self.config, self.write_buffer_manager.clone());
         }
 
         let mut total_shared_size = 0;
@@ -313,7 +313,7 @@ impl MergeTree {
             forked.insert(part_key, Arc::new(forked_part));
         }
 
-        MergeTree {
+        PartitionTree {
             config: self.config.clone(),
             metadata,
             row_codec: self.row_codec.clone(),
@@ -446,9 +446,9 @@ struct TreeIter {
 impl Drop for TreeIter {
     fn drop(&mut self) {
         READ_ROWS_TOTAL
-            .with_label_values(&["merge_tree_memtable"])
+            .with_label_values(&["partition_tree_memtable"])
            .inc_by(self.metrics.rows_fetched as u64);
-        MERGE_TREE_READ_STAGE_ELAPSED
+        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["fetch_next_partition"])
            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
         let scan_elapsed = self.metrics.iter_elapsed.as_secs_f64();
diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs
index cba3ba3079..6d92488a7b 100644
--- a/src/mito2/src/memtable/time_partition.rs
+++ b/src/mito2/src/memtable/time_partition.rs
@@ -415,13 +415,13 @@ struct PartitionToWrite<'a> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
+    use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
     use crate::test_util::memtable_util::{self, collect_iter_timestamps};
 
     #[test]
     fn test_no_duration() {
         let metadata = memtable_util::metadata_for_test();
-        let builder = Arc::new(MergeTreeMemtableBuilder::default());
+        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
         let partitions = TimePartitions::new(metadata.clone(), builder, 0, None);
         assert_eq!(1, partitions.num_partitions());
         assert!(partitions.is_empty());
@@ -449,7 +449,7 @@
     #[test]
     fn test_write_single_part() {
         let metadata = memtable_util::metadata_for_test();
-        let builder = Arc::new(MergeTreeMemtableBuilder::default());
+        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
         let partitions =
             TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(10)));
         assert_eq!(0, partitions.num_partitions());
@@ -496,7 +496,7 @@
     #[test]
     fn test_write_multi_parts() {
         let metadata = memtable_util::metadata_for_test();
-        let builder = Arc::new(MergeTreeMemtableBuilder::default());
+        let builder = Arc::new(PartitionTreeMemtableBuilder::default());
         let partitions =
             TimePartitions::new(metadata.clone(), builder, 0, Some(Duration::from_secs(5)));
         assert_eq!(0, partitions.num_partitions());
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index b9e7fb33d1..f31aa00b6c 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -278,23 +278,23 @@ lazy_static! {
         .with_label_values(&["flush", "intermediate"]);
     // ------- End of index metrics.
 
-    /// Merge tree memtable data buffer freeze metrics
-    pub static ref MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
-        "greptime_merge_tree_buffer_freeze_stage_elapsed",
-        "mito merge tree data buffer freeze stage elapsed",
+    /// Partition tree memtable data buffer freeze metrics
+    pub static ref PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_partition_tree_buffer_freeze_stage_elapsed",
+        "mito partition tree data buffer freeze stage elapsed",
         &[STAGE_LABEL],
         vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
     )
     .unwrap();
 
-    /// Merge tree memtable read path metrics
-    pub static ref MERGE_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
-        "greptime_merge_tree_read_stage_elapsed",
-        "mito merge tree read stage elapsed",
+    /// Partition tree memtable read path metrics
+    pub static ref PARTITION_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_partition_tree_read_stage_elapsed",
+        "mito partition tree read stage elapsed",
         &[STAGE_LABEL],
         vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
     )
     .unwrap();
-    // ------- End of merge tree memtable metrics.
+    // ------- End of partition tree memtable metrics.
 }
diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs
index c45e431048..f595b1f15b 100644
--- a/src/mito2/src/region/options.rs
+++ b/src/mito2/src/region/options.rs
@@ -29,7 +29,7 @@ use snafu::{ensure, ResultExt};
 use store_api::storage::ColumnId;
 
 use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
-use crate::memtable::merge_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
+use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
 
 const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
@@ -225,17 +225,17 @@
 #[serde(tag = "memtable.type", rename_all = "snake_case")]
 pub enum MemtableOptions {
     TimeSeries,
-    #[serde(with = "prefix_experimental")]
-    Experimental(ExperimentalOptions),
+    #[serde(with = "prefix_partition_tree")]
+    PartitionTree(PartitionTreeOptions),
 }
 
-with_prefix!(prefix_experimental "memtable.experimental.");
+with_prefix!(prefix_partition_tree "memtable.partition_tree.");
 
-/// Experimental memtable options.
+/// Partition tree memtable options.
 #[serde_as]
 #[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
 #[serde(default)]
-pub struct ExperimentalOptions {
+pub struct PartitionTreeOptions {
     /// Max keys in an index shard.
     #[serde_as(as = "DisplayFromStr")]
     pub index_max_keys_per_shard: usize,
@@ -246,7 +246,7 @@ pub struct ExperimentalOptions {
     pub fork_dictionary_bytes: ReadableSize,
 }
 
-impl Default for ExperimentalOptions {
+impl Default for PartitionTreeOptions {
     fn default() -> Self {
         Self {
             index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD,
@@ -442,10 +442,12 @@ mod tests {
         };
         assert_eq!(expect, options);
 
-        let map = make_map(&[("memtable.type", "experimental")]);
+        let map = make_map(&[("memtable.type", "partition_tree")]);
         let options = RegionOptions::try_from(&map).unwrap();
         let expect = RegionOptions {
-            memtable: Some(MemtableOptions::Experimental(ExperimentalOptions::default())),
+            memtable: Some(MemtableOptions::PartitionTree(
+                PartitionTreeOptions::default(),
+            )),
             ..Default::default()
         };
         assert_eq!(expect, options);
@@ -476,10 +478,10 @@
                 WAL_OPTIONS_KEY,
                 &serde_json::to_string(&wal_options).unwrap(),
             ),
-            ("memtable.type", "experimental"),
-            ("memtable.experimental.index_max_keys_per_shard", "2048"),
-            ("memtable.experimental.data_freeze_threshold", "2048"),
-            ("memtable.experimental.fork_dictionary_bytes", "128M"),
+            ("memtable.type", "partition_tree"),
+            ("memtable.partition_tree.index_max_keys_per_shard", "2048"),
+            ("memtable.partition_tree.data_freeze_threshold", "2048"),
+            ("memtable.partition_tree.fork_dictionary_bytes", "128M"),
         ]);
         let options = RegionOptions::try_from(&map).unwrap();
         let expect = RegionOptions {
@@ -497,7 +499,7 @@
                     segment_row_count: 512,
                 },
             },
-            memtable: Some(MemtableOptions::Experimental(ExperimentalOptions {
+            memtable: Some(MemtableOptions::PartitionTree(PartitionTreeOptions {
                 index_max_keys_per_shard: 2048,
                 data_freeze_threshold: 2048,
                 fork_dictionary_bytes: ReadableSize::mb(128),
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 3fe378b099..1628ecf56e 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -30,7 +30,7 @@ use table::predicate::Predicate;
 
 use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
-use crate::memtable::merge_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
+use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
 use crate::memtable::{
     BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef,
     MemtableStats,
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 6f4f5d7692..d007a0c0dc 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -48,7 +48,7 @@ use crate::config::MitoConfig;
 use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
 use crate::manifest::action::RegionEdit;
-use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
+use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
 use crate::memtable::time_series::TimeSeriesMemtableBuilder;
 use crate::memtable::{MemtableBuilderProvider, MemtableConfig};
 use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
@@ -340,8 +340,8 @@ impl WorkerStarter {
         let running = Arc::new(AtomicBool::new(true));
 
         let default_memtable_builder = match &self.config.memtable {
-            MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new(
-                merge_tree.clone(),
+            MemtableConfig::PartitionTree(config) => Arc::new(PartitionTreeMemtableBuilder::new(
+                config.clone(),
                 Some(self.write_buffer_manager.clone()),
             )) as _,
             MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some(
diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs
index 26b62551ec..cfb26d3089 100644
--- a/src/store-api/src/mito_engine_options.rs
+++ b/src/store-api/src/mito_engine_options.rs
@@ -30,9 +30,9 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
         "index.inverted_index.segment_row_count",
         WAL_OPTIONS_KEY,
         "memtable.type",
-        "memtable.experimental.index_max_keys_per_shard",
-        "memtable.experimental.data_freeze_threshold",
-        "memtable.experimental.fork_dictionary_bytes",
+        "memtable.partition_tree.index_max_keys_per_shard",
+        "memtable.partition_tree.data_freeze_threshold",
+        "memtable.partition_tree.fork_dictionary_bytes",
     ]
     .contains(&key)
 }
@@ -62,13 +62,13 @@ mod tests {
         assert!(is_mito_engine_option_key("wal_options"));
         assert!(is_mito_engine_option_key("memtable.type"));
         assert!(is_mito_engine_option_key(
-            "memtable.experimental.index_max_keys_per_shard"
+            "memtable.partition_tree.index_max_keys_per_shard"
         ));
         assert!(is_mito_engine_option_key(
-            "memtable.experimental.data_freeze_threshold"
+            "memtable.partition_tree.data_freeze_threshold"
        ));
         assert!(is_mito_engine_option_key(
-            "memtable.experimental.fork_dictionary_bytes"
+            "memtable.partition_tree.fork_dictionary_bytes"
        ));
         assert!(!is_mito_engine_option_key("foo"));
     }
diff --git a/tests/cases/standalone/common/create/create_with_options.result b/tests/cases/standalone/common/create/create_with_options.result
index 5dd872c7f7..f205e8469d 100644
--- a/tests/cases/standalone/common/create/create_with_options.result
+++ b/tests/cases/standalone/common/create/create_with_options.result
@@ -74,7 +74,7 @@ with(
   'index.inverted_index.ignore_column_ids'='1,2,3',
   'index.inverted_index.segment_row_count'='512',
   'wal_options'='{"wal.provider":"raft_engine"}',
-  'memtable.type' = 'experimental',
+  'memtable.type' = 'partition_tree',
 );
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/create/create_with_options.sql b/tests/cases/standalone/common/create/create_with_options.sql
index 1f3c533412..99aa471189 100644
--- a/tests/cases/standalone/common/create/create_with_options.sql
+++ b/tests/cases/standalone/common/create/create_with_options.sql
@@ -64,7 +64,7 @@ with(
   'index.inverted_index.ignore_column_ids'='1,2,3',
   'index.inverted_index.segment_row_count'='512',
   'wal_options'='{"wal.provider":"raft_engine"}',
-  'memtable.type' = 'experimental',
+  'memtable.type' = 'partition_tree',
 );
 
 drop table test_mito_options;