From 641592644d195c3df3e2e3ae3f6c3e3fc7a24fd6 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 19 Mar 2024 16:50:10 +0800 Subject: [PATCH] feat: support per table memtable options (#3524) * feat: add memtable builder to region * refactor: rename memtable_builder in worker to default_memtable_builder * fix: return error instead of using default compaction options Support deserializing memtable and compaction options from the option map * feat: optional memtable options * feat: add MemtableBuilderProvider to create builders * feat: change default memtable and skip deserializing dedup * chore: update test and comment * chore: test invalid type * feat: metric engine use new memtable manually * feat: expose more memtable configs * feat: add memtable options to valid option list * test: add test * test: sqlness test * chore: serde workspace * chore: remove comments --- src/metric-engine/src/engine/create.rs | 7 +- src/metric-engine/src/engine/options.rs | 5 + src/mito2/Cargo.toml | 2 +- src/mito2/src/engine/create_test.rs | 49 +++++- src/mito2/src/error.rs | 6 +- src/mito2/src/memtable.rs | 46 +++++- src/mito2/src/memtable/merge_tree.rs | 22 +++ src/mito2/src/region.rs | 5 +- src/mito2/src/region/opener.rs | 20 ++- src/mito2/src/region/options.rs | 148 +++++++++++++++--- src/mito2/src/worker.rs | 14 +- src/mito2/src/worker/handle_alter.rs | 10 +- src/mito2/src/worker/handle_catchup.rs | 2 +- src/mito2/src/worker/handle_create.rs | 2 +- src/mito2/src/worker/handle_drop.rs | 4 +- src/mito2/src/worker/handle_open.rs | 2 +- src/mito2/src/worker/handle_truncate.rs | 2 +- src/store-api/src/mito_engine_options.rs | 14 ++ tests-integration/tests/http.rs | 6 +- .../common/create/create_with_options.result | 13 ++ .../common/create/create_with_options.sql | 11 ++ .../standalone/common/show/show_create.result | 2 +- 22 files changed, 337 insertions(+), 55 deletions(-) diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 
fbadbf5d2e..8fad9f9dc8 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -36,7 +36,9 @@ use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest use store_api::storage::consts::ReservedColumnId; use store_api::storage::RegionId; -use crate::engine::options::set_index_options_for_data_region; +use crate::engine::options::{ + set_index_options_for_data_region, set_memtable_options_for_data_region, +}; use crate::engine::MetricEngineInner; use crate::error::{ ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, @@ -380,6 +382,9 @@ impl MetricEngineInner { // set index options set_index_options_for_data_region(&mut data_region_request.options); + // Set memtable options. + set_memtable_options_for_data_region(&mut data_region_request.options); + data_region_request } diff --git a/src/metric-engine/src/engine/options.rs b/src/metric-engine/src/engine/options.rs index ee071e8d48..034caac6d1 100644 --- a/src/metric-engine/src/engine/options.rs +++ b/src/metric-engine/src/engine/options.rs @@ -42,3 +42,8 @@ pub fn set_index_options_for_data_region(options: &mut HashMap) SEG_ROW_COUNT_FOR_DATA_REGION.to_string(), ); } + +/// Set memtable options for the data region. 
+pub fn set_memtable_options_for_data_region(options: &mut HashMap<String, String>) { + options.insert("memtable.type".to_string(), "experimental".to_string()); +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index cf1a8533f1..917d7d0b41 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -58,7 +58,7 @@ prost.workspace = true puffin.workspace = true rand.workspace = true regex = "1.5" -serde = { version = "1.0", features = ["derive"] } +serde.workspace = true serde_json.workspace = true serde_with.workspace = true smallvec.workspace = true diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index eb1cb71690..5e2d804123 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -14,12 +14,15 @@ use std::time::Duration; +use api::v1::Rows; +use common_recordbatch::RecordBatches; use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionCloseRequest, RegionRequest}; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; -use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::region::options::MemtableOptions; +use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_engine_create_new_region() { @@ -198,3 +201,45 @@ async fn test_engine_create_with_custom_store() { .await .unwrap()); } + +#[tokio::test] +async fn test_engine_create_with_memtable_opts() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("memtable.type", "experimental") + .insert_option("memtable.experimental.index_max_keys_per_shard", "2") + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + let region = 
engine.get_region(region_id).unwrap(); + let Some(MemtableOptions::Experimental(memtable_opts)) = &region.version().options.memtable + else { + unreachable!(); + }; + assert_eq!(2, memtable_opts.index_max_keys_per_shard); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 3885e3ae85..39c1527e08 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -572,6 +572,9 @@ pub enum Error { #[snafu(source)] error: parquet::errors::ParquetError, }, + + #[snafu(display("Invalid region options, {}", reason))] + InvalidRegionOptions { reason: String, location: Location }, } pub type Result<T> = std::result::Result<T, Error>; @@ -621,7 +624,8 @@ impl ErrorExt for Error { | FillDefault { .. } | ConvertColumnDataType { .. } | ColumnNotFound { .. } - | InvalidMetadata { .. } => StatusCode::InvalidArguments, + | InvalidMetadata { .. } + | InvalidRegionOptions { .. } => StatusCode::InvalidArguments, InvalidRegionRequestSchemaVersion { .. 
} => StatusCode::RequestOutdated, diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 8c9cd0172a..9ec4231d7c 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -28,9 +28,11 @@ use crate::error::Result; use crate::flush::WriteBufferManagerRef; use crate::memtable::key_values::KeyValue; pub use crate::memtable::key_values::KeyValues; -use crate::memtable::merge_tree::MergeTreeConfig; +use crate::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtableBuilder}; +use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::Batch; +use crate::region::options::MemtableOptions; pub mod key_values; pub mod merge_tree; @@ -53,7 +55,7 @@ pub enum MemtableConfig { impl Default for MemtableConfig { fn default() -> Self { - Self::Experimental(MergeTreeConfig::default()) + Self::TimeSeries } } @@ -206,6 +208,46 @@ impl Drop for AllocTracker { } } +/// Provider of memtable builders for regions. +#[derive(Clone)] +pub(crate) struct MemtableBuilderProvider { + write_buffer_manager: Option<WriteBufferManagerRef>, + default_memtable_builder: MemtableBuilderRef, +} + +impl MemtableBuilderProvider { + pub(crate) fn new( + write_buffer_manager: Option<WriteBufferManagerRef>, + default_memtable_builder: MemtableBuilderRef, + ) -> Self { + Self { + write_buffer_manager, + default_memtable_builder, + } + } + + pub(crate) fn builder_for_options( + &self, + options: Option<&MemtableOptions>, + ) -> MemtableBuilderRef { + match options { + Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new( + self.write_buffer_manager.clone(), + )), + Some(MemtableOptions::Experimental(opts)) => Arc::new(MergeTreeMemtableBuilder::new( + MergeTreeConfig { + index_max_keys_per_shard: opts.index_max_keys_per_shard, + data_freeze_threshold: opts.data_freeze_threshold, + fork_dictionary_bytes: opts.fork_dictionary_bytes, + ..Default::default() + }, + self.write_buffer_manager.clone(), + )), + None => 
self.default_memtable_builder.clone(), + } + } +} + #[cfg(test)] mod tests { use common_base::readable_size::ReadableSize; diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index a916f4f9b4..a449c1a4c6 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -46,6 +46,8 @@ use crate::memtable::{ /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size. const DICTIONARY_SIZE_FACTOR: u64 = 8; +pub(crate) const DEFAULT_MAX_KEYS_PER_SHARD: usize = 8192; +pub(crate) const DEFAULT_FREEZE_THRESHOLD: usize = 131072; /// Id of a shard, only unique inside a partition. type ShardId = u32; @@ -59,6 +61,9 @@ struct PkId { pk_index: PkIndex, } +// TODO(yingwen): `fork_dictionary_bytes` is per region option, if we have multiple merge +// tree memtable then we will use a lot memory. We should find a better way to control the +// dictionary size. /// Config for the merge tree memtable. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] @@ -68,6 +73,10 @@ pub struct MergeTreeConfig { /// Number of rows to freeze a data part. pub data_freeze_threshold: usize, /// Whether to delete duplicates rows. + /// + /// Skips deserializing as it should be determined by whether the + /// table is append only. + #[serde(skip_deserializing)] pub dedup: bool, /// Total bytes of dictionary to keep in fork. pub fork_dictionary_bytes: ReadableSize, @@ -539,4 +548,17 @@ mod tests { assert_eq!(timestamps, read); } } + + #[test] + fn test_deserialize_config() { + let config = MergeTreeConfig { + dedup: false, + ..Default::default() + }; + // Creates a json with dedup = false. 
+ let json = serde_json::to_string(&config).unwrap(); + let config: MergeTreeConfig = serde_json::from_str(&json).unwrap(); + assert!(config.dedup); + assert_eq!(MergeTreeConfig::default(), config); + } } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c32e45b87a..1fa3fb7d4f 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -32,7 +32,7 @@ use crate::access_layer::AccessLayerRef; use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; -use crate::memtable::MemtableId; +use crate::memtable::{MemtableBuilderRef, MemtableId}; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::OnFailure; use crate::sst::file_purger::FilePurgerRef; @@ -83,9 +83,10 @@ pub(crate) struct MitoRegion { last_flush_millis: AtomicI64, /// Whether the region is writable. writable: AtomicBool, - /// Provider to get current time. time_provider: TimeProviderRef, + /// Memtable builder for the region. 
+ pub(crate) memtable_builder: MemtableBuilderRef, } pub(crate) type MitoRegionRef = Arc; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index d0ac1a5530..f99ab4e5d0 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -37,7 +37,7 @@ use crate::error::{ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::manifest::storage::manifest_compress_type; use crate::memtable::time_partition::TimePartitions; -use crate::memtable::MemtableBuilderRef; +use crate::memtable::MemtableBuilderProvider; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::MitoRegion; @@ -53,7 +53,7 @@ use crate::wal::{EntryId, Wal}; pub(crate) struct RegionOpener { region_id: RegionId, metadata: Option, - memtable_builder: MemtableBuilderRef, + memtable_builder_provider: MemtableBuilderProvider, object_store_manager: ObjectStoreManagerRef, region_dir: String, scheduler: SchedulerRef, @@ -69,7 +69,7 @@ impl RegionOpener { pub(crate) fn new( region_id: RegionId, region_dir: &str, - memtable_builder: MemtableBuilderRef, + memtable_builder_provider: MemtableBuilderProvider, object_store_manager: ObjectStoreManagerRef, scheduler: SchedulerRef, intermediate_manager: IntermediateManager, @@ -77,7 +77,7 @@ impl RegionOpener { RegionOpener { region_id, metadata: None, - memtable_builder, + memtable_builder_provider, object_store_manager, region_dir: normalize_dir(region_dir), scheduler, @@ -171,11 +171,14 @@ impl RegionOpener { let manifest_manager = RegionManifestManager::new(metadata.clone(), region_manifest_options).await?; + let memtable_builder = self + .memtable_builder_provider + .builder_for_options(options.memtable.as_ref()); // Initial memtable id is 0. 
let part_duration = options.compaction.time_window(); let mutable = Arc::new(TimePartitions::new( metadata.clone(), - self.memtable_builder, + memtable_builder.clone(), 0, part_duration, )); @@ -210,6 +213,7 @@ impl RegionOpener { // Region is writable after it is created. writable: AtomicBool::new(true), time_provider, + memtable_builder, }) } @@ -277,11 +281,14 @@ impl RegionOpener { access_layer.clone(), self.cache_manager.clone(), )); + let memtable_builder = self + .memtable_builder_provider + .builder_for_options(region_options.memtable.as_ref()); // Initial memtable id is 0. let part_duration = region_options.compaction.time_window(); let mutable = Arc::new(TimePartitions::new( metadata.clone(), - self.memtable_builder.clone(), + memtable_builder.clone(), 0, part_duration, )); @@ -329,6 +336,7 @@ impl RegionOpener { // Region is always opened in read only mode. writable: AtomicBool::new(false), time_provider, + memtable_builder, }; Ok(Some(region)) } diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 1667b57573..c45e431048 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -19,15 +19,17 @@ use std::collections::HashMap; use std::time::Duration; +use common_base::readable_size::ReadableSize; use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; use serde::de::Error as _; use serde::{Deserialize, Deserializer}; use serde_json::Value; use serde_with::{serde_as, with_prefix, DisplayFromStr}; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use store_api::storage::ColumnId; -use crate::error::{Error, JsonOptionsSnafu, Result}; +use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result}; +use crate::memtable::merge_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD}; const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024; @@ -48,6 +50,8 @@ pub struct RegionOptions { pub wal_options: WalOptions, /// Index options. 
pub index_options: IndexOptions, + /// Memtable options. + pub memtable: Option<MemtableOptions>, } impl TryFrom<&HashMap<String, String>> for RegionOptions { @@ -62,7 +66,11 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions { // See https://github.com/serde-rs/serde/issues/1626 let options: RegionOptionsWithoutEnum = serde_json::from_str(&json).context(JsonOptionsSnafu)?; - let compaction: CompactionOptions = serde_json::from_str(&json).unwrap_or_default(); + let compaction = if validate_enum_options(options_map, "compaction.type")? { + serde_json::from_str(&json).context(JsonOptionsSnafu)? + } else { + CompactionOptions::default() + }; // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map. let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else( @@ -73,6 +81,11 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions { )?; let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?; + let memtable = if validate_enum_options(options_map, "memtable.type")? { + Some(serde_json::from_str(&json).context(JsonOptionsSnafu)?) + } else { + None + }; Ok(RegionOptions { ttl: options.ttl, @@ -80,6 +93,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions { storage: options.storage, wal_options, index_options, + memtable, }) } } @@ -87,7 +101,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions { /// Options for compactions #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] #[serde(tag = "compaction.type")] -#[serde(rename_all = "lowercase")] +#[serde(rename_all = "snake_case")] pub enum CompactionOptions { /// Time window compaction strategy. #[serde(with = "prefix_twcs")] @@ -206,6 +220,42 @@ impl Default for InvertedIndexOptions { } } +/// Options for region level memtable. 
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(tag = "memtable.type", rename_all = "snake_case")] +pub enum MemtableOptions { + TimeSeries, + #[serde(with = "prefix_experimental")] + Experimental(ExperimentalOptions), +} + +with_prefix!(prefix_experimental "memtable.experimental."); + +/// Experimental memtable options. +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(default)] +pub struct ExperimentalOptions { + /// Max keys in an index shard. + #[serde_as(as = "DisplayFromStr")] + pub index_max_keys_per_shard: usize, + /// Number of rows to freeze a data part. + #[serde_as(as = "DisplayFromStr")] + pub data_freeze_threshold: usize, + /// Total bytes of dictionary to keep in fork. + pub fork_dictionary_bytes: ReadableSize, +} + +impl Default for ExperimentalOptions { + fn default() -> Self { + Self { + index_max_keys_per_shard: DEFAULT_MAX_KEYS_PER_SHARD, + data_freeze_threshold: DEFAULT_FREEZE_THRESHOLD, + fork_dictionary_bytes: ReadableSize::mb(64), + } + } +} + fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, @@ -221,25 +271,56 @@ where /// Converts the `options` map to a json object. /// -/// Converts all key-values to lowercase and replaces "null" strings by `null` json values. +/// Replaces "null" strings by `null` json values. fn options_map_to_value(options: &HashMap) -> Value { let map = options .iter() .map(|(key, value)| { - let (key, value) = (key.to_lowercase(), value.to_lowercase()); - - if value == "null" { - (key, Value::Null) + // Only convert the key to lowercase. + if value.eq_ignore_ascii_case("null") { + (key.to_string(), Value::Null) } else { - (key, Value::from(value)) + (key.to_string(), Value::from(value.to_string())) } }) .collect(); Value::Object(map) } +// `#[serde(default)]` doesn't support enum (https://github.com/serde-rs/serde/issues/1799) so we +// check the type key first. 
+/// Validates whether the `options_map` has valid options for specific `enum_tag_key` +/// and returns `true` if the map contains enum options. +fn validate_enum_options( + options_map: &HashMap<String, String>, + enum_tag_key: &str, +) -> Result<bool> { + let enum_type = enum_tag_key.split('.').next().unwrap(); + let mut has_other_options = false; + let mut has_tag = false; + for key in options_map.keys() { + if key == enum_tag_key { + has_tag = true; + } else if key.starts_with(enum_type) { + has_other_options = true; + } + } + + // If tag is not provided, then other options for the enum should not exist. + ensure!( + has_tag || !has_other_options, + InvalidRegionOptionsSnafu { + reason: format!("missing key {} in options", enum_tag_key), + } + ); + + Ok(has_tag) +} + #[cfg(test)] mod tests { + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; use common_wal::options::KafkaWalOptions; use super::*; @@ -274,7 +355,7 @@ mod tests { let map = make_map(&[("storage", "S3")]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { - storage: Some("s3".to_string()), + storage: Some("S3".to_string()), ..Default::default() }; assert_eq!(expect, options); @@ -282,16 +363,12 @@ mod tests { #[test] fn test_without_compaction_type() { - // If `compaction.type` is not provided, we ignore all compaction - // related options. Actually serde does not support deserialize - // an enum without knowning its type. 
let map = make_map(&[ ("compaction.twcs.max_active_window_files", "8"), ("compaction.twcs.time_window", "2h"), ]); - let options = RegionOptions::try_from(&map).unwrap(); - let expect = RegionOptions::default(); - assert_eq!(expect, options); + let err = RegionOptions::try_from(&map).unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); } #[test] @@ -355,6 +432,32 @@ mod tests { all_wal_options.iter().all(test_with_wal_options); } + #[test] + fn test_with_memtable() { + let map = make_map(&[("memtable.type", "time_series")]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + memtable: Some(MemtableOptions::TimeSeries), + ..Default::default() + }; + assert_eq!(expect, options); + + let map = make_map(&[("memtable.type", "experimental")]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + memtable: Some(MemtableOptions::Experimental(ExperimentalOptions::default())), + ..Default::default() + }; + assert_eq!(expect, options); + } + + #[test] + fn test_unknown_memtable_type() { + let map = make_map(&[("memtable.type", "no_such_memtable")]); + let err = RegionOptions::try_from(&map).unwrap_err(); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + #[test] fn test_with_all() { let wal_options = WalOptions::Kafka(KafkaWalOptions { @@ -373,6 +476,10 @@ mod tests { WAL_OPTIONS_KEY, &serde_json::to_string(&wal_options).unwrap(), ), + ("memtable.type", "experimental"), + ("memtable.experimental.index_max_keys_per_shard", "2048"), + ("memtable.experimental.data_freeze_threshold", "2048"), + ("memtable.experimental.fork_dictionary_bytes", "128M"), ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { @@ -382,7 +489,7 @@ mod tests { max_inactive_window_files: 2, time_window: Some(Duration::from_secs(3600 * 2)), }), - storage: Some("s3".to_string()), + storage: Some("S3".to_string()), wal_options, index_options: IndexOptions { 
inverted_index: InvertedIndexOptions { @@ -390,6 +497,11 @@ mod tests { segment_row_count: 512, }, }, + memtable: Some(MemtableOptions::Experimental(ExperimentalOptions { + index_max_keys_per_shard: 2048, + data_freeze_threshold: 2048, + fork_dictionary_bytes: ReadableSize::mb(128), + })), }; assert_eq!(expect, options); } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a8fe38e87d..6f4f5d7692 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -50,7 +50,7 @@ use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef use crate::manifest::action::RegionEdit; use crate::memtable::merge_tree::MergeTreeMemtableBuilder; use crate::memtable::time_series::TimeSeriesMemtableBuilder; -use crate::memtable::{MemtableBuilderRef, MemtableConfig}; +use crate::memtable::{MemtableBuilderProvider, MemtableConfig}; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -338,7 +338,8 @@ impl WorkerStarter { let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); - let memtable_builder = match &self.config.memtable { + + let default_memtable_builder = match &self.config.memtable { MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new( merge_tree.clone(), Some(self.write_buffer_manager.clone()), @@ -358,7 +359,10 @@ impl WorkerStarter { wal: Wal::new(self.log_store), object_store_manager: self.object_store_manager.clone(), running: running.clone(), - memtable_builder, + memtable_builder_provider: MemtableBuilderProvider::new( + Some(self.write_buffer_manager.clone()), + default_memtable_builder, + ), scheduler: self.scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()), @@ -513,8 +517,8 @@ struct RegionWorkerLoop { object_store_manager: 
ObjectStoreManagerRef, /// Whether the worker thread is still running. running: Arc<AtomicBool>, - /// Memtable builder for each region. - memtable_builder: MemtableBuilderRef, + /// Memtable builder provider for each region. + memtable_builder_provider: MemtableBuilderProvider, /// Background job scheduler. scheduler: SchedulerRef, /// Engine write buffer manager. diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index eefee19233..d0d6e51039 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -27,7 +27,6 @@ use crate::error::{ }; use crate::flush::FlushReason; use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList}; -use crate::memtable::MemtableBuilderRef; use crate::region::version::Version; use crate::region::MitoRegionRef; use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest}; @@ -109,9 +108,7 @@ impl RegionWorkerLoop { } // Now we can alter the region directly. - if let Err(e) = - alter_region_schema(&region, &version, request, &self.memtable_builder).await - { + if let Err(e) = alter_region_schema(&region, &version, request).await { error!(e; "Failed to alter region schema, region_id: {}", region_id); sender.send(Err(e)); return; @@ -134,7 +131,6 @@ async fn alter_region_schema( region: &MitoRegionRef, version: &Version, request: RegionAlterRequest, - builder: &MemtableBuilderRef, ) -> Result<()> { let new_meta = metadata_after_alteration(&version.metadata, request)?; // Persist the metadata to region's manifest. @@ -145,7 +141,9 @@ async fn alter_region_schema( region.manifest_manager.update(action_list).await?; // Apply the metadata to region's version. 
- region.version_control.alter_schema(new_meta, builder); + region + .version_control + .alter_schema(new_meta, &region.memtable_builder); Ok(()) } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 3622793273..4a9730dd7a 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -51,7 +51,7 @@ impl RegionWorkerLoop { RegionOpener::new( region_id, region.region_dir(), - self.memtable_builder.clone(), + self.memtable_builder_provider.clone(), self.object_store_manager.clone(), self.scheduler.clone(), self.intermediate_manager.clone(), diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 0a87ba2ed5..6b54289637 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -58,7 +58,7 @@ impl RegionWorkerLoop { let region = RegionOpener::new( region_id, &request.region_dir, - self.memtable_builder.clone(), + self.memtable_builder_provider.clone(), self.object_store_manager.clone(), self.scheduler.clone(), self.intermediate_manager.clone(), diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index fa0d1181a9..490fa432aa 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -61,7 +61,9 @@ impl RegionWorkerLoop { self.compaction_scheduler.on_region_dropped(region_id); // mark region version as dropped - region.version_control.mark_dropped(&self.memtable_builder); + region + .version_control + .mark_dropped(&region.memtable_builder); info!( "Region {} is dropped logically, but some files are not deleted yet", region_id diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 9163b6f174..884012473e 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -65,7 +65,7 @@ impl RegionWorkerLoop { let region = RegionOpener::new( region_id, &request.region_dir, - 
self.memtable_builder.clone(), + self.memtable_builder_provider.clone(), self.object_store_manager.clone(), self.scheduler.clone(), self.intermediate_manager.clone(), diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 811a6f2c99..c853f5eb03 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -55,7 +55,7 @@ impl RegionWorkerLoop { region.version_control.truncate( truncated_entry_id, truncated_sequence, - &self.memtable_builder, + &region.memtable_builder, ); // Make all data obsolete. diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 7ef963a4c0..26b62551ec 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -29,6 +29,10 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { "index.inverted_index.ignore_column_ids", "index.inverted_index.segment_row_count", WAL_OPTIONS_KEY, + "memtable.type", + "memtable.experimental.index_max_keys_per_shard", + "memtable.experimental.data_freeze_threshold", + "memtable.experimental.fork_dictionary_bytes", ] .contains(&key) } @@ -56,6 +60,16 @@ mod tests { "index.inverted_index.segment_row_count" )); assert!(is_mito_engine_option_key("wal_options")); + assert!(is_mito_engine_option_key("memtable.type")); + assert!(is_mito_engine_option_key( + "memtable.experimental.index_max_keys_per_shard" + )); + assert!(is_mito_engine_option_key( + "memtable.experimental.data_freeze_threshold" + )); + assert!(is_mito_engine_option_key( + "memtable.experimental.fork_dictionary_bytes" + )); assert!(!is_mito_engine_option_key("foo")); } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a75e9e2196..abf207fd35 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -787,11 +787,7 @@ mem_threshold_on_create = "64.0MiB" intermediate_path = "" [datanode.region_engine.mito.memtable] -type = 
"experimental" -index_max_keys_per_shard = 8192 -data_freeze_threshold = 131072 -dedup = true -fork_dictionary_bytes = "1GiB" +type = "time_series" [[datanode.region_engine]] diff --git a/tests/cases/standalone/common/create/create_with_options.result b/tests/cases/standalone/common/create/create_with_options.result index e50746d127..5dd872c7f7 100644 --- a/tests/cases/standalone/common/create/create_with_options.result +++ b/tests/cases/standalone/common/create/create_with_options.result @@ -74,6 +74,7 @@ with( 'index.inverted_index.ignore_column_ids'='1,2,3', 'index.inverted_index.segment_row_count'='512', 'wal_options'='{"wal.provider":"raft_engine"}', + 'memtable.type' = 'experimental', ); Affected Rows: 0 @@ -82,3 +83,15 @@ drop table test_mito_options; Affected Rows: 0 +create table if not exists invalid_compaction( + host string, + ts timestamp, + memory double, + TIME INDEX (ts), + PRIMARY KEY(host) +) +engine=mito +with('compaction.type'='twcs', 'compaction.twcs.max_active_window_files'='8d'); + +Error: 1004(InvalidArguments), Invalid options: invalid digit found in string + diff --git a/tests/cases/standalone/common/create/create_with_options.sql b/tests/cases/standalone/common/create/create_with_options.sql index 094d905eda..1f3c533412 100644 --- a/tests/cases/standalone/common/create/create_with_options.sql +++ b/tests/cases/standalone/common/create/create_with_options.sql @@ -64,6 +64,17 @@ with( 'index.inverted_index.ignore_column_ids'='1,2,3', 'index.inverted_index.segment_row_count'='512', 'wal_options'='{"wal.provider":"raft_engine"}', + 'memtable.type' = 'experimental', ); drop table test_mito_options; + +create table if not exists invalid_compaction( + host string, + ts timestamp, + memory double, + TIME INDEX (ts), + PRIMARY KEY(host) +) +engine=mito +with('compaction.type'='twcs', 'compaction.twcs.max_active_window_files'='8d'); diff --git a/tests/cases/standalone/common/show/show_create.result 
b/tests/cases/standalone/common/show/show_create.result index b016174a38..60bd139fcf 100644 --- a/tests/cases/standalone/common/show/show_create.result +++ b/tests/cases/standalone/common/show/show_create.result @@ -96,5 +96,5 @@ WITH( storage = 'S3' ); -Error: 1004(InvalidArguments), Object store not found: s3 +Error: 1004(InvalidArguments), Object store not found: S3