feat: support setting sst_format in table options (#7068)

* feat: add FormatType to support multi format in the future

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add sst_format to RegionOptions

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: sets the sst_format based on RegionOptions

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add sst_format to mito table options

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fix RegionManifest deserialization without sst_format

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove Parquet suffix from FormatType

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: prefer RegionOptions::sst_format in compactor/memtable builder

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename enable_experimental_flat_format to
default_experimental_flat_format

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config.md

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fmt

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update manifest test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors, handle sst_format in remap_manifest

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-10-13 16:38:37 +08:00
committed by GitHub
parent 1a73b485fe
commit a9c342b0f7
21 changed files with 178 additions and 52 deletions

View File

@@ -153,7 +153,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -547,7 +547,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |

View File

@@ -500,8 +500,8 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## Whether to enable experimental flat format.
enable_experimental_flat_format = false
## Whether to enable experimental flat format as the default format.
default_experimental_flat_format = false
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -584,8 +584,8 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## Whether to enable experimental flat format.
enable_experimental_flat_format = false
## Whether to enable experimental flat format as the default format.
default_experimental_flat_format = false
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -777,6 +777,7 @@ mod tests {
use super::*;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::ManifestContext;
use crate::sst::FormatType;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{VersionControlBuilder, apply_edit};
@@ -1111,6 +1112,7 @@ mod tests {
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
)
.await
.unwrap();

View File

@@ -48,6 +48,7 @@ use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
use crate::schedule::scheduler::LocalScheduler;
use crate::sst::FormatType;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
@@ -343,8 +344,14 @@ impl Compactor for DefaultCompactor {
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
.engine_config
.enable_experimental_flat_format;
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(
compaction_region
.engine_config
.default_experimental_flat_format,
);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
@@ -431,8 +438,8 @@ impl Compactor for DefaultCompactor {
let output_file_names =
output_files.iter().map(|f| f.file_id.to_string()).join(",");
info!(
"Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
region_id, input_file_names, output_file_names, metrics
"Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)

View File

@@ -250,6 +250,7 @@ mod tests {
index_options: Default::default(),
memtable: None,
merge_mode: None,
sst_format: None,
},
compaction_time_window: None,
}

View File

@@ -144,9 +144,9 @@ pub struct MitoConfig {
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
/// Whether to enable experimental flat format.
/// Whether to enable experimental flat format as the default format.
/// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
pub enable_experimental_flat_format: bool,
pub default_experimental_flat_format: bool,
}
impl Default for MitoConfig {
@@ -184,7 +184,7 @@ impl Default for MitoConfig {
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
enable_experimental_flat_format: false,
default_experimental_flat_format: false,
};
// Adjust buffer and cache size according to system memory if we can.

View File

@@ -695,7 +695,7 @@ impl EngineInner {
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start)
.with_flat_format(self.config.enable_experimental_flat_format);
.with_flat_format(self.config.default_experimental_flat_format);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

View File

@@ -29,6 +29,7 @@ use crate::error::{
DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
use crate::manifest::manager::RemoveFileOptions;
use crate::sst::FormatType;
use crate::sst::file::FileMeta;
use crate::wal::EntryId;
@@ -49,6 +50,9 @@ pub enum RegionMetaAction {
pub struct RegionChange {
/// The metadata after changed.
pub metadata: RegionMetadataRef,
/// Format of the SST.
#[serde(default)]
pub sst_format: FormatType,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -128,6 +132,9 @@ pub struct RegionManifest {
/// Inferred compaction time window.
#[serde(with = "humantime_serde")]
pub compaction_time_window: Option<Duration>,
/// Format of the SST file.
#[serde(default)]
pub sst_format: FormatType,
}
#[cfg(test)]
@@ -155,6 +162,7 @@ pub struct RegionManifestBuilder {
truncated_entry_id: Option<EntryId>,
compaction_time_window: Option<Duration>,
committed_sequence: Option<SequenceNumber>,
sst_format: FormatType,
}
impl RegionManifestBuilder {
@@ -171,6 +179,7 @@ impl RegionManifestBuilder {
truncated_entry_id: s.truncated_entry_id,
compaction_time_window: s.compaction_time_window,
committed_sequence: s.committed_sequence,
sst_format: s.sst_format,
}
} else {
Default::default()
@@ -180,6 +189,7 @@ impl RegionManifestBuilder {
pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
self.metadata = Some(change.metadata);
self.manifest_version = manifest_version;
self.sst_format = change.sst_format;
}
pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
@@ -269,6 +279,7 @@ impl RegionManifestBuilder {
manifest_version: self.manifest_version,
truncated_entry_id: self.truncated_entry_id,
compaction_time_window: self.compaction_time_window,
sst_format: self.sst_format,
})
}
}
@@ -472,6 +483,7 @@ mod tests {
}"#;
let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
// Note: For backward compatibility, the test accepts a RegionChange without sst_format
let region_change = r#" {
"metadata":{
"column_metadatas":[
@@ -729,6 +741,7 @@ mod tests {
.unwrap()]),
}],
},
sst_format: FormatType::PrimaryKey,
};
let json = serde_json::to_string(&manifest).unwrap();
@@ -830,6 +843,7 @@ mod tests {
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
sst_format: FormatType::PrimaryKey,
}
);
@@ -843,6 +857,7 @@ mod tests {
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
sst_format: FormatType::PrimaryKey,
};
let json = serde_json::to_string(&new_manifest).unwrap();
let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
@@ -916,4 +931,36 @@ mod tests {
old_from_new
);
}
#[test]
fn test_region_change_backward_compatibility() {
// Test that we can deserialize a RegionChange without sst_format
let region_change_json = r#"{
"metadata": {
"column_metadatas": [
{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},
{"column_schema":{"name":"b","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},
{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
],
"primary_key": [
1
],
"region_id": 42,
"schema_version": 0
}
}"#;
let region_change: RegionChange = serde_json::from_str(region_change_json).unwrap();
assert_eq!(region_change.sst_format, FormatType::PrimaryKey);
// Test serialization and deserialization with sst_format
let region_change = RegionChange {
metadata: region_change.metadata.clone(),
sst_format: FormatType::Flat,
};
let serialized = serde_json::to_string(&region_change).unwrap();
let deserialized: RegionChange = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.sst_format, FormatType::Flat);
}
}

View File

@@ -36,6 +36,7 @@ use crate::manifest::storage::{
};
use crate::metrics::MANIFEST_OP_ELAPSED;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::sst::FormatType;
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
@@ -154,6 +155,7 @@ impl RegionManifestManager {
options: RegionManifestOptions,
total_manifest_size: Arc<AtomicU64>,
manifest_version: Arc<AtomicU64>,
sst_format: FormatType,
) -> Result<Self> {
// construct storage
let mut store = ManifestObjectStore::new(
@@ -175,6 +177,7 @@ impl RegionManifestManager {
version,
RegionChange {
metadata: metadata.clone(),
sst_format,
},
);
let manifest = manifest_builder.try_build()?;
@@ -185,7 +188,10 @@ impl RegionManifestManager {
options.manifest_dir, manifest
);
let mut actions = vec![RegionMetaAction::Change(RegionChange { metadata })];
let mut actions = vec![RegionMetaAction::Change(RegionChange {
metadata,
sst_format,
})];
if flushed_entry_id > 0 {
actions.push(RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![],
@@ -792,6 +798,7 @@ mod test {
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
sst_format: FormatType::PrimaryKey,
}));
let current_version = manager
@@ -860,6 +867,7 @@ mod test {
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
sst_format: FormatType::PrimaryKey,
}));
let current_version = manager
@@ -915,6 +923,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1721);
assert_eq!(manifest_size, 1748);
}
}

View File

@@ -40,7 +40,8 @@ use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::region::options::{MemtableOptions, MergeMode, RegionOptions};
use crate::sst::FormatType;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;
@@ -324,7 +325,7 @@ impl Drop for AllocTracker {
pub(crate) struct MemtableBuilderProvider {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: Arc<MitoConfig>,
compact_dispatcher: Option<Arc<CompactDispatcher>>,
compact_dispatcher: Arc<CompactDispatcher>,
}
impl MemtableBuilderProvider {
@@ -332,9 +333,8 @@ impl MemtableBuilderProvider {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: Arc<MitoConfig>,
) -> Self {
let compact_dispatcher = config
.enable_experimental_flat_format
.then(|| Arc::new(CompactDispatcher::new(config.max_background_compactions)));
let compact_dispatcher =
Arc::new(CompactDispatcher::new(config.max_background_compactions));
Self {
write_buffer_manager,
@@ -343,16 +343,19 @@ impl MemtableBuilderProvider {
}
}
pub(crate) fn builder_for_options(
&self,
options: Option<&MemtableOptions>,
dedup: bool,
merge_mode: MergeMode,
) -> MemtableBuilderRef {
if self.config.enable_experimental_flat_format {
common_telemetry::info!(
"Overriding memtable config, use BulkMemtable under flat format"
);
pub(crate) fn builder_for_options(&self, options: &RegionOptions) -> MemtableBuilderRef {
let dedup = options.need_dedup();
let merge_mode = options.merge_mode();
let flat_format = options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(self.config.default_experimental_flat_format);
if flat_format {
if options.memtable.is_some() {
common_telemetry::info!(
"Overriding memtable config, use BulkMemtable under flat format"
);
}
return Arc::new(
BulkMemtableBuilder::new(
@@ -360,12 +363,11 @@ impl MemtableBuilderProvider {
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
// Safety: We create the dispatcher if flat_format is enabled.
.with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
.with_compact_dispatcher(self.compact_dispatcher.clone()),
);
}
match options {
match &options.memtable {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
dedup,
@@ -388,15 +390,14 @@ impl MemtableBuilderProvider {
}
fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
if self.config.enable_experimental_flat_format {
if self.config.default_experimental_flat_format {
return Arc::new(
BulkMemtableBuilder::new(
self.write_buffer_manager.clone(),
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
// Safety: We create the dispatcher if flat_format is enabled.
.with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
.with_compact_dispatcher(self.compact_dispatcher.clone()),
);
}

View File

@@ -49,6 +49,7 @@ use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::FormatType;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;
@@ -131,7 +132,7 @@ pub struct MitoRegion {
///
/// The value will be updated to the latest offset of the topic
/// if region receives a flush request or schedules a periodic flush task
/// and the region's memtable is empty.
/// and the region's memtable is empty.
///
/// There are no WAL entries in range [flushed_entry_id, topic_latest_entry_id] for current region,
/// which means these WAL entries maybe able to be pruned up to `topic_latest_entry_id`.
@@ -140,6 +141,8 @@ pub struct MitoRegion {
pub(crate) written_bytes: Arc<AtomicU64>,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// Format type of the SST file.
pub(crate) sst_format: FormatType,
/// manifest stats
stats: ManifestStats,
}
@@ -191,12 +194,17 @@ impl MitoRegion {
self.last_flush_millis.store(now, Ordering::Relaxed);
}
/// Return last compaction time in millis.
/// Returns last compaction timestamp in millis.
pub(crate) fn last_compaction_millis(&self) -> i64 {
self.last_compaction_millis.load(Ordering::Relaxed)
}
/// Update compaction time to now millis.
/// Returns format type of the SST file.
pub(crate) fn sst_format(&self) -> FormatType {
self.sst_format
}
/// Update compaction time to current time.
pub(crate) fn update_compaction_millis(&self) {
let now = self.time_provider.current_time_millis();
self.last_compaction_millis.store(now, Ordering::Relaxed);
@@ -443,6 +451,7 @@ impl MitoRegion {
if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
let action = RegionMetaAction::Change(RegionChange {
metadata: current_meta.clone(),
sst_format: self.sst_format(),
});
let result = manager
.update(
@@ -1155,6 +1164,7 @@ mod tests {
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
};
use crate::sst::FormatType;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
@@ -1239,6 +1249,7 @@ mod tests {
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
)
.await
.unwrap();
@@ -1305,6 +1316,7 @@ mod tests {
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
)
.await
.unwrap();
@@ -1327,6 +1339,7 @@ mod tests {
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
sst_format: FormatType::PrimaryKey,
stats: ManifestStats::default(),
};

View File

@@ -60,6 +60,7 @@ use crate::region::{
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
@@ -254,6 +255,15 @@ impl RegionOpener {
let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
let provider = self.provider::<S>(&options.wal_options)?;
let metadata = Arc::new(metadata);
// Set the sst_format based on options or flat_format flag
let sst_format = if let Some(format) = options.sst_format {
format
} else if config.default_experimental_flat_format {
FormatType::Flat
} else {
// Default to PrimaryKeyParquet for newly created regions
FormatType::PrimaryKey
};
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options =
Self::manifest_options(config, &options, &region_dir, &self.object_store_manager)?;
@@ -265,14 +275,11 @@ impl RegionOpener {
region_manifest_options,
self.stats.total_manifest_size.clone(),
self.stats.manifest_version.clone(),
sst_format,
)
.await?;
let memtable_builder = self.memtable_builder_provider.builder_for_options(
options.memtable.as_ref(),
options.need_dedup(),
options.merge_mode(),
);
let memtable_builder = self.memtable_builder_provider.builder_for_options(&options);
let part_duration = options.compaction.time_window();
// Initial memtable id is 0.
let mutable = Arc::new(TimePartitions::new(
@@ -319,6 +326,7 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(0),
memtable_builder,
written_bytes: Arc::new(AtomicU64::new(0)),
sst_format,
stats: self.stats,
})
}
@@ -445,11 +453,9 @@ impl RegionOpener {
self.cache_manager.clone(),
self.file_ref_manager.clone(),
);
let memtable_builder = self.memtable_builder_provider.builder_for_options(
region_options.memtable.as_ref(),
region_options.need_dedup(),
region_options.merge_mode(),
);
let memtable_builder = self
.memtable_builder_provider
.builder_for_options(&region_options);
// Use compaction time window in the manifest if region doesn't provide
// the time window option.
let part_duration = region_options
@@ -525,6 +531,9 @@ impl RegionOpener {
}
let now = self.time_provider.current_time_millis();
// Read sst_format from manifest
let sst_format = manifest.sst_format;
let region = MitoRegion {
region_id: self.region_id,
version_control,
@@ -542,6 +551,7 @@ impl RegionOpener {
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder,
sst_format,
stats: self.stats.clone(),
};
Ok(Some(region))

View File

@@ -33,6 +33,7 @@ use strum::EnumString;
use crate::error::{Error, InvalidRegionOptionsSnafu, JsonOptionsSnafu, Result};
use crate::memtable::partition_tree::{DEFAULT_FREEZE_THRESHOLD, DEFAULT_MAX_KEYS_PER_SHARD};
use crate::sst::FormatType;
const DEFAULT_INDEX_SEGMENT_ROW_COUNT: usize = 1024;
@@ -73,6 +74,8 @@ pub struct RegionOptions {
/// The mode to merge duplicate rows.
/// Only takes effect when `append_mode` is `false`.
pub merge_mode: Option<MergeMode>,
/// SST format type.
pub sst_format: Option<FormatType>,
}
impl RegionOptions {
@@ -151,6 +154,7 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
index_options,
memtable,
merge_mode: options.merge_mode,
sst_format: options.sst_format,
};
opts.validate()?;
@@ -256,6 +260,8 @@ struct RegionOptionsWithoutEnum {
append_mode: bool,
#[serde_as(as = "NoneAsEmptyString")]
merge_mode: Option<MergeMode>,
#[serde_as(as = "NoneAsEmptyString")]
sst_format: Option<FormatType>,
}
impl Default for RegionOptionsWithoutEnum {
@@ -266,6 +272,7 @@ impl Default for RegionOptionsWithoutEnum {
storage: options.storage,
append_mode: options.append_mode,
merge_mode: options.merge_mode,
sst_format: options.sst_format,
}
}
}
@@ -652,6 +659,7 @@ mod tests {
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
sst_format: None,
};
assert_eq!(expect, options);
}
@@ -685,6 +693,7 @@ mod tests {
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
sst_format: None,
};
let region_options_json_str = serde_json::to_string(&options).unwrap();
let got: RegionOptions = serde_json::from_str(&region_options_json_str).unwrap();
@@ -748,6 +757,7 @@ mod tests {
primary_key_encoding: PrimaryKeyEncoding::Dense,
})),
merge_mode: Some(MergeMode::LastNonNull),
sst_format: None,
};
assert_eq!(options, got);
}

View File

@@ -87,6 +87,7 @@ impl RemapManifest {
.next()
.context(error::NoOldManifestsSnafu)?;
let template_metadata = (*template_manifest.metadata).clone();
let sst_format = template_manifest.sst_format;
// Create empty manifest for each new region
for region_id in self.new_partition_exprs.keys() {
@@ -114,6 +115,7 @@ impl RemapManifest {
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
sst_format,
};
new_manifests.insert(*region_id, manifest);
@@ -372,6 +374,7 @@ mod tests {
use super::*;
use crate::manifest::action::RegionManifest;
use crate::sst::FormatType;
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::wal::EntryId;
@@ -452,6 +455,7 @@ mod tests {
truncated_entry_id: None,
compaction_time_window: None,
committed_sequence: None,
sst_format: FormatType::PrimaryKey,
}
}

View File

@@ -22,6 +22,7 @@ use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::prelude::ConcreteDataType;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
@@ -42,6 +43,18 @@ pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
/// Default number of concurrent write, it only works on object store backend(e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
/// Format type of the SST file.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
/// Parquet with primary key encoded.
#[default]
PrimaryKey,
/// Flat Parquet format.
Flat,
}
/// Gets the arrow schema to store in parquet.
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
let fields = Fields::from_iter(

View File

@@ -74,6 +74,7 @@ use crate::flush::{WriteBufferManager, WriteBufferManagerRef};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::region::opener::{PartitionExprFetcher, PartitionExprFetcherRef};
use crate::sst::FormatType;
use crate::sst::file_purger::{FilePurgerRef, NoopFilePurger};
use crate::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use crate::sst::index::intermediate::IntermediateManager;
@@ -609,6 +610,7 @@ impl TestEnv {
manifest_opts,
Default::default(),
Default::default(),
FormatType::PrimaryKey,
)
.await
.map(Some)

View File

@@ -35,6 +35,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequestWithTime;
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
use crate::sst::FormatType;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -134,6 +135,7 @@ impl SchedulerEnv {
},
Default::default(),
Default::default(),
FormatType::PrimaryKey,
)
.await
.unwrap(),

View File

@@ -154,7 +154,10 @@ impl<S> RegionWorkerLoop<S> {
}
};
// Persist the metadata to region's manifest.
let change = RegionChange { metadata: new_meta };
let change = RegionChange {
metadata: new_meta,
sst_format: region.sst_format(),
};
self.handle_manifest_region_change(region, change, sender)
}

View File

@@ -55,6 +55,8 @@ pub const MEMTABLE_PARTITION_TREE_FORK_DICTIONARY_BYTES: &str =
"memtable.partition_tree.fork_dictionary_bytes";
/// Option key for skipping WAL.
pub const SKIP_WAL_KEY: &str = "skip_wal";
/// Option key for sst format.
pub const SST_FORMAT_KEY: &str = "sst_format";
// Note: Adding new options here should also check if this option should be removed in [metric_engine::engine::create::region_options_for_metadata_region].
/// Returns true if the `key` is a valid option key for the mito engine.
@@ -78,6 +80,7 @@ pub fn is_mito_engine_option_key(key: &str) -> bool {
MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
APPEND_MODE_KEY,
MERGE_MODE_KEY,
SST_FORMAT_KEY,
]
.contains(&key)
}

View File

@@ -1502,7 +1502,7 @@ parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
min_compaction_interval = "0s"
enable_experimental_flat_format = false
default_experimental_flat_format = false
[region_engine.mito.index]
aux_path = ""