From a9c342b0f7c34f547fecd3dc131f55a89e8a1321 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 13 Oct 2025 16:38:37 +0800 Subject: [PATCH] feat: support setting sst_format in table options (#7068) * feat: add FormatType to support multi format in the future Signed-off-by: evenyag * feat: add sst_format to RegionOptions Signed-off-by: evenyag * feat: sets the sst_format based on RegionOptions Signed-off-by: evenyag * feat: add sst_format to mito table options Signed-off-by: evenyag * fix: fix RegionManifest deserialization without sst_format Signed-off-by: evenyag * refactor: remove Parquet suffix from FormatType Signed-off-by: evenyag * feat: prefer RegionOptions::sst_format in compactor/memtable builder Signed-off-by: evenyag * refactor: rename enable_experimental_flat_format to default_experimental_flat_format Signed-off-by: evenyag * docs: update config.md Signed-off-by: evenyag * style: fmt Signed-off-by: evenyag * test: update manifest test Signed-off-by: evenyag * chore: fix compiler errors, handle sst_format in remap_manifest Signed-off-by: evenyag --------- Signed-off-by: evenyag --- config/config.md | 4 +- config/datanode.example.toml | 4 +- config/standalone.example.toml | 4 +- src/mito2/src/compaction.rs | 2 + src/mito2/src/compaction/compactor.rs | 15 ++++++-- src/mito2/src/compaction/window.rs | 1 + src/mito2/src/config.rs | 6 +-- src/mito2/src/engine.rs | 2 +- src/mito2/src/manifest/action.rs | 47 +++++++++++++++++++++++ src/mito2/src/manifest/manager.rs | 12 +++++- src/mito2/src/memtable.rs | 43 +++++++++++---------- src/mito2/src/region.rs | 19 +++++++-- src/mito2/src/region/opener.rs | 30 ++++++++++----- src/mito2/src/region/options.rs | 10 +++++ src/mito2/src/remap_manifest.rs | 4 ++ src/mito2/src/sst.rs | 13 +++++++ src/mito2/src/test_util.rs | 2 + src/mito2/src/test_util/scheduler_util.rs | 2 + src/mito2/src/worker/handle_alter.rs | 5 ++- src/store-api/src/mito_engine_options.rs | 3 ++ tests-integration/tests/http.rs | 2 +- 21 files changed, 178 insertions(+), 52 deletions(-) diff --git a/config/config.md b/config/config.md index 58c538d4ba..92f192e6df 100644 --- a/config/config.md +++ b/config/config.md @@ -153,7 +153,7 @@ | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | -| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. | +| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | @@ -547,7 +547,7 @@ | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | | `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. | | `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). | -| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. | +| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. | | `region_engine.mito.index` | -- | -- | The options for index in Mito engine. | | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 07c52bae9b..e283967680 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -500,8 +500,8 @@ allow_stale_entries = false ## To align with the old behavior, the default value is 0 (no restrictions). min_compaction_interval = "0m" -## Whether to enable experimental flat format. -enable_experimental_flat_format = false +## Whether to enable experimental flat format as the default format. +default_experimental_flat_format = false ## The options for index in Mito engine. [region_engine.mito.index] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 72df1a4184..5fae0f444f 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -584,8 +584,8 @@ allow_stale_entries = false ## To align with the old behavior, the default value is 0 (no restrictions). min_compaction_interval = "0m" -## Whether to enable experimental flat format. -enable_experimental_flat_format = false +## Whether to enable experimental flat format as the default format. +default_experimental_flat_format = false ## The options for index in Mito engine. [region_engine.mito.index] diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 5be6e7c7c5..d83ed7ab7d 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -777,6 +777,7 @@ mod tests { use super::*; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::ManifestContext; + use crate::sst::FormatType; use crate::test_util::mock_schema_metadata_manager; use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler}; use crate::test_util::version_util::{VersionControlBuilder, apply_edit}; @@ -1111,6 +1112,7 @@ mod tests { }, Default::default(), Default::default(), + FormatType::PrimaryKey, ) .await .unwrap(); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e0c9ef410b..ba267f4a48 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -48,6 +48,7 @@ use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; +use crate::sst::FormatType; use crate::sst::file::FileMeta; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; @@ -343,8 +344,14 @@ impl Compactor for DefaultCompactor { let append_mode = compaction_region.current_version.options.append_mode; let merge_mode = compaction_region.current_version.options.merge_mode(); let flat_format = compaction_region - .engine_config - .enable_experimental_flat_format; + .region_options + .sst_format + .map(|format| format == FormatType::Flat) + .unwrap_or( + compaction_region + .engine_config + .default_experimental_flat_format, + ); let index_config = compaction_region.engine_config.index.clone(); let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); @@ -431,8 +438,8 @@ impl Compactor for DefaultCompactor { let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(","); info!( - "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}", - region_id, input_file_names, output_file_names, metrics + "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}", + region_id, input_file_names, output_file_names, flat_format, metrics ); metrics.observe(); Ok(output_files) diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 8a5a878257..9ffaa654c4 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -250,6 +250,7 @@ mod tests { index_options: Default::default(), memtable: None, merge_mode: None, + sst_format: None, }, compaction_time_window: None, } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 20e7550b2f..ec528de295 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -144,9 +144,9 @@ pub struct MitoConfig { #[serde(with = "humantime_serde")] pub min_compaction_interval: Duration, - /// Whether to enable experimental flat format. + /// Whether to enable experimental flat format as the default format. /// When enabled, forces using BulkMemtable and BulkMemtableBuilder. - pub enable_experimental_flat_format: bool, + pub default_experimental_flat_format: bool, } impl Default for MitoConfig { @@ -184,7 +184,7 @@ impl Default for MitoConfig { bloom_filter_index: BloomFilterConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), - enable_experimental_flat_format: false, + default_experimental_flat_format: false, }; // Adjust buffer and cache size according to system memory if we can. diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 20c524c61d..c10c398681 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -695,7 +695,7 @@ impl EngineInner { .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) .with_start_time(query_start) - .with_flat_format(self.config.enable_experimental_flat_format); + .with_flat_format(self.config.default_experimental_flat_format); #[cfg(feature = "enterprise")] let scan_region = self.maybe_fill_extension_range_provider(scan_region, region); diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 8d3e943bf5..af09e6c861 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -29,6 +29,7 @@ use crate::error::{ DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu, }; use crate::manifest::manager::RemoveFileOptions; +use crate::sst::FormatType; use crate::sst::file::FileMeta; use crate::wal::EntryId; @@ -49,6 +50,9 @@ pub enum RegionMetaAction { pub struct RegionChange { /// The metadata after changed. pub metadata: RegionMetadataRef, + /// Format of the SST. + #[serde(default)] + pub sst_format: FormatType, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -128,6 +132,9 @@ pub struct RegionManifest { /// Inferred compaction time window. #[serde(with = "humantime_serde")] pub compaction_time_window: Option, + /// Format of the SST file. + #[serde(default)] + pub sst_format: FormatType, } #[cfg(test)] @@ -155,6 +162,7 @@ pub struct RegionManifestBuilder { truncated_entry_id: Option, compaction_time_window: Option, committed_sequence: Option, + sst_format: FormatType, } impl RegionManifestBuilder { @@ -171,6 +179,7 @@ impl RegionManifestBuilder { truncated_entry_id: s.truncated_entry_id, compaction_time_window: s.compaction_time_window, committed_sequence: s.committed_sequence, + sst_format: s.sst_format, } } else { Default::default() @@ -180,6 +189,7 @@ impl RegionManifestBuilder { pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) { self.metadata = Some(change.metadata); self.manifest_version = manifest_version; + self.sst_format = change.sst_format; } pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) { @@ -269,6 +279,7 @@ impl RegionManifestBuilder { manifest_version: self.manifest_version, truncated_entry_id: self.truncated_entry_id, compaction_time_window: self.compaction_time_window, + sst_format: self.sst_format, }) } } @@ -472,6 +483,7 @@ mod tests { }"#; let _ = serde_json::from_str::(region_edit).unwrap(); + // Note: For backward compatibility, the test accepts a RegionChange without sst_format let region_change = r#" { "metadata":{ "column_metadatas":[ @@ -729,6 +741,7 @@ mod tests { .unwrap()]), }], }, + sst_format: FormatType::PrimaryKey, }; let json = serde_json::to_string(&manifest).unwrap(); @@ -830,6 +843,7 @@ mod tests { manifest_version: 0, truncated_entry_id: None, compaction_time_window: None, + sst_format: FormatType::PrimaryKey, } ); @@ -843,6 +857,7 @@ mod tests { manifest_version: 0, truncated_entry_id: None, compaction_time_window: None, + sst_format: FormatType::PrimaryKey, }; let json = serde_json::to_string(&new_manifest).unwrap(); let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap(); @@ -916,4 +931,36 @@ mod tests { old_from_new ); } + + #[test] + fn test_region_change_backward_compatibility() { + // Test that we can deserialize a RegionChange without sst_format + let region_change_json = r#"{ + "metadata": { + "column_metadatas": [ + {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1}, + {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2}, + {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3} + ], + "primary_key": [ + 1 + ], + "region_id": 42, + "schema_version": 0 + } + }"#; + + let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap(); + assert_eq!(region_change.sst_format, FormatType::PrimaryKey); + + // Test serialization and deserialization with sst_format + let region_change = RegionChange { + metadata: region_change.metadata.clone(), + sst_format: FormatType::Flat, + }; + + let serialized = serde_json::to_string(®ion_change).unwrap(); + let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.sst_format, FormatType::Flat); + } } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 852fbbe523..c2cd4877fe 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -36,6 +36,7 @@ use crate::manifest::storage::{ }; use crate::metrics::MANIFEST_OP_ELAPSED; use crate::region::{RegionLeaderState, RegionRoleState}; +use crate::sst::FormatType; /// Options for [RegionManifestManager]. #[derive(Debug, Clone)] @@ -154,6 +155,7 @@ impl RegionManifestManager { options: RegionManifestOptions, total_manifest_size: Arc, manifest_version: Arc, + sst_format: FormatType, ) -> Result { // construct storage let mut store = ManifestObjectStore::new( @@ -175,6 +177,7 @@ impl RegionManifestManager { version, RegionChange { metadata: metadata.clone(), + sst_format, }, ); let manifest = manifest_builder.try_build()?; @@ -185,7 +188,10 @@ impl RegionManifestManager { options.manifest_dir, manifest ); - let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })]; + let mut actions = vec![RegionMetaAction::Change(RegionChange { + metadata, + sst_format, + })]; if flushed_entry_id > 0 { actions.push(RegionMetaAction::Edit(RegionEdit { files_to_add: vec![], @@ -792,6 +798,7 @@ mod test { let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata: new_metadata.clone(), + sst_format: FormatType::PrimaryKey, })); let current_version = manager @@ -860,6 +867,7 @@ mod test { let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata: new_metadata.clone(), + sst_format: FormatType::PrimaryKey, })); let current_version = manager @@ -915,6 +923,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1721); + assert_eq!(manifest_size, 1748); } } diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 7aacffa34b..2744f24890 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -40,7 +40,8 @@ use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::Batch; use crate::read::prune::PruneTimeIterator; use crate::read::scan_region::PredicateGroup; -use crate::region::options::{MemtableOptions, MergeMode}; +use crate::region::options::{MemtableOptions, MergeMode, RegionOptions}; +use crate::sst::FormatType; use crate::sst::file::FileTimeRange; use crate::sst::parquet::SstInfo; @@ -324,7 +325,7 @@ impl Drop for AllocTracker { pub(crate) struct MemtableBuilderProvider { write_buffer_manager: Option, config: Arc, - compact_dispatcher: Option>, + compact_dispatcher: Arc, } impl MemtableBuilderProvider { @@ -332,9 +333,8 @@ impl MemtableBuilderProvider { write_buffer_manager: Option, config: Arc, ) -> Self { - let compact_dispatcher = config - .enable_experimental_flat_format - .then(|| Arc::new(CompactDispatcher::new(config.max_background_compactions))); + let compact_dispatcher = + Arc::new(CompactDispatcher::new(config.max_background_compactions)); Self { write_buffer_manager, @@ -343,16 +343,19 @@ impl MemtableBuilderProvider { } } - pub(crate) fn builder_for_options( - &self, - options: Option<&MemtableOptions>, - dedup: bool, - merge_mode: MergeMode, - ) -> MemtableBuilderRef { - if self.config.enable_experimental_flat_format { - common_telemetry::info!( - "Overriding memtable config, use BulkMemtable under flat format" - ); + pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef { + let dedup = options.need_dedup(); + let merge_mode = options.merge_mode(); + let flat_format = options + .sst_format + .map(|format| format == FormatType::Flat) + .unwrap_or(self.config.default_experimental_flat_format); + if flat_format { + if options.memtable.is_some() { + common_telemetry::info!( + "Overriding memtable config, use BulkMemtable under flat format" + ); + } return Arc::new( BulkMemtableBuilder::new( @@ -360,12 +363,11 @@ impl MemtableBuilderProvider { !dedup, // append_mode: true if not dedup, false if dedup merge_mode, ) - // Safety: We create the dispatcher if flat_format is enabled. - .with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()), + .with_compact_dispatcher(self.compact_dispatcher.clone()), ); } - match options { + match &options.memtable { Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new( self.write_buffer_manager.clone(), dedup, @@ -388,15 +390,14 @@ impl MemtableBuilderProvider { } fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef { - if self.config.enable_experimental_flat_format { + if self.config.default_experimental_flat_format { return Arc::new( BulkMemtableBuilder::new( self.write_buffer_manager.clone(), !dedup, // append_mode: true if not dedup, false if dedup merge_mode, ) - // Safety: We create the dispatcher if flat_format is enabled. - .with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()), + .with_compact_dispatcher(self.compact_dispatcher.clone()), ); } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index b611387fd2..16933f6827 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -49,6 +49,7 @@ use crate::manifest::manager::RegionManifestManager; use crate::memtable::MemtableBuilderRef; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OnFailure, OptionOutputTx}; +use crate::sst::FormatType; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location::{index_file_path, sst_file_path}; use crate::time_provider::TimeProviderRef; @@ -131,7 +132,7 @@ pub struct MitoRegion { /// /// The value will be updated to the latest offset of the topic /// if region receives a flush request or schedules a periodic flush task - /// and the region's memtable is empty. + /// and the region's memtable is empty. /// /// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region, /// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`. @@ -140,6 +141,8 @@ pub struct MitoRegion { pub(crate) written_bytes: Arc, /// Memtable builder for the region. pub(crate) memtable_builder: MemtableBuilderRef, + /// Format type of the SST file. + pub(crate) sst_format: FormatType, /// manifest stats stats: ManifestStats, } @@ -191,12 +194,17 @@ impl MitoRegion { self.last_flush_millis.store(now, Ordering::Relaxed); } - /// Return last compaction time in millis. + /// Returns last compaction timestamp in millis. pub(crate) fn last_compaction_millis(&self) -> i64 { self.last_compaction_millis.load(Ordering::Relaxed) } - /// Update compaction time to now millis. + /// Returns format type of the SST file. + pub(crate) fn sst_format(&self) -> FormatType { + self.sst_format + } + + /// Update compaction time to current time. pub(crate) fn update_compaction_millis(&self) { let now = self.time_provider.current_time_millis(); self.last_compaction_millis.store(now, Ordering::Relaxed); @@ -443,6 +451,7 @@ impl MitoRegion { if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() { let action = RegionMetaAction::Change(RegionChange { metadata: current_meta.clone(), + sst_format: self.sst_format(), }); let result = manager .update( @@ -1155,6 +1164,7 @@ mod tests { use crate::region::{ ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState, }; + use crate::sst::FormatType; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::test_util::memtable_util::EmptyMemtableBuilder; @@ -1239,6 +1249,7 @@ mod tests { }, Default::default(), Default::default(), + FormatType::PrimaryKey, ) .await .unwrap(); @@ -1305,6 +1316,7 @@ mod tests { }, Default::default(), Default::default(), + FormatType::PrimaryKey, ) .await .unwrap(); @@ -1327,6 +1339,7 @@ mod tests { topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), memtable_builder: Arc::new(EmptyMemtableBuilder::default()), + sst_format: FormatType::PrimaryKey, stats: ManifestStats::default(), }; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 4b9fd32e6b..95aef2776a 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -60,6 +60,7 @@ use crate::region::{ use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; +use crate::sst::FormatType; use crate::sst::file_purger::create_local_file_purger; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; @@ -254,6 +255,15 @@ impl RegionOpener { let object_store = get_object_store(&options.storage, &self.object_store_manager)?; let provider = self.provider::(&options.wal_options)?; let metadata = Arc::new(metadata); + // Set the sst_format based on options or flat_format flag + let sst_format = if let Some(format) = options.sst_format { + format + } else if config.default_experimental_flat_format { + FormatType::Flat + } else { + // Default to PrimaryKeyParquet for newly created regions + FormatType::PrimaryKey + }; // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = Self::manifest_options(config, &options, ®ion_dir, &self.object_store_manager)?; @@ -265,14 +275,11 @@ impl RegionOpener { region_manifest_options, self.stats.total_manifest_size.clone(), self.stats.manifest_version.clone(), + sst_format, ) .await?; - let memtable_builder = self.memtable_builder_provider.builder_for_options( - options.memtable.as_ref(), - options.need_dedup(), - options.merge_mode(), - ); + let memtable_builder = self.memtable_builder_provider.builder_for_options(&options); let part_duration = options.compaction.time_window(); // Initial memtable id is 0. let mutable = Arc::new(TimePartitions::new( @@ -319,6 +326,7 @@ impl RegionOpener { topic_latest_entry_id: AtomicU64::new(0), memtable_builder, written_bytes: Arc::new(AtomicU64::new(0)), + sst_format, stats: self.stats, }) } @@ -445,11 +453,9 @@ impl RegionOpener { self.cache_manager.clone(), self.file_ref_manager.clone(), ); - let memtable_builder = self.memtable_builder_provider.builder_for_options( - region_options.memtable.as_ref(), - region_options.need_dedup(), - region_options.merge_mode(), - ); + let memtable_builder = self + .memtable_builder_provider + .builder_for_options(®ion_options); // Use compaction time window in the manifest if region doesn't provide // the time window option. let part_duration = region_options @@ -525,6 +531,9 @@ impl RegionOpener { } let now = self.time_provider.current_time_millis(); + // Read sst_format from manifest + let sst_format = manifest.sst_format; + let region = MitoRegion { region_id: self.region_id, version_control, @@ -542,6 +551,7 @@ impl RegionOpener { topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), written_bytes: Arc::new(AtomicU64::new(0)), memtable_builder, + sst_format, stats: self.stats.clone(), }; Ok(Some(region)) diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 4d08a613ea..363c756eb8 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -33,6 +33,7 @@ use strum::EnumString; use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result}; use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD}; +use crate::sst::FormatType; const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024; @@ -73,6 +74,8 @@ pub struct RegionOptions { /// The mode to merge duplicate rows. /// Only takes effect when `append_mode` is `false`. pub merge_mode: Option, + /// SST format type. + pub sst_format: Option, } impl RegionOptions { @@ -151,6 +154,7 @@ impl TryFrom<&HashMap> for RegionOptions { index_options, memtable, merge_mode: options.merge_mode, + sst_format: options.sst_format, }; opts.validate()?; @@ -256,6 +260,8 @@ struct RegionOptionsWithoutEnum { append_mode: bool, #[serde_as(as = "NoneAsEmptyString")] merge_mode: Option, + #[serde_as(as = "NoneAsEmptyString")] + sst_format: Option, } impl Default for RegionOptionsWithoutEnum { @@ -266,6 +272,7 @@ impl Default for RegionOptionsWithoutEnum { storage: options.storage, append_mode: options.append_mode, merge_mode: options.merge_mode, + sst_format: options.sst_format, } } } @@ -652,6 +659,7 @@ mod tests { primary_key_encoding: PrimaryKeyEncoding::Dense, })), merge_mode: Some(MergeMode::LastNonNull), + sst_format: None, }; assert_eq!(expect, options); } @@ -685,6 +693,7 @@ mod tests { primary_key_encoding: PrimaryKeyEncoding::Dense, })), merge_mode: Some(MergeMode::LastNonNull), + sst_format: None, }; let region_options_json_str = serde_json::to_string(&options).unwrap(); let got: RegionOptions = serde_json::from_str(®ion_options_json_str).unwrap(); @@ -748,6 +757,7 @@ mod tests { primary_key_encoding: PrimaryKeyEncoding::Dense, })), merge_mode: Some(MergeMode::LastNonNull), + sst_format: None, }; assert_eq!(options, got); } diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs index ee0e690960..ed993b41b6 100644 --- a/src/mito2/src/remap_manifest.rs +++ b/src/mito2/src/remap_manifest.rs @@ -87,6 +87,7 @@ impl RemapManifest { .next() .context(error::NoOldManifestsSnafu)?; let template_metadata = (*template_manifest.metadata).clone(); + let sst_format = template_manifest.sst_format; // Create empty manifest for each new region for region_id in self.new_partition_exprs.keys() { @@ -114,6 +115,7 @@ impl RemapManifest { manifest_version: 0, truncated_entry_id: None, compaction_time_window: None, + sst_format, }; new_manifests.insert(*region_id, manifest); @@ -372,6 +374,7 @@ mod tests { use super::*; use crate::manifest::action::RegionManifest; + use crate::sst::FormatType; use crate::sst::file::{FileMeta, FileTimeRange}; use crate::wal::EntryId; @@ -452,6 +455,7 @@ mod tests { truncated_entry_id: None, compaction_time_window: None, committed_sequence: None, + sst_format: FormatType::PrimaryKey, } } diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index b3e6fdbca2..1d94e74eaa 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -22,6 +22,7 @@ use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, }; use datatypes::prelude::ConcreteDataType; +use serde::{Deserialize, Serialize}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::RegionMetadata; use store_api::storage::consts::{ @@ -42,6 +43,18 @@ pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); /// Default number of concurrent write, it only works on object store backend(e.g., S3). pub const DEFAULT_WRITE_CONCURRENCY: usize = 8; +/// Format type of the SST file. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum FormatType { + /// Parquet with primary key encoded. + #[default] + PrimaryKey, + /// Flat Parquet format. + Flat, +} + /// Gets the arrow schema to store in parquet. pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef { let fields = Fields::from_iter( diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 443377170a..8deba23ff8 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -74,6 +74,7 @@ use crate::flush::{WriteBufferManager, WriteBufferManagerRef}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::read::{Batch, BatchBuilder, BatchReader}; use crate::region::opener::{PartitionExprFetcher, PartitionExprFetcherRef}; +use crate::sst::FormatType; use crate::sst::file_purger::{FilePurgerRef, NoopFilePurger}; use crate::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef}; use crate::sst::index::intermediate::IntermediateManager; @@ -609,6 +610,7 @@ impl TestEnv { manifest_opts, Default::default(), Default::default(), + FormatType::PrimaryKey, ) .await .map(Some) diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 4e3bc9fa62..3fbfbd0ad1 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -35,6 +35,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState}; use crate::request::WorkerRequestWithTime; use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef}; +use crate::sst::FormatType; use crate::sst::index::IndexBuildScheduler; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -134,6 +135,7 @@ impl SchedulerEnv { }, Default::default(), Default::default(), + FormatType::PrimaryKey, ) .await .unwrap(), diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index aff97e09a1..23d00ab4e7 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -154,7 +154,10 @@ impl RegionWorkerLoop { } }; // Persist the metadata to region's manifest. - let change = RegionChange { metadata: new_meta }; + let change = RegionChange { + metadata: new_meta, + sst_format: region.sst_format(), + }; self.handle_manifest_region_change(region, change, sender) } diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 83a3fd7063..99e7166f43 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -55,6 +55,8 @@ pub const MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES: &str = "memtable.partition_tree.fork_dictionary_bytes"; /// Option key for skipping WAL. pub const SKIP_WAL_KEY: &str = "skip_wal"; +/// Option key for sst format. +pub const SST_FORMAT_KEY: &str = "sst_format"; // Note: Adding new options here should also check if this option should be removed in [metric_engine::engine::create::region_options_for_metadata_region]. /// Returns true if the `key` is a valid option key for the mito engine. @@ -78,6 +80,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING, APPEND_MODE_KEY, MERGE_MODE_KEY, + SST_FORMAT_KEY, ] .contains(&key) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 347a8d12c9..538392e437 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1502,7 +1502,7 @@ parallel_scan_channel_size = 32 max_concurrent_scan_files = 384 allow_stale_entries = false min_compaction_interval = "0s" -enable_experimental_flat_format = false +default_experimental_flat_format = false [region_engine.mito.index] aux_path = ""