mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
refactor: make MergeTreeMemtable the default choice (#3430)
* refactor: make MergeTreeMemtable the default choice * refactor: reformat * chore: add doc to config
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5380,6 +5380,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"toml 0.8.8",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
|
||||
@@ -138,6 +138,18 @@ mem_threshold_on_create = "64M"
|
||||
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
|
||||
intermediate_path = ""
|
||||
|
||||
[region_engine.mito.memtable]
|
||||
# Memtable type.
|
||||
# - "experimental": experimental memtable
|
||||
# - "time_series": time-series memtable (deprecated)
|
||||
type = "experimental"
|
||||
# The max number of keys in one shard.
|
||||
index_max_keys_per_shard = 8192
|
||||
# The max rows of data inside the actively writing buffer in one shard.
|
||||
data_freeze_threshold = 32768
|
||||
# Max dictionary bytes.
|
||||
fork_dictionary_bytes = "1GiB"
|
||||
|
||||
# Log options, see `standalone.example.toml`
|
||||
# [logging]
|
||||
# dir = "/tmp/greptimedb/logs"
|
||||
|
||||
@@ -244,6 +244,18 @@ mem_threshold_on_create = "64M"
|
||||
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
|
||||
intermediate_path = ""
|
||||
|
||||
[region_engine.mito.memtable]
|
||||
# Memtable type.
|
||||
# - "experimental": experimental memtable
|
||||
# - "time_series": time-series memtable (deprecated)
|
||||
type = "experimental"
|
||||
# The max number of keys in one shard.
|
||||
index_max_keys_per_shard = 8192
|
||||
# The max rows of data inside the actively writing buffer in one shard.
|
||||
data_freeze_threshold = 32768
|
||||
# Max dictionary bytes.
|
||||
fork_dictionary_bytes = "1GiB"
|
||||
|
||||
# Log options
|
||||
# [logging]
|
||||
# Specify logs directory.
|
||||
|
||||
@@ -76,6 +76,7 @@ common-test-util.workspace = true
|
||||
criterion = "0.4"
|
||||
log-store.workspace = true
|
||||
rand.workspace = true
|
||||
toml.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "bench_merge_tree"
|
||||
|
||||
@@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, NoneAsEmptyString};
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::merge_tree::MergeTreeConfig;
|
||||
use crate::memtable::MemtableConfig;
|
||||
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
|
||||
|
||||
/// Default max running background job.
|
||||
@@ -104,8 +104,8 @@ pub struct MitoConfig {
|
||||
/// Inverted index configs.
|
||||
pub inverted_index: InvertedIndexConfig,
|
||||
|
||||
/// Experimental memtable.
|
||||
pub experimental_memtable: Option<MergeTreeConfig>,
|
||||
/// Memtable config
|
||||
pub memtable: MemtableConfig,
|
||||
}
|
||||
|
||||
impl Default for MitoConfig {
|
||||
@@ -131,7 +131,7 @@ impl Default for MitoConfig {
|
||||
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
|
||||
allow_stale_entries: false,
|
||||
inverted_index: InvertedIndexConfig::default(),
|
||||
experimental_memtable: None,
|
||||
memtable: MemtableConfig::default(),
|
||||
};
|
||||
|
||||
// Adjust buffer and cache size according to system memory if we can.
|
||||
@@ -319,3 +319,25 @@ fn divide_num_cpus(divisor: usize) -> usize {
|
||||
|
||||
(cores + divisor - 1) / divisor
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_config() {
|
||||
let s = r#"
|
||||
[memtable]
|
||||
type = "experimental"
|
||||
index_max_keys_per_shard = 8192
|
||||
data_freeze_threshold = 1024
|
||||
dedup = true
|
||||
fork_dictionary_bytes = "512MiB"
|
||||
"#;
|
||||
let config: MitoConfig = toml::from_str(s).unwrap();
|
||||
let MemtableConfig::Experimental(config) = &config.memtable else {
|
||||
unreachable!()
|
||||
};
|
||||
assert_eq!(1024, config.data_freeze_threshold);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,16 +14,12 @@
|
||||
|
||||
//! Memtables are write buffers for regions.
|
||||
|
||||
pub mod key_values;
|
||||
pub mod merge_tree;
|
||||
pub mod time_series;
|
||||
pub(crate) mod version;
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_time::Timestamp;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::ColumnId;
|
||||
use table::predicate::Predicate;
|
||||
@@ -31,14 +27,34 @@ use table::predicate::Predicate;
|
||||
use crate::error::Result;
|
||||
use crate::flush::WriteBufferManagerRef;
|
||||
pub use crate::memtable::key_values::KeyValues;
|
||||
use crate::memtable::merge_tree::MergeTreeConfig;
|
||||
use crate::metrics::WRITE_BUFFER_BYTES;
|
||||
use crate::read::Batch;
|
||||
|
||||
pub mod key_values;
|
||||
pub mod merge_tree;
|
||||
pub mod time_series;
|
||||
pub(crate) mod version;
|
||||
|
||||
/// Id for memtables.
|
||||
///
|
||||
/// Should be unique under the same region.
|
||||
pub type MemtableId = u32;
|
||||
|
||||
/// Config for memtables.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum MemtableConfig {
|
||||
Experimental(MergeTreeConfig),
|
||||
TimeSeries,
|
||||
}
|
||||
|
||||
impl Default for MemtableConfig {
|
||||
fn default() -> Self {
|
||||
Self::Experimental(MergeTreeConfig::default())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct MemtableStats {
|
||||
/// The estimated bytes allocated by this memtable from heap.
|
||||
@@ -187,9 +203,30 @@ impl Drop for AllocTracker {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_base::readable_size::ReadableSize;
|
||||
|
||||
use super::*;
|
||||
use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_memtable_config() {
|
||||
let s = r#"
|
||||
type = "experimental"
|
||||
index_max_keys_per_shard = 8192
|
||||
data_freeze_threshold = 1024
|
||||
dedup = true
|
||||
fork_dictionary_bytes = "512MiB"
|
||||
"#;
|
||||
let config: MemtableConfig = toml::from_str(s).unwrap();
|
||||
let MemtableConfig::Experimental(merge_tree) = config else {
|
||||
unreachable!()
|
||||
};
|
||||
assert!(merge_tree.dedup);
|
||||
assert_eq!(8192, merge_tree.index_max_keys_per_shard);
|
||||
assert_eq!(1024, merge_tree.data_freeze_threshold);
|
||||
assert_eq!(ReadableSize::mb(512), merge_tree.fork_dictionary_bytes);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_alloc_tracker_without_manager() {
|
||||
let tracker = AllocTracker::new(None);
|
||||
|
||||
@@ -44,7 +44,7 @@ use crate::memtable::{
|
||||
};
|
||||
|
||||
/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
|
||||
const DICTIONARY_SIZE_FACTOR: u64 = 16;
|
||||
const DICTIONARY_SIZE_FACTOR: u64 = 8;
|
||||
|
||||
/// Id of a shard, only unique inside a partition.
|
||||
type ShardId = u32;
|
||||
@@ -74,7 +74,7 @@ pub struct MergeTreeConfig {
|
||||
|
||||
impl Default for MergeTreeConfig {
|
||||
fn default() -> Self {
|
||||
let mut fork_dictionary_bytes = ReadableSize::mb(512);
|
||||
let mut fork_dictionary_bytes = ReadableSize::gb(1);
|
||||
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
|
||||
let adjust_dictionary_bytes =
|
||||
std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
|
||||
@@ -85,7 +85,7 @@ impl Default for MergeTreeConfig {
|
||||
|
||||
Self {
|
||||
index_max_keys_per_shard: 8192,
|
||||
data_freeze_threshold: 102400,
|
||||
data_freeze_threshold: 32768,
|
||||
dedup: true,
|
||||
fork_dictionary_bytes,
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef
|
||||
use crate::manifest::action::RegionEdit;
|
||||
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
|
||||
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
|
||||
use crate::memtable::MemtableBuilderRef;
|
||||
use crate::memtable::{MemtableBuilderRef, MemtableConfig};
|
||||
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
|
||||
use crate::request::{
|
||||
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
|
||||
@@ -320,15 +320,15 @@ impl<S: LogStore> WorkerStarter<S> {
|
||||
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
|
||||
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
let memtable_builder = if let Some(config) = &self.config.experimental_memtable {
|
||||
Arc::new(MergeTreeMemtableBuilder::new(
|
||||
config.clone(),
|
||||
|
||||
let memtable_builder = match &self.config.memtable {
|
||||
MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new(
|
||||
merge_tree.clone(),
|
||||
Some(self.write_buffer_manager.clone()),
|
||||
)) as _
|
||||
} else {
|
||||
Arc::new(TimeSeriesMemtableBuilder::new(Some(
|
||||
)) as _,
|
||||
MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some(
|
||||
self.write_buffer_manager.clone(),
|
||||
))) as _
|
||||
))) as _,
|
||||
};
|
||||
let mut worker_thread = RegionWorkerLoop {
|
||||
id: self.id,
|
||||
|
||||
@@ -785,6 +785,13 @@ write_buffer_size = "8MiB"
|
||||
mem_threshold_on_create = "64.0MiB"
|
||||
intermediate_path = ""
|
||||
|
||||
[datanode.region_engine.mito.memtable]
|
||||
type = "experimental"
|
||||
index_max_keys_per_shard = 8192
|
||||
data_freeze_threshold = 32768
|
||||
dedup = true
|
||||
fork_dictionary_bytes = "1GiB"
|
||||
|
||||
[[datanode.region_engine]]
|
||||
|
||||
[datanode.region_engine.file]
|
||||
|
||||
Reference in New Issue
Block a user