diff --git a/config/config.md b/config/config.md
index 58c538d4ba..92f192e6df 100644
--- a/config/config.md
+++ b/config/config.md
@@ -153,7 +153,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). |
-| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. |
+| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.
This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -547,7 +547,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.
To align with the old behavior, the default value is 0 (no restrictions). |
-| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. |
+| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.
This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 07c52bae9b..e283967680 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -500,8 +500,8 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
-## Whether to enable experimental flat format.
-enable_experimental_flat_format = false
+## Whether to enable experimental flat format as the default format.
+default_experimental_flat_format = false
## The options for index in Mito engine.
[region_engine.mito.index]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 72df1a4184..5fae0f444f 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -584,8 +584,8 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
-## Whether to enable experimental flat format.
-enable_experimental_flat_format = false
+## Whether to enable experimental flat format as the default format.
+default_experimental_flat_format = false
## The options for index in Mito engine.
[region_engine.mito.index]
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 5be6e7c7c5..d83ed7ab7d 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -777,6 +777,7 @@ mod tests {
use super::*;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
+ use crate::sst::FormatType;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
@@ -1111,6 +1112,7 @@ mod tests {
},
Default::default(),
Default::default(),
+ FormatType::PrimaryKey,
)
.await
.unwrap();
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index e0c9ef410b..ba267f4a48 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -48,6 +48,7 @@ use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
use crate::schedule::scheduler::LocalScheduler;
+use crate::sst::FormatType;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
@@ -343,8 +344,14 @@ impl Compactor for DefaultCompactor {
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
- .engine_config
- .enable_experimental_flat_format;
+ .region_options
+ .sst_format
+ .map(|format| format == FormatType::Flat)
+ .unwrap_or(
+ compaction_region
+ .engine_config
+ .default_experimental_flat_format,
+ );
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
@@ -431,8 +438,8 @@ impl Compactor for DefaultCompactor {
let output_file_names =
output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
- "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
- region_id, input_file_names, output_file_names, metrics
+ "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
+ region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs
index 8a5a878257..9ffaa654c4 100644
--- a/src/mito2/src/compaction/window.rs
+++ b/src/mito2/src/compaction/window.rs
@@ -250,6 +250,7 @@ mod tests {
index_options: Default::default(),
memtable: None,
merge_mode: None,
+ sst_format: None,
},
compaction_time_window: None,
}
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 20e7550b2f..ec528de295 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -144,9 +144,9 @@ pub struct MitoConfig {
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
- /// Whether to enable experimental flat format.
+ /// Whether to enable experimental flat format as the default format.
/// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
- pub enable_experimental_flat_format: bool,
+ pub default_experimental_flat_format: bool,
}
impl Default for MitoConfig {
@@ -184,7 +184,7 @@ impl Default for MitoConfig {
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
- enable_experimental_flat_format: false,
+ default_experimental_flat_format: false,
};
// Adjust buffer and cache size according to system memory if we can.
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 20c524c61d..c10c398681 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -695,7 +695,7 @@ impl EngineInner {
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start)
- .with_flat_format(self.config.enable_experimental_flat_format);
+ .with_flat_format(self.config.default_experimental_flat_format);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs
index 8d3e943bf5..af09e6c861 100644
--- a/src/mito2/src/manifest/action.rs
+++ b/src/mito2/src/manifest/action.rs
@@ -29,6 +29,7 @@ use crate::error::{
DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
use crate::manifest::manager::RemoveFileOptions;
+use crate::sst::FormatType;
use crate::sst::file::FileMeta;
use crate::wal::EntryId;
@@ -49,6 +50,9 @@ pub enum RegionMetaAction {
pub struct RegionChange {
/// The metadata after changed.
pub metadata: RegionMetadataRef,
+ /// Format of the SST.
+ #[serde(default)]
+ pub sst_format: FormatType,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -128,6 +132,9 @@ pub struct RegionManifest {
/// Inferred compaction time window.
#[serde(with = "humantime_serde")]
pub compaction_time_window: Option,
+ /// Format of the SST file.
+ #[serde(default)]
+ pub sst_format: FormatType,
}
#[cfg(test)]
@@ -155,6 +162,7 @@ pub struct RegionManifestBuilder {
truncated_entry_id: Option,
compaction_time_window: Option,
committed_sequence: Option,
+ sst_format: FormatType,
}
impl RegionManifestBuilder {
@@ -171,6 +179,7 @@ impl RegionManifestBuilder {
truncated_entry_id: s.truncated_entry_id,
compaction_time_window: s.compaction_time_window,
committed_sequence: s.committed_sequence,
+ sst_format: s.sst_format,
}
} else {
Default::default()
@@ -180,6 +189,7 @@ impl RegionManifestBuilder {
pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
self.metadata = Some(change.metadata);
self.manifest_version = manifest_version;
+ self.sst_format = change.sst_format;
}
pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
@@ -269,6 +279,7 @@ impl RegionManifestBuilder {
manifest_version: self.manifest_version,
truncated_entry_id: self.truncated_entry_id,
compaction_time_window: self.compaction_time_window,
+ sst_format: self.sst_format,
})
}
}
@@ -472,6 +483,7 @@ mod tests {
}"#;
let _ = serde_json::from_str::(region_edit).unwrap();
+ // Note: For backward compatibility, the test accepts a RegionChange without sst_format
let region_change = r#" {
"metadata":{
"column_metadatas":[
@@ -729,6 +741,7 @@ mod tests {
.unwrap()]),
}],
},
+ sst_format: FormatType::PrimaryKey,
};
let json = serde_json::to_string(&manifest).unwrap();
@@ -830,6 +843,7 @@ mod tests {
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
+ sst_format: FormatType::PrimaryKey,
}
);
@@ -843,6 +857,7 @@ mod tests {
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
+ sst_format: FormatType::PrimaryKey,
};
let json = serde_json::to_string(&new_manifest).unwrap();
let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
@@ -916,4 +931,36 @@ mod tests {
old_from_new
);
}
+
+ #[test]
+ fn test_region_change_backward_compatibility() {
+ // Test that we can deserialize a RegionChange without sst_format
+ let region_change_json = r#"{
+ "metadata": {
+ "column_metadatas": [
+ {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
+ {"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
+ {"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
+ ],
+ "primary_key": [
+ 1
+ ],
+ "region_id": 42,
+ "schema_version": 0
+ }
+ }"#;
+
+ let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
+ assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
+
+ // Test serialization and deserialization with sst_format
+ let region_change = RegionChange {
+ metadata: region_change.metadata.clone(),
+ sst_format: FormatType::Flat,
+ };
+
+ let serialized = serde_json::to_string(®ion_change).unwrap();
+ let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
+ assert_eq!(deserialized.sst_format, FormatType::Flat);
+ }
}
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs
index 852fbbe523..c2cd4877fe 100644
--- a/src/mito2/src/manifest/manager.rs
+++ b/src/mito2/src/manifest/manager.rs
@@ -36,6 +36,7 @@ use crate::manifest::storage::{
};
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};
+use crate::sst::FormatType;
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
@@ -154,6 +155,7 @@ impl RegionManifestManager {
options: RegionManifestOptions,
total_manifest_size: Arc,
manifest_version: Arc,
+ sst_format: FormatType,
) -> Result {
// construct storage
let mut store = ManifestObjectStore::new(
@@ -175,6 +177,7 @@ impl RegionManifestManager {
version,
RegionChange {
metadata: metadata.clone(),
+ sst_format,
},
);
let manifest = manifest_builder.try_build()?;
@@ -185,7 +188,10 @@ impl RegionManifestManager {
options.manifest_dir, manifest
);
- let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
+ let mut actions = vec![RegionMetaAction::Change(RegionChange {
+ metadata,
+ sst_format,
+ })];
if flushed_entry_id > 0 {
actions.push(RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![],
@@ -792,6 +798,7 @@ mod test {
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
+ sst_format: FormatType::PrimaryKey,
}));
let current_version = manager
@@ -860,6 +867,7 @@ mod test {
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
+ sst_format: FormatType::PrimaryKey,
}));
let current_version = manager
@@ -915,6 +923,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
- assert_eq!(manifest_size, 1721);
+ assert_eq!(manifest_size, 1748);
}
}
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 7aacffa34b..2744f24890 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -40,7 +40,8 @@ use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
-use crate::region::options::{MemtableOptions, MergeMode};
+use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
+use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
@@ -324,7 +325,7 @@ impl Drop for AllocTracker {
pub(crate) struct MemtableBuilderProvider {
write_buffer_manager: Option,
config: Arc,
- compact_dispatcher: Option>,
+ compact_dispatcher: Arc,
}
impl MemtableBuilderProvider {
@@ -332,9 +333,8 @@ impl MemtableBuilderProvider {
write_buffer_manager: Option,
config: Arc,
) -> Self {
- let compact_dispatcher = config
- .enable_experimental_flat_format
- .then(|| Arc::new(CompactDispatcher::new(config.max_background_compactions)));
+ let compact_dispatcher =
+ Arc::new(CompactDispatcher::new(config.max_background_compactions));
Self {
write_buffer_manager,
@@ -343,16 +343,19 @@ impl MemtableBuilderProvider {
}
}
- pub(crate) fn builder_for_options(
- &self,
- options: Option<&MemtableOptions>,
- dedup: bool,
- merge_mode: MergeMode,
- ) -> MemtableBuilderRef {
- if self.config.enable_experimental_flat_format {
- common_telemetry::info!(
- "Overriding memtable config, use BulkMemtable under flat format"
- );
+ pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
+ let dedup = options.need_dedup();
+ let merge_mode = options.merge_mode();
+ let flat_format = options
+ .sst_format
+ .map(|format| format == FormatType::Flat)
+ .unwrap_or(self.config.default_experimental_flat_format);
+ if flat_format {
+ if options.memtable.is_some() {
+ common_telemetry::info!(
+ "Overriding memtable config, use BulkMemtable under flat format"
+ );
+ }
return Arc::new(
BulkMemtableBuilder::new(
@@ -360,12 +363,11 @@ impl MemtableBuilderProvider {
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
- // Safety: We create the dispatcher if flat_format is enabled.
- .with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
+ .with_compact_dispatcher(self.compact_dispatcher.clone()),
);
}
- match options {
+ match &options.memtable {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
dedup,
@@ -388,15 +390,14 @@ impl MemtableBuilderProvider {
}
fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
- if self.config.enable_experimental_flat_format {
+ if self.config.default_experimental_flat_format {
return Arc::new(
BulkMemtableBuilder::new(
self.write_buffer_manager.clone(),
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
- // Safety: We create the dispatcher if flat_format is enabled.
- .with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
+ .with_compact_dispatcher(self.compact_dispatcher.clone()),
);
}
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index b611387fd2..16933f6827 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -49,6 +49,7 @@ use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
+use crate::sst::FormatType;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;
@@ -131,7 +132,7 @@ pub struct MitoRegion {
///
/// The value will be updated to the latest offset of the topic
/// if region receives a flush request or schedules a periodic flush task
- /// and the region's memtable is empty.
+ /// and the region's memtable is empty.
///
/// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
/// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
@@ -140,6 +141,8 @@ pub struct MitoRegion {
pub(crate) written_bytes: Arc,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
+ /// Format type of the SST file.
+ pub(crate) sst_format: FormatType,
/// manifest stats
stats: ManifestStats,
}
@@ -191,12 +194,17 @@ impl MitoRegion {
self.last_flush_millis.store(now, Ordering::Relaxed);
}
- /// Return last compaction time in millis.
+ /// Returns last compaction timestamp in millis.
pub(crate) fn last_compaction_millis(&self) -> i64 {
self.last_compaction_millis.load(Ordering::Relaxed)
}
- /// Update compaction time to now millis.
+ /// Returns format type of the SST file.
+ pub(crate) fn sst_format(&self) -> FormatType {
+ self.sst_format
+ }
+
+ /// Update compaction time to current time.
pub(crate) fn update_compaction_millis(&self) {
let now = self.time_provider.current_time_millis();
self.last_compaction_millis.store(now, Ordering::Relaxed);
@@ -443,6 +451,7 @@ impl MitoRegion {
if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
let action = RegionMetaAction::Change(RegionChange {
metadata: current_meta.clone(),
+ sst_format: self.sst_format(),
});
let result = manager
.update(
@@ -1155,6 +1164,7 @@ mod tests {
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
+ use crate::sst::FormatType;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
@@ -1239,6 +1249,7 @@ mod tests {
},
Default::default(),
Default::default(),
+ FormatType::PrimaryKey,
)
.await
.unwrap();
@@ -1305,6 +1316,7 @@ mod tests {
},
Default::default(),
Default::default(),
+ FormatType::PrimaryKey,
)
.await
.unwrap();
@@ -1327,6 +1339,7 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
+ sst_format: FormatType::PrimaryKey,
stats: ManifestStats::default(),
};
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 4b9fd32e6b..95aef2776a 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -60,6 +60,7 @@ use crate::region::{
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
+use crate::sst::FormatType;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
@@ -254,6 +255,15 @@ impl RegionOpener {
let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
let provider = self.provider::(&options.wal_options)?;
let metadata = Arc::new(metadata);
+ // Set the sst_format based on options or flat_format flag
+ let sst_format = if let Some(format) = options.sst_format {
+ format
+ } else if config.default_experimental_flat_format {
+ FormatType::Flat
+ } else {
+ // Default to PrimaryKeyParquet for newly created regions
+ FormatType::PrimaryKey
+ };
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options =
Self::manifest_options(config, &options, ®ion_dir, &self.object_store_manager)?;
@@ -265,14 +275,11 @@ impl RegionOpener {
region_manifest_options,
self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(),
+ sst_format,
)
.await?;
- let memtable_builder = self.memtable_builder_provider.builder_for_options(
- options.memtable.as_ref(),
- options.need_dedup(),
- options.merge_mode(),
- );
+ let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
let part_duration = options.compaction.time_window();
// Initial memtable id is 0.
let mutable = Arc::new(TimePartitions::new(
@@ -319,6 +326,7 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(0),
memtable_builder,
written_bytes: Arc::new(AtomicU64::new(0)),
+ sst_format,
stats: self.stats,
})
}
@@ -445,11 +453,9 @@ impl RegionOpener {
self.cache_manager.clone(),
self.file_ref_manager.clone(),
);
- let memtable_builder = self.memtable_builder_provider.builder_for_options(
- region_options.memtable.as_ref(),
- region_options.need_dedup(),
- region_options.merge_mode(),
- );
+ let memtable_builder = self
+ .memtable_builder_provider
+ .builder_for_options(®ion_options);
// Use compaction time window in the manifest if region doesn't provide
// the time window option.
let part_duration = region_options
@@ -525,6 +531,9 @@ impl RegionOpener {
}
let now = self.time_provider.current_time_millis();
+ // Read sst_format from manifest
+ let sst_format = manifest.sst_format;
+
let region = MitoRegion {
region_id: self.region_id,
version_control,
@@ -542,6 +551,7 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder,
+ sst_format,
stats: self.stats.clone(),
};
Ok(Some(region))
diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs
index 4d08a613ea..363c756eb8 100644
--- a/src/mito2/src/region/options.rs
+++ b/src/mito2/src/region/options.rs
@@ -33,6 +33,7 @@ use strum::EnumString;
use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
+use crate::sst::FormatType;
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
@@ -73,6 +74,8 @@ pub struct RegionOptions {
/// The mode to merge duplicate rows.
/// Only takes effect when `append_mode` is `false`.
pub merge_mode: Option,
+ /// SST format type.
+ pub sst_format: Option,
}
impl RegionOptions {
@@ -151,6 +154,7 @@ impl TryFrom<&HashMap> for RegionOptions {
index_options,
memtable,
merge_mode: options.merge_mode,
+ sst_format: options.sst_format,
};
opts.validate()?;
@@ -256,6 +260,8 @@ struct RegionOptionsWithoutEnum {
append_mode: bool,
#[serde_as(as = "NoneAsEmptyString")]
merge_mode: Option,
+ #[serde_as(as = "NoneAsEmptyString")]
+ sst_format: Option,
}
impl Default for RegionOptionsWithoutEnum {
@@ -266,6 +272,7 @@ impl Default for RegionOptionsWithoutEnum {
storage: options.storage,
append_mode: options.append_mode,
merge_mode: options.merge_mode,
+ sst_format: options.sst_format,
}
}
}
@@ -652,6 +659,7 @@ mod tests {
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
+ sst_format: None,
};
assert_eq!(expect, options);
}
@@ -685,6 +693,7 @@ mod tests {
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
+ sst_format: None,
};
let region_options_json_str = serde_json::to_string(&options).unwrap();
let got: RegionOptions = serde_json::from_str(®ion_options_json_str).unwrap();
@@ -748,6 +757,7 @@ mod tests {
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
+ sst_format: None,
};
assert_eq!(options, got);
}
diff --git a/src/mito2/src/remap_manifest.rs b/src/mito2/src/remap_manifest.rs
index ee0e690960..ed993b41b6 100644
--- a/src/mito2/src/remap_manifest.rs
+++ b/src/mito2/src/remap_manifest.rs
@@ -87,6 +87,7 @@ impl RemapManifest {
.next()
.context(error::NoOldManifestsSnafu)?;
let template_metadata = (*template_manifest.metadata).clone();
+ let sst_format = template_manifest.sst_format;
// Create empty manifest for each new region
for region_id in self.new_partition_exprs.keys() {
@@ -114,6 +115,7 @@ impl RemapManifest {
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
+ sst_format,
};
new_manifests.insert(*region_id, manifest);
@@ -372,6 +374,7 @@ mod tests {
use super::*;
use crate::manifest::action::RegionManifest;
+ use crate::sst::FormatType;
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::wal::EntryId;
@@ -452,6 +455,7 @@ mod tests {
truncated_entry_id: None,
compaction_time_window: None,
committed_sequence: None,
+ sst_format: FormatType::PrimaryKey,
}
}
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index b3e6fdbca2..1d94e74eaa 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -22,6 +22,7 @@ use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::prelude::ConcreteDataType;
+use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
@@ -42,6 +43,18 @@ pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
/// Default number of concurrent write, it only works on object store backend(e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
+/// Format type of the SST file.
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
+#[serde(rename_all = "snake_case")]
+#[strum(serialize_all = "snake_case")]
+pub enum FormatType {
+ /// Parquet with primary key encoded.
+ #[default]
+ PrimaryKey,
+ /// Flat Parquet format.
+ Flat,
+}
+
/// Gets the arrow schema to store in parquet.
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 443377170a..8deba23ff8 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -74,6 +74,7 @@ use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::region::opener::{PartitionExprFetcher, PartitionExprFetcherRef};
+use crate::sst::FormatType;
use crate::sst::file_purger::{FilePurgerRef, NoopFilePurger};
use crate::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use crate::sst::index::intermediate::IntermediateManager;
@@ -609,6 +610,7 @@ impl TestEnv {
manifest_opts,
Default::default(),
Default::default(),
+ FormatType::PrimaryKey,
)
.await
.map(Some)
diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs
index 4e3bc9fa62..3fbfbd0ad1 100644
--- a/src/mito2/src/test_util/scheduler_util.rs
+++ b/src/mito2/src/test_util/scheduler_util.rs
@@ -35,6 +35,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequestWithTime;
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
+use crate::sst::FormatType;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -134,6 +135,7 @@ impl SchedulerEnv {
},
Default::default(),
Default::default(),
+ FormatType::PrimaryKey,
)
.await
.unwrap(),
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index aff97e09a1..23d00ab4e7 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -154,7 +154,10 @@ impl RegionWorkerLoop {
}
};
// Persist the metadata to region's manifest.
- let change = RegionChange { metadata: new_meta };
+ let change = RegionChange {
+ metadata: new_meta,
+ sst_format: region.sst_format(),
+ };
self.handle_manifest_region_change(region, change, sender)
}
diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs
index 83a3fd7063..99e7166f43 100644
--- a/src/store-api/src/mito_engine_options.rs
+++ b/src/store-api/src/mito_engine_options.rs
@@ -55,6 +55,8 @@ pub const MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES: &str =
"memtable.partition_tree.fork_dictionary_bytes";
/// Option key for skipping WAL.
pub const SKIP_WAL_KEY: &str = "skip_wal";
+/// Option key for sst format.
+pub const SST_FORMAT_KEY: &str = "sst_format";
// Note: Adding new options here should also check if this option should be removed in [metric_engine::engine::create::region_options_for_metadata_region].
/// Returns true if the `key` is a valid option key for the mito engine.
@@ -78,6 +80,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
APPEND_MODE_KEY,
MERGE_MODE_KEY,
+ SST_FORMAT_KEY,
]
.contains(&key)
}
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 347a8d12c9..538392e437 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1502,7 +1502,7 @@ parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
min_compaction_interval = "0s"
-enable_experimental_flat_format = false
+default_experimental_flat_format = false
[region_engine.mito.index]
aux_path = ""