diff --git a/config/config.md b/config/config.md
index 4861675217..f28d09e28d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -157,13 +157,12 @@
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling cache on read operations (default: true). When disabled, cache refilling on read won't happen. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
-| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. |
| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately. "fail" (default) fails fast and is the recommended option for most users. "wait" / "wait()" waits for memory to become available. This is mainly for advanced tuning in bursty workloads where temporary contention is common and higher latency is acceptable. "wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
-| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
+| `region_engine.mito.default_flat_format` | Bool | `true` | Whether to enable flat format as the default SST format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`. The default name for this directory is `index_intermediate` for backward compatibility. This path contains two subdirectories: - `__intm`: for storing intermediate files used during creating index. - `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -550,13 +549,12 @@
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling cache on read operations (default: true). When disabled, cache refilling on read won't happen. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
-| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries. Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%"). Setting it to 0 disables the limit. |
| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately. "fail" (default) fails fast and is the recommended option for most users. "wait" / "wait()" waits for memory to become available. This is mainly for advanced tuning in bursty workloads where temporary contention is common and higher latency is acceptable. "wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions. To align with the old behavior, the default value is 0 (no restrictions). |
-| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
+| `region_engine.mito.default_flat_format` | Bool | `true` | Whether to enable flat format as the default SST format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`. The default name for this directory is `index_intermediate` for backward compatibility. This path contains two subdirectories: - `__intm`: for storing intermediate files used during creating index. - `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 833a567d74..10e6965b84 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -520,9 +520,6 @@ manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
-## Capacity of the channel to send data from parallel scan tasks to the main task.
-parallel_scan_channel_size = 32
-
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 384
@@ -545,8 +542,8 @@ scan_memory_on_exhausted = "fail"
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
-## Whether to enable experimental flat format as the default format.
-default_experimental_flat_format = false
+## Whether to enable flat format as the default SST format.
+default_flat_format = true
## The options for index in Mito engine.
[region_engine.mito.index]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 94c5feebf1..486bc74af2 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -612,9 +612,6 @@ manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
-## Capacity of the channel to send data from parallel scan tasks to the main task.
-parallel_scan_channel_size = 32
-
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 384
@@ -637,8 +634,8 @@ scan_memory_on_exhausted = "fail"
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
-## Whether to enable experimental flat format as the default format.
-default_experimental_flat_format = false
+## Whether to enable flat format as the default SST format.
+default_flat_format = true
## The options for index in Mito engine.
[region_engine.mito.index]
diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs
index 300bd34647..942dae1136 100644
--- a/src/metric-engine/src/engine/bulk_insert.rs
+++ b/src/metric-engine/src/engine/bulk_insert.rs
@@ -528,7 +528,7 @@ mod tests {
async fn test_bulk_insert_physical_region_passthrough() {
// Use flat format so that BulkMemtable is used (supports write_bulk).
let mito_config = MitoConfig {
- default_experimental_flat_format: true,
+ default_flat_format: true,
..Default::default()
};
let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
@@ -585,7 +585,7 @@ mod tests {
async fn test_bulk_insert_physical_region_empty_batch() {
// Use flat format so that BulkMemtable is used (supports write_bulk).
let mito_config = MitoConfig {
- default_experimental_flat_format: true,
+ default_flat_format: true,
..Default::default()
};
let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
diff --git a/src/metric-engine/src/engine/flush.rs b/src/metric-engine/src/engine/flush.rs
index 5d7479c5d0..8c0f33aaf3 100644
--- a/src/metric-engine/src/engine/flush.rs
+++ b/src/metric-engine/src/engine/flush.rs
@@ -121,6 +121,10 @@ mod tests {
.map(|path| path.replace(&e.file_id, ""));
e.file_id = "".to_string();
e.index_version = 0;
+ // Round down sizes to nearest 1000 to avoid exact size
+ // comparisons that break when the SST format changes.
+ e.file_size = e.file_size / 1000 * 1000;
+ e.index_file_size = e.index_file_size.map(|s| s / 1000 * 1000);
format!("\n{:?}", e)
})
.sorted()
@@ -129,12 +133,12 @@ mod tests {
assert_eq!(
debug_format,
r#"
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
-ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000001/data/index/.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000002/data/index/.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/.parquet", file_size: 3000, index_file_path: Some("test_metric_region/22_0000000042/data/index/.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
+ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
);
// list from storage
let storage_entries = mito
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index b03e6415e8..ff4317331f 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -322,11 +322,7 @@ impl DefaultCompactor {
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
- .unwrap_or(
- compaction_region
- .engine_config
- .default_experimental_flat_format,
- );
+ .unwrap_or(compaction_region.engine_config.default_flat_format);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index da0ec74022..b3ddb023cb 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -33,8 +33,6 @@ use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
-/// Default channel size for parallel scan task.
-pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files to scan concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
@@ -142,8 +140,6 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
- /// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
- pub parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently (default 384).
pub max_concurrent_scan_files: usize,
/// Whether to allow stale entries read during replay.
@@ -177,9 +173,9 @@ pub struct MitoConfig {
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
- /// Whether to enable experimental flat format as the default format.
+ /// Whether to enable flat format as the default SST format.
/// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
- pub default_experimental_flat_format: bool,
+ pub default_flat_format: bool,
pub gc: GcConfig,
}
@@ -217,7 +213,6 @@ impl Default for MitoConfig {
enable_refill_cache_on_read: true,
manifest_cache_size: ReadableSize::mb(256),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
- parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
scan_memory_limit: MemoryLimit::default(),
@@ -230,7 +225,7 @@ impl Default for MitoConfig {
vector_index: VectorIndexConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
- default_experimental_flat_format: false,
+ default_flat_format: true,
gc: GcConfig::default(),
};
@@ -295,14 +290,6 @@ impl MitoConfig {
);
}
- if self.parallel_scan_channel_size < 1 {
- self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
- warn!(
- "Sanitize scan channel size to {}",
- self.parallel_scan_channel_size
- );
- }
-
// Sets write cache path if it is empty.
if self.write_cache_path.trim().is_empty() {
self.write_cache_path = data_home.to_string();
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index d1c30c3ff6..d006067f0d 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -1027,7 +1027,6 @@ impl EngineInner {
request,
CacheStrategy::EnableAll(cache_manager),
)
- .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs
index b8ba06f0b9..05ba5dae25 100644
--- a/src/mito2/src/engine/alter_test.rs
+++ b/src/mito2/src/engine/alter_test.rs
@@ -141,7 +141,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -213,7 +213,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -267,7 +267,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -318,7 +318,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -387,7 +387,7 @@ async fn test_alter_region_retry_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -457,7 +457,7 @@ async fn test_alter_on_flushing_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -574,7 +574,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -681,7 +681,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -718,7 +718,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -816,7 +816,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -853,7 +853,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -916,7 +916,7 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -994,7 +994,7 @@ async fn test_alter_region_sst_format_with_flush() {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: false,
+ default_flat_format: false,
..Default::default()
})
.await;
@@ -1085,7 +1085,7 @@ async fn test_alter_region_sst_format_with_flush() {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: false,
+ default_flat_format: false,
..Default::default()
},
)
@@ -1118,7 +1118,7 @@ async fn test_alter_region_sst_format_without_flush() {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: false,
+ default_flat_format: false,
..Default::default()
})
.await;
@@ -1203,7 +1203,7 @@ async fn test_alter_region_sst_format_without_flush() {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: false,
+ default_flat_format: false,
..Default::default()
},
)
@@ -1231,6 +1231,250 @@ async fn test_alter_region_sst_format_without_flush() {
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}
+#[tokio::test]
+async fn test_alter_region_sst_format_flat_to_pk_with_flush() {
+ common_telemetry::init_default_ut_logging();
+
+ let mut env = TestEnv::new().await;
+ let engine = env
+ .create_engine(MitoConfig {
+ default_flat_format: true,
+ ..Default::default()
+ })
+ .await;
+
+ let region_id = RegionId::new(1, 1);
+ let request = CreateRequestBuilder::new().build();
+
+ env.get_schema_metadata_manager()
+ .register_region_table_info(
+ region_id.table_id(),
+ "test_table",
+ "test_catalog",
+ "test_schema",
+ None,
+ env.get_kv_backend(),
+ )
+ .await;
+
+ let column_schemas = rows_schema(&request);
+ let table_dir = request.table_dir.clone();
+ engine
+ .handle_request(region_id, RegionRequest::Create(request))
+ .await
+ .unwrap();
+
+ // Inserts some data with flat format
+ let rows = Rows {
+ schema: column_schemas.clone(),
+ rows: build_rows(0, 3),
+ };
+ put_rows(&engine, region_id, rows).await;
+
+ // Flushes to create SST files with flat format
+ flush_region(&engine, region_id, None).await;
+
+ let expected_data = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts |
++-------+---------+---------------------+
+| 0 | 0.0 | 1970-01-01T00:00:00 |
+| 1 | 1.0 | 1970-01-01T00:00:01 |
+| 2 | 2.0 | 1970-01-01T00:00:02 |
++-------+---------+---------------------+";
+ let request = ScanRequest::default();
+ let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+ let batches = RecordBatches::try_collect(stream).await.unwrap();
+ assert_eq!(expected_data, batches.pretty_print().unwrap());
+
+ // Alters sst_format from flat to primary_key
+ let alter_format_request = RegionAlterRequest {
+ kind: AlterKind::SetRegionOptions {
+ options: vec![SetRegionOption::Format("primary_key".to_string())],
+ },
+ };
+ engine
+ .handle_request(region_id, RegionRequest::Alter(alter_format_request))
+ .await
+ .unwrap();
+
+ // Inserts more data after alter
+ let rows = Rows {
+ schema: column_schemas.clone(),
+ rows: build_rows(3, 6),
+ };
+ put_rows(&engine, region_id, rows).await;
+
+ // Flushes to create SST files with primary_key format
+ flush_region(&engine, region_id, None).await;
+
+ let expected_all_data = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts |
++-------+---------+---------------------+
+| 0 | 0.0 | 1970-01-01T00:00:00 |
+| 1 | 1.0 | 1970-01-01T00:00:01 |
+| 2 | 2.0 | 1970-01-01T00:00:02 |
+| 3 | 3.0 | 1970-01-01T00:00:03 |
+| 4 | 4.0 | 1970-01-01T00:00:04 |
+| 5 | 5.0 | 1970-01-01T00:00:05 |
++-------+---------+---------------------+";
+ let request = ScanRequest::default();
+ let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+ let batches = RecordBatches::try_collect(stream).await.unwrap();
+ assert_eq!(expected_all_data, batches.pretty_print().unwrap());
+
+ // Reopens region to verify format persists
+ let engine = env
+ .reopen_engine(
+ engine,
+ MitoConfig {
+ default_flat_format: false,
+ ..Default::default()
+ },
+ )
+ .await;
+ engine
+ .handle_request(
+ region_id,
+ RegionRequest::Open(RegionOpenRequest {
+ engine: String::new(),
+ table_dir,
+ path_type: PathType::Bare,
+ options: HashMap::default(),
+ skip_wal_replay: false,
+ checkpoint: None,
+ }),
+ )
+ .await
+ .unwrap();
+
+ let request = ScanRequest::default();
+ let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+ let batches = RecordBatches::try_collect(stream).await.unwrap();
+ assert_eq!(expected_all_data, batches.pretty_print().unwrap());
+}
+
+#[tokio::test]
+async fn test_alter_region_sst_format_flat_to_pk_without_flush() {
+ common_telemetry::init_default_ut_logging();
+
+ let mut env = TestEnv::new().await;
+ let engine = env
+ .create_engine(MitoConfig {
+ default_flat_format: true,
+ ..Default::default()
+ })
+ .await;
+
+ let region_id = RegionId::new(1, 1);
+ let request = CreateRequestBuilder::new().build();
+
+ env.get_schema_metadata_manager()
+ .register_region_table_info(
+ region_id.table_id(),
+ "test_table",
+ "test_catalog",
+ "test_schema",
+ None,
+ env.get_kv_backend(),
+ )
+ .await;
+
+ let column_schemas = rows_schema(&request);
+ let table_dir = request.table_dir.clone();
+ engine
+ .handle_request(region_id, RegionRequest::Create(request))
+ .await
+ .unwrap();
+
+ let check_format = |engine: &MitoEngine, expected: Option| {
+ let current_format = engine
+ .get_region(region_id)
+ .unwrap()
+ .version()
+ .options
+ .sst_format;
+ assert_eq!(current_format, expected);
+ };
+ check_format(&engine, Some(FormatType::Flat));
+
+ // Inserts some data with flat format
+ let rows = Rows {
+ schema: column_schemas.clone(),
+ rows: build_rows(0, 3),
+ };
+ put_rows(&engine, region_id, rows).await;
+
+ // Alters sst_format from flat to primary_key
+ let alter_format_request = RegionAlterRequest {
+ kind: AlterKind::SetRegionOptions {
+ options: vec![SetRegionOption::Format("primary_key".to_string())],
+ },
+ };
+ engine
+ .handle_request(region_id, RegionRequest::Alter(alter_format_request))
+ .await
+ .unwrap();
+
+ check_format(&engine, Some(FormatType::PrimaryKey));
+
+ // Inserts more data after alter
+ let rows = Rows {
+ schema: column_schemas.clone(),
+ rows: build_rows(3, 6),
+ };
+ put_rows(&engine, region_id, rows).await;
+
+ let expected_all_data = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts |
++-------+---------+---------------------+
+| 0 | 0.0 | 1970-01-01T00:00:00 |
+| 1 | 1.0 | 1970-01-01T00:00:01 |
+| 2 | 2.0 | 1970-01-01T00:00:02 |
+| 3 | 3.0 | 1970-01-01T00:00:03 |
+| 4 | 4.0 | 1970-01-01T00:00:04 |
+| 5 | 5.0 | 1970-01-01T00:00:05 |
++-------+---------+---------------------+";
+ let request = ScanRequest::default();
+ let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+ let batches = RecordBatches::try_collect(stream).await.unwrap();
+ assert_eq!(expected_all_data, batches.pretty_print().unwrap());
+
+ // Reopens region to verify format persists
+ let engine = env
+ .reopen_engine(
+ engine,
+ MitoConfig {
+ default_flat_format: false,
+ ..Default::default()
+ },
+ )
+ .await;
+ engine
+ .handle_request(
+ region_id,
+ RegionRequest::Open(RegionOpenRequest {
+ engine: String::new(),
+ table_dir,
+ path_type: PathType::Bare,
+ options: HashMap::default(),
+ skip_wal_replay: false,
+ checkpoint: None,
+ }),
+ )
+ .await
+ .unwrap();
+
+ check_format(&engine, Some(FormatType::PrimaryKey));
+
+ let request = ScanRequest::default();
+ let stream = engine.scan_to_stream(region_id, request).await.unwrap();
+ let batches = RecordBatches::try_collect(stream).await.unwrap();
+ assert_eq!(expected_all_data, batches.pretty_print().unwrap());
+}
+
#[tokio::test]
async fn test_alter_region_append_mode_with_flush() {
common_telemetry::init_default_ut_logging();
diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs
index 61488b6592..fa7db1f573 100644
--- a/src/mito2/src/engine/append_mode_test.rs
+++ b/src/mito2/src/engine/append_mode_test.rs
@@ -44,7 +44,7 @@ async fn test_append_mode_write_query_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -112,7 +112,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -211,7 +211,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -238,7 +238,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -329,7 +329,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool)
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -376,7 +376,7 @@ async fn test_put_single_range_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -474,7 +474,7 @@ async fn test_put_single_range_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs
index 401e6572a2..a82fcfe049 100644
--- a/src/mito2/src/engine/apply_staging_manifest_test.rs
+++ b/src/mito2/src/engine/apply_staging_manifest_test.rs
@@ -62,7 +62,7 @@ async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_forma
let mut env = TestEnv::with_prefix("invalid-region-state").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -125,7 +125,7 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_
let mut env = TestEnv::with_prefix("mismatched-partition-expr").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -205,7 +205,7 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("success").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -406,7 +406,7 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma
let mut env = TestEnv::with_prefix("invalid-files-to-add").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -483,7 +483,7 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo
let mut env = TestEnv::with_prefix("apply-change-edit-different-columns").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -599,7 +599,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl
let mut env = TestEnv::with_prefix("apply-preserve-memtable").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index ed92c2b4ac..5c2bd4fd4e 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -56,7 +56,7 @@ async fn test_engine_new_stop_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -93,7 +93,7 @@ async fn test_write_to_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("write-to-region").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -134,7 +134,7 @@ async fn test_region_replay_with_format(factory: Option, flat_f
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -169,7 +169,7 @@ async fn test_region_replay_with_format(factory: Option, flat_f
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -234,7 +234,7 @@ async fn test_write_query_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -278,7 +278,7 @@ async fn test_different_order_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -339,7 +339,7 @@ async fn test_different_order_and_type_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -403,7 +403,7 @@ async fn test_put_delete_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -465,7 +465,7 @@ async fn test_delete_not_null_fields_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -524,7 +524,7 @@ async fn test_put_overwrite_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -594,7 +594,7 @@ async fn test_absent_and_invalid_columns_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -650,7 +650,7 @@ async fn test_region_usage_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("region_usage").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -716,7 +716,7 @@ async fn test_engine_with_write_cache_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
}
.enable_write_cache(path, ReadableSize::mb(512), None);
@@ -765,7 +765,7 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
vector_cache_size: ReadableSize::mb(32),
..Default::default()
})
@@ -896,7 +896,7 @@ async fn test_list_ssts_with_format(
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -1002,7 +1002,7 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/batch_catchup_test.rs b/src/mito2/src/engine/batch_catchup_test.rs
index d8c744a733..dc0b552adc 100644
--- a/src/mito2/src/engine/batch_catchup_test.rs
+++ b/src/mito2/src/engine/batch_catchup_test.rs
@@ -49,7 +49,7 @@ async fn test_batch_catchup_with_format(factory: Option, flat_f
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -135,7 +135,7 @@ async fn test_batch_catchup_with_format(factory: Option, flat_f
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -216,7 +216,7 @@ async fn test_batch_catchup_err_with_format(factory: Option, fl
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs
index c718ef248c..6b16b3c120 100644
--- a/src/mito2/src/engine/batch_open_test.rs
+++ b/src/mito2/src/engine/batch_open_test.rs
@@ -49,7 +49,7 @@ async fn test_batch_open_with_format(factory: Option, flat_form
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -157,7 +157,7 @@ async fn test_batch_open_with_format(factory: Option, flat_form
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -193,7 +193,7 @@ async fn test_batch_open_err_with_format(factory: Option, flat_
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/bump_committed_sequence_test.rs b/src/mito2/src/engine/bump_committed_sequence_test.rs
index 00d2c0f51c..12db0044c5 100644
--- a/src/mito2/src/engine/bump_committed_sequence_test.rs
+++ b/src/mito2/src/engine/bump_committed_sequence_test.rs
@@ -35,7 +35,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -97,7 +97,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -136,7 +136,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs
index 718462e8a8..e10e91b51b 100644
--- a/src/mito2/src/engine/catchup_test.rs
+++ b/src/mito2/src/engine/catchup_test.rs
@@ -701,7 +701,7 @@ async fn test_catchup_not_exist_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs
index 965a4f6fff..4c06583b0b 100644
--- a/src/mito2/src/engine/close_test.rs
+++ b/src/mito2/src/engine/close_test.rs
@@ -29,7 +29,7 @@ async fn test_engine_close_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("close").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index df8521535f..cbcad3a58a 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -147,7 +147,7 @@ async fn test_compaction_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -223,7 +223,7 @@ async fn test_infer_compaction_time_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -374,7 +374,7 @@ async fn test_compaction_overlapping_files_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -445,7 +445,7 @@ async fn test_compaction_region_with_overlapping_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -503,7 +503,7 @@ async fn test_compaction_region_with_overlapping_delete_all_with_format(flat_for
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -571,7 +571,7 @@ async fn test_readonly_during_compaction_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
// Ensure there is only one background worker for purge task.
max_background_purges: 1,
..Default::default()
@@ -730,7 +730,7 @@ async fn test_compaction_update_time_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -836,7 +836,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -938,7 +938,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -981,7 +981,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -1040,7 +1040,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
diff --git a/src/mito2/src/engine/copy_region_from_test.rs b/src/mito2/src/engine/copy_region_from_test.rs
index e9f8398302..0cf2686fca 100644
--- a/src/mito2/src/engine/copy_region_from_test.rs
+++ b/src/mito2/src/engine/copy_region_from_test.rs
@@ -41,7 +41,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index:
let mut env = TestEnv::with_prefix("copy-region-from").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -156,7 +156,7 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -283,7 +283,7 @@ async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -328,7 +328,7 @@ async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs
index e5980d9442..6dff346539 100644
--- a/src/mito2/src/engine/create_test.rs
+++ b/src/mito2/src/engine/create_test.rs
@@ -36,7 +36,7 @@ async fn test_engine_create_new_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("new-region").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -61,7 +61,7 @@ async fn test_engine_create_existing_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("create-existing").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -91,7 +91,7 @@ async fn test_engine_create_close_create_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("create-close-create").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -131,7 +131,7 @@ async fn test_engine_create_with_different_id_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -160,7 +160,7 @@ async fn test_engine_create_with_different_schema_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -190,7 +190,7 @@ async fn test_engine_create_with_different_primary_key_with_format(flat_format:
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -220,7 +220,7 @@ async fn test_engine_create_with_options_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -253,7 +253,7 @@ async fn test_engine_create_with_custom_store_with_format(flat_format: bool) {
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -301,7 +301,7 @@ async fn test_engine_create_with_memtable_opts_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -353,7 +353,7 @@ async fn create_with_partition_expr_persists_manifest_with_format(flat_format: b
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -401,7 +401,7 @@ async fn test_engine_create_with_format_one_case(create_format: &str, default_fl
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: default_flat_format,
+ default_flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs
index b3da775117..a34a5d1172 100644
--- a/src/mito2/src/engine/drop_test.rs
+++ b/src/mito2/src/engine/drop_test.rs
@@ -45,7 +45,7 @@ async fn test_engine_drop_region_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -175,7 +175,7 @@ async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool)
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs
index 01bdf60070..4a92d3494f 100644
--- a/src/mito2/src/engine/edit_region_test.rs
+++ b/src/mito2/src/engine/edit_region_test.rs
@@ -54,7 +54,7 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
let (tx, mut rx) = oneshot::channel();
let config = MitoConfig {
min_compaction_interval: Duration::from_secs(60 * 60),
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
};
let time_provider = Arc::new(MockTimeProvider::new(current_time_millis()));
@@ -154,7 +154,7 @@ async fn test_edit_region_fill_cache_with_format(flat_format: bool) {
MitoConfig {
// Write cache must be enabled to download the ingested SST file.
enable_write_cache: true,
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -268,7 +268,7 @@ async fn test_edit_region_concurrently_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
// Suppress the compaction to not impede the speed of this kinda stress testing.
min_compaction_interval: Duration::from_secs(60 * 60),
..Default::default()
diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs
index c40fc7ba02..497583b8bc 100644
--- a/src/mito2/src/engine/filter_deleted_test.rs
+++ b/src/mito2/src/engine/filter_deleted_test.rs
@@ -36,7 +36,7 @@ async fn test_scan_without_filtering_deleted_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index 78bae2b461..b86e75c72a 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -49,7 +49,7 @@ async fn test_manual_flush_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -112,7 +112,7 @@ async fn test_flush_engine_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -191,7 +191,7 @@ async fn test_write_stall_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -274,7 +274,7 @@ async fn test_flush_empty_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -447,7 +447,7 @@ async fn test_auto_flush_engine_with_format(flat_format: bool) {
.create_engine_with_time(
MitoConfig {
auto_flush_interval: Duration::from_secs(60 * 5),
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -523,7 +523,7 @@ async fn test_flush_workers_with_format(flat_format: bool) {
.create_engine_with(
MitoConfig {
num_workers: 2,
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs
index 097d5e2b91..40a87642ae 100644
--- a/src/mito2/src/engine/merge_mode_test.rs
+++ b/src/mito2/src/engine/merge_mode_test.rs
@@ -39,7 +39,7 @@ async fn test_merge_mode_write_query_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -107,7 +107,7 @@ async fn test_merge_mode_compaction_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -220,7 +220,7 @@ async fn test_merge_mode_compaction_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs
index 5ee25fb9ff..28ad1de71e 100644
--- a/src/mito2/src/engine/open_test.rs
+++ b/src/mito2/src/engine/open_test.rs
@@ -48,7 +48,7 @@ async fn test_engine_open_empty_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("open-empty").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -87,7 +87,7 @@ async fn test_engine_open_existing_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("open-exiting").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -126,7 +126,7 @@ async fn test_engine_reopen_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("reopen-region").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -153,7 +153,7 @@ async fn test_engine_open_readonly_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -207,7 +207,7 @@ async fn test_engine_region_open_with_options_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -260,7 +260,7 @@ async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -332,7 +332,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -376,7 +376,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -415,7 +415,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -462,7 +462,7 @@ async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bo
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -513,7 +513,7 @@ async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: b
let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -569,7 +569,7 @@ async fn test_open_compaction_region() {
async fn test_open_compaction_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let mut mito_config = MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
};
mito_config
diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs
index cf5b6491a7..b88a60739b 100644
--- a/src/mito2/src/engine/parallel_test.rs
+++ b/src/mito2/src/engine/parallel_test.rs
@@ -33,13 +33,11 @@ async fn scan_in_parallel(
region_id: RegionId,
table_dir: &str,
parallelism: usize,
- channel_size: usize,
flat_format: bool,
) {
let engine = env
.open_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
- parallel_scan_channel_size: channel_size,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -85,7 +83,7 @@ async fn test_parallel_scan_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -146,15 +144,13 @@ async fn test_parallel_scan_with_format(flat_format: bool) {
engine.stop().await.unwrap();
- scan_in_parallel(&mut env, region_id, &table_dir, 0, 1, flat_format).await;
+ scan_in_parallel(&mut env, region_id, &table_dir, 0, flat_format).await;
- scan_in_parallel(&mut env, region_id, &table_dir, 1, 1, flat_format).await;
+ scan_in_parallel(&mut env, region_id, &table_dir, 1, flat_format).await;
- scan_in_parallel(&mut env, region_id, &table_dir, 2, 1, flat_format).await;
+ scan_in_parallel(&mut env, region_id, &table_dir, 2, flat_format).await;
- scan_in_parallel(&mut env, region_id, &table_dir, 2, 8, flat_format).await;
+ scan_in_parallel(&mut env, region_id, &table_dir, 4, flat_format).await;
- scan_in_parallel(&mut env, region_id, &table_dir, 4, 8, flat_format).await;
-
- scan_in_parallel(&mut env, region_id, &table_dir, 8, 2, flat_format).await;
+ scan_in_parallel(&mut env, region_id, &table_dir, 8, flat_format).await;
}
diff --git a/src/mito2/src/engine/partition_filter_test.rs b/src/mito2/src/engine/partition_filter_test.rs
index fdea7d547f..61db52484e 100644
--- a/src/mito2/src/engine/partition_filter_test.rs
+++ b/src/mito2/src/engine/partition_filter_test.rs
@@ -58,7 +58,7 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs
index 7726005b0b..afa505a3ee 100644
--- a/src/mito2/src/engine/projection_test.rs
+++ b/src/mito2/src/engine/projection_test.rs
@@ -84,7 +84,7 @@ async fn test_scan_projection_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -141,7 +141,7 @@ async fn test_scan_projection_without_primary_key_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs
index beb5e2644a..599547ec8d 100644
--- a/src/mito2/src/engine/prune_test.rs
+++ b/src/mito2/src/engine/prune_test.rs
@@ -32,7 +32,7 @@ async fn check_prune_row_groups(exprs: Vec, expected: &str, flat_format: b
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -180,7 +180,7 @@ async fn test_prune_memtable_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -264,7 +264,7 @@ async fn test_prune_memtable_complex_expr_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -327,7 +327,7 @@ async fn test_mem_range_prune_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -392,7 +392,7 @@ async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/remap_manifests_test.rs b/src/mito2/src/engine/remap_manifests_test.rs
index 339896450c..b893eb5b97 100644
--- a/src/mito2/src/engine/remap_manifests_test.rs
+++ b/src/mito2/src/engine/remap_manifests_test.rs
@@ -37,7 +37,7 @@ async fn test_remap_manifests_invalid_partition_expr_with_format(flat_format: bo
let mut env = TestEnv::with_prefix("invalid-partition-expr").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -83,7 +83,7 @@ async fn test_remap_manifests_invalid_region_state_with_format(flat_format: bool
let mut env = TestEnv::with_prefix("invalid-region-state").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -123,7 +123,7 @@ async fn test_remap_manifests_invalid_input_regions_with_format(flat_format: boo
let mut env = TestEnv::with_prefix("invalid-input-regions").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -166,7 +166,7 @@ async fn test_remap_manifests_success_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs
index 6357f01775..119b4493fd 100644
--- a/src/mito2/src/engine/scan_test.rs
+++ b/src/mito2/src/engine/scan_test.rs
@@ -41,7 +41,7 @@ async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -176,7 +176,7 @@ async fn test_max_concurrent_scan_files() {
async fn test_max_concurrent_scan_files_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
let config = MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
max_concurrent_scan_files: 2,
..Default::default()
};
@@ -235,7 +235,7 @@ async fn test_series_scan_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs
index fd90cd99f7..4fb15ab7fe 100644
--- a/src/mito2/src/engine/set_role_state_test.rs
+++ b/src/mito2/src/engine/set_role_state_test.rs
@@ -70,7 +70,7 @@ async fn test_set_role_state_gracefully_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -141,7 +141,7 @@ async fn test_set_role_state_gracefully_not_exist_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -166,7 +166,7 @@ async fn test_write_downgrading_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("write-to-downgrading-region").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -220,7 +220,7 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -329,7 +329,7 @@ async fn test_restricted_state_transitions_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs
index e47a77bea0..bd90779e0b 100644
--- a/src/mito2/src/engine/staging_test.rs
+++ b/src/mito2/src/engine/staging_test.rs
@@ -72,7 +72,7 @@ async fn test_staging_state_integration_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -130,7 +130,7 @@ async fn test_staging_blocks_alter_operations_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -171,7 +171,7 @@ async fn test_staging_blocks_truncate_operations_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -308,7 +308,7 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -505,7 +505,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -657,7 +657,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -883,7 +883,7 @@ async fn test_enter_staging_writes_partition_expr_change_action_with_format(flat
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -947,7 +947,7 @@ async fn test_staging_exit_conflict_partition_expr_change_and_change_with_format
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -1032,7 +1032,7 @@ async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
None,
@@ -1156,7 +1156,7 @@ async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let partition_expr = default_partition_expr();
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs
index 6c3b91c130..17d73b1848 100644
--- a/src/mito2/src/engine/sync_test.rs
+++ b/src/mito2/src/engine/sync_test.rs
@@ -80,7 +80,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -112,7 +112,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
// Open the region on the follower engine
let follower_engine = env
.create_follower_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -189,7 +189,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -224,7 +224,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
// Open the region on the follower engine
let follower_engine = env
.create_follower_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs
index 223cc2b488..818da17faa 100644
--- a/src/mito2/src/engine/truncate_test.rs
+++ b/src/mito2/src/engine/truncate_test.rs
@@ -41,7 +41,7 @@ async fn test_engine_truncate_region_basic_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-basic").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -104,7 +104,7 @@ async fn test_engine_put_data_after_truncate_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-put").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -180,7 +180,7 @@ async fn test_engine_truncate_after_flush_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-flush").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -270,7 +270,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-reopen").await;
let engine = env
.create_engine(MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -310,7 +310,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
@@ -355,7 +355,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager),
@@ -436,7 +436,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
- default_experimental_flat_format: flat_format,
+ default_flat_format: flat_format,
..Default::default()
},
)
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index fedac95d27..7be81dec8d 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -634,7 +634,7 @@ impl RegionFlushTask {
.options
.sst_format
.map(|f| f == FormatType::Flat)
- .unwrap_or(self.engine_config.default_experimental_flat_format);
+ .unwrap_or(self.engine_config.default_flat_format);
SstWriteRequest {
op_type: OperationType::Flush,
metadata: version.metadata.clone(),
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 154d062e07..e1494aa47b 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -421,7 +421,7 @@ impl MemtableBuilderProvider {
let flat_format = options
.sst_format
.map(|format| format == FormatType::Flat)
- .unwrap_or(self.config.default_experimental_flat_format);
+ .unwrap_or(self.config.default_flat_format);
if flat_format {
if options.memtable.is_some() {
common_telemetry::info!(
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index f56c807af3..c447685822 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -46,7 +46,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
-use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
+use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
@@ -219,8 +219,6 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_strategy: CacheStrategy,
- /// Capacity of the channel to send data from parallel scan tasks to the main task.
- parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
max_concurrent_scan_files: usize,
/// Whether to ignore inverted index.
@@ -251,7 +249,6 @@ impl ScanRegion {
access_layer,
request,
cache_strategy,
- parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
ignore_inverted_index: false,
ignore_fulltext_index: false,
@@ -263,16 +260,6 @@ impl ScanRegion {
}
}
- /// Sets parallel scan task channel size.
- #[must_use]
- pub(crate) fn with_parallel_scan_channel_size(
- mut self,
- parallel_scan_channel_size: usize,
- ) -> Self {
- self.parallel_scan_channel_size = parallel_scan_channel_size;
- self
- }
-
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
@@ -527,7 +514,6 @@ impl ScanRegion {
.with_inverted_index_appliers(inverted_index_appliers)
.with_bloom_filter_index_appliers(bloom_filter_appliers)
.with_fulltext_index_appliers(fulltext_index_appliers)
- .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
@@ -814,8 +800,6 @@ pub struct ScanInput {
pub(crate) cache_strategy: CacheStrategy,
/// Ignores file not found error.
ignore_file_not_found: bool,
- /// Capacity of the channel to send data from parallel scan tasks to the main task.
- pub(crate) parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
@@ -863,7 +847,6 @@ impl ScanInput {
files: Vec::new(),
cache_strategy: CacheStrategy::Disabled,
ignore_file_not_found: false,
- parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
@@ -928,16 +911,6 @@ impl ScanInput {
self
}
- /// Sets scan task channel size.
- #[must_use]
- pub(crate) fn with_parallel_scan_channel_size(
- mut self,
- parallel_scan_channel_size: usize,
- ) -> Self {
- self.parallel_scan_channel_size = parallel_scan_channel_size;
- self
- }
-
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
@@ -1072,6 +1045,7 @@ impl ScanInput {
&self,
sources: Vec,
semaphore: Arc,
+ channel_size: usize,
) -> Result> {
if sources.len() <= 1 {
return Ok(sources);
@@ -1081,7 +1055,7 @@ impl ScanInput {
let sources = sources
.into_iter()
.map(|source| {
- let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
+ let (sender, receiver) = mpsc::channel(channel_size);
self.spawn_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
@@ -1256,6 +1230,7 @@ impl ScanInput {
&self,
sources: Vec,
semaphore: Arc,
+ channel_size: usize,
) -> Result> {
if sources.len() <= 1 {
return Ok(sources);
@@ -1265,7 +1240,7 @@ impl ScanInput {
let sources = sources
.into_iter()
.map(|source| {
- let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
+ let (sender, receiver) = mpsc::channel(channel_size);
self.spawn_flat_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Box::pin(stream) as _
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index eee32e7835..597f592de6 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -48,11 +48,11 @@ use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
-use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
+use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
/// Per-file scan metrics.
#[derive(Default, Clone)]
@@ -1231,15 +1231,19 @@ const NUM_SERIES_THRESHOLD: u64 = 10240;
/// 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;
-/// Returns true if splitting flat record batches may improve merge performance.
+/// Returns the estimated rows per batch after splitting if splitting flat record batches
+/// may improve merge performance. Returns `None` if splitting is not beneficial.
pub(crate) fn should_split_flat_batches_for_merge(
stream_ctx: &Arc,
range_meta: &RangeMeta,
-) -> bool {
+) -> Option {
// Number of files to split and scan.
let mut num_files_to_split = 0;
let mut num_mem_rows = 0;
let mut num_mem_series = 0;
+ // Total rows and series for estimating batch size after splitting.
+ let mut total_rows: u64 = 0;
+ let mut total_series: u64 = 0;
// Checks each file range, returns early if any range is not splittable.
// For mem ranges, we collect the total number of rows and series because the number of rows in a
// mem range may be too small.
@@ -1261,23 +1265,49 @@ pub(crate) fn should_split_flat_batches_for_merge(
debug_assert!(file.meta_ref().num_rows > 0);
if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
// We can't split batches in a file.
- return false;
+ return None;
} else {
num_files_to_split += 1;
+ total_rows += file.meta_ref().num_rows;
+ total_series += file.meta_ref().num_series;
}
}
// Skips non-file and non-mem ranges.
}
- if num_files_to_split > 0 {
+ let should_split = if num_files_to_split > 0 {
// We mainly consider file ranges because they have enough data for sampling.
true
- } else if num_mem_series > 0 && num_mem_rows > 0 {
- // If we don't have files to scan, we check whether to split by the memtable.
- can_split_series(num_mem_rows as u64, num_mem_series as u64)
+ } else if num_mem_series > 0
+ && num_mem_rows > 0
+ && can_split_series(num_mem_rows as u64, num_mem_series as u64)
+ {
+ total_rows += num_mem_rows as u64;
+ total_series += num_mem_series as u64;
+ true
} else {
false
+ };
+
+ if !should_split {
+ return None;
}
+
+ // Estimate rows per batch after splitting.
+ let estimated_batch_size = if total_series > 0 && total_rows > 0 {
+ ((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
+ } else {
+ // No valid estimate available, use a conservative fallback.
+ DEFAULT_READ_BATCH_SIZE / 4
+ };
+ Some(estimated_batch_size)
+}
+
+/// Computes the channel size for parallel scan based on the estimated rows per batch.
+/// The channel should buffer approximately `2 * DEFAULT_READ_BATCH_SIZE` rows.
+pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
+ let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
+ size.clamp(2, 64)
}
fn can_split_series(num_rows: u64, num_series: u64) -> bool {
@@ -1555,3 +1585,235 @@ pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeq
batches.push_back(record_batch.slice(start, rows_in_batch));
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+ use std::time::Instant;
+
+ use common_time::Timestamp;
+ use smallvec::{SmallVec, smallvec};
+ use store_api::storage::RegionId;
+
+ use super::*;
+ use crate::cache::CacheStrategy;
+ use crate::memtable::{
+ BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
+ MemtableRangeContext, MemtableStats,
+ };
+ use crate::read::projection::ProjectionMapper;
+ use crate::read::range::{MemRangeBuilder, SourceIndex};
+ use crate::read::scan_region::ScanInput;
+ use crate::sst::file::{FileHandle, FileMeta};
+ use crate::sst::file_purger::NoopFilePurger;
+ use crate::test_util::memtable_util::metadata_for_test;
+ use crate::test_util::scheduler_util::SchedulerEnv;
+
+ struct EmptyIterBuilder;
+
+ impl IterBuilder for EmptyIterBuilder {
+ fn build(&self, _metrics: Option) -> Result {
+ Ok(Box::new(std::iter::empty()))
+ }
+
+ fn is_record_batch(&self) -> bool {
+ true
+ }
+
+ fn build_record_batch(
+ &self,
+ _time_range: Option<(Timestamp, Timestamp)>,
+ _metrics: Option,
+ ) -> Result {
+ Ok(Box::new(std::iter::empty()))
+ }
+ }
+
+ async fn new_test_stream_ctx(
+ files: Vec,
+ memtables: Vec,
+ ) -> Arc {
+ let env = SchedulerEnv::new().await;
+ let metadata = metadata_for_test();
+ let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
+ let input = ScanInput::new(env.access_layer.clone(), mapper)
+ .with_cache(CacheStrategy::Disabled)
+ .with_memtables(memtables)
+ .with_files(files);
+
+ Arc::new(StreamContext {
+ input,
+ ranges: Vec::new(),
+ scan_fingerprint: None,
+ query_start: Instant::now(),
+ })
+ }
+
+ fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
+ let meta = FileMeta {
+ region_id: RegionId::new(123, 456),
+ file_id: Default::default(),
+ time_range: (
+ Timestamp::new_millisecond(0),
+ Timestamp::new_millisecond(1000),
+ ),
+ num_rows,
+ num_series,
+ ..Default::default()
+ };
+ FileHandle::new(meta, Arc::new(NoopFilePurger))
+ }
+
+ fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
+ let context = Arc::new(MemtableRangeContext::new(
+ 0,
+ Box::new(EmptyIterBuilder),
+ Default::default(),
+ ));
+ let stats = MemtableStats {
+ time_range: Some((
+ Timestamp::new_millisecond(0),
+ Timestamp::new_millisecond(1000),
+ )),
+ num_rows,
+ num_ranges: 1,
+ series_count,
+ ..Default::default()
+ };
+ let range = MemtableRange::new(context, stats.clone());
+ MemRangeBuilder::new(range, stats)
+ }
+
+ fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
+ let indices = row_group_indices
+ .iter()
+ .map(|row_group_index| SourceIndex {
+ index: row_group_index.index,
+ num_row_groups: 1,
+ })
+ .collect();
+
+ RangeMeta {
+ time_range: (
+ Timestamp::new_millisecond(0),
+ Timestamp::new_millisecond(1000),
+ ),
+ indices,
+ row_group_indices,
+ num_rows: 0,
+ }
+ }
+
+ #[tokio::test]
+ async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
+ let num_rows = SPLIT_ROW_THRESHOLD * 2;
+ let num_series = (num_rows / 100).max(1);
+ let stream_ctx =
+ new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
+ let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
+ index: 0,
+ row_group_index: 0,
+ }]);
+
+ assert_eq!(
+ Some((num_rows / num_series) as usize),
+ should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
+ let stream_ctx = new_test_stream_ctx(
+ vec![
+ new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
+ new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
+ ],
+ vec![],
+ )
+ .await;
+ let range_meta = new_test_range_meta(smallvec![
+ RowGroupIndex {
+ index: 0,
+ row_group_index: 0,
+ },
+ RowGroupIndex {
+ index: 1,
+ row_group_index: 0,
+ }
+ ]);
+
+ assert_eq!(
+ None,
+ should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
+ let num_series =
+ (SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
+ let stream_ctx =
+ new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
+ let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
+ index: 0,
+ row_group_index: 0,
+ }]);
+
+ assert_eq!(
+ None,
+ should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
+ let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
+ let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
+ index: 0,
+ row_group_index: 0,
+ }]);
+
+ assert_eq!(
+ Some(50),
+ should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
+ );
+ }
+
+ #[tokio::test]
+ async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
+ let stream_ctx =
+ new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
+ let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
+ index: 0,
+ row_group_index: 0,
+ }]);
+
+ assert_eq!(
+ Some(DEFAULT_READ_BATCH_SIZE),
+ should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
+ );
+ }
+
+ #[test]
+ fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
+ assert_eq!(64, compute_parallel_channel_size(0));
+ assert_eq!(64, compute_parallel_channel_size(1));
+ }
+
+ #[test]
+ fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
+ assert_eq!(
+ 4,
+ compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
+ );
+ }
+
+ #[test]
+ fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
+ assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
+ assert_eq!(
+ 2,
+ compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
+ );
+ }
+}
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 49f173e7c9..15ab435425 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -43,8 +43,8 @@ use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::range::RangeMeta;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
- PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_flat_file_ranges,
- scan_flat_mem_ranges, should_split_flat_batches_for_merge,
+ PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
+ scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
@@ -176,7 +176,14 @@ impl SeqScan {
partition_ranges.len(),
sources.len()
);
- Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
+ Self::build_flat_reader_from_sources(
+ stream_ctx,
+ sources,
+ None,
+ None,
+ compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
+ )
+ .await
}
/// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
@@ -187,13 +194,16 @@ impl SeqScan {
mut sources: Vec,
semaphore: Option>,
part_metrics: Option<&PartitionMetrics>,
+ channel_size: usize,
) -> Result {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
- sources = stream_ctx
- .input
- .create_parallel_flat_sources(sources, semaphore.clone())?;
+ sources = stream_ctx.input.create_parallel_flat_sources(
+ sources,
+ semaphore.clone(),
+ channel_size,
+ )?;
}
}
@@ -322,7 +332,7 @@ impl SeqScan {
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
- build_flat_sources(
+ let split_batch_size = build_flat_sources(
&stream_ctx,
&part_range,
compaction,
@@ -332,8 +342,11 @@ impl SeqScan {
file_scan_semaphore.clone(),
).await?;
+ let channel_size = compute_parallel_channel_size(
+ split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE),
+ );
let mut reader =
- Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
+ Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size)
.await?;
let mut metrics = ScannerMetrics {
@@ -529,6 +542,7 @@ impl fmt::Debug for SeqScan {
}
/// Builds flat sources for the partition range and push them to the `sources` vector.
+/// Returns the estimated rows per batch after splitting if splitting is applied, or `None`.
pub(crate) async fn build_flat_sources(
stream_ctx: &Arc,
part_range: &PartitionRange,
@@ -537,7 +551,7 @@ pub(crate) async fn build_flat_sources(
partition_pruner: Arc,
sources: &mut Vec,
semaphore: Option>,
-) -> Result<()> {
+) -> Result> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
@@ -561,10 +575,11 @@ pub(crate) async fn build_flat_sources(
};
let num_indices = range_meta.row_group_indices.len();
if num_indices == 0 {
- return Ok(());
+ return Ok(None);
}
- let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
+ let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
+ let should_split = split_batch_size.is_some();
sources.reserve(num_indices);
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
@@ -642,7 +657,7 @@ pub(crate) async fn build_flat_sources(
);
}
- Ok(())
+ Ok(split_batch_size)
}
#[cfg(test)]
diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs
index d2e37af66a..bf7ed072ab 100644
--- a/src/mito2/src/read/series_scan.rs
+++ b/src/mito2/src/read/series_scan.rs
@@ -47,9 +47,12 @@ use crate::error::{
use crate::read::ScannerMetrics;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
-use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
+use crate::read::scan_util::{
+ PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_parallel_channel_size,
+};
use crate::read::seq_scan::{SeqScan, build_flat_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
+use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
@@ -482,10 +485,11 @@ impl SeriesDistributor {
// Scans all parts.
let mut sources = Vec::with_capacity(self.partitions.len());
+ let mut min_batch_size: Option = None;
for partition in &self.partitions {
sources.reserve(partition.len());
for part_range in partition {
- build_flat_sources(
+ let split_batch_size = build_flat_sources(
&self.stream_ctx,
part_range,
false,
@@ -495,15 +499,21 @@ impl SeriesDistributor {
self.semaphore.clone(),
)
.await?;
+ if let Some(size) = split_batch_size {
+ min_batch_size = Some(min_batch_size.map_or(size, |cur| cur.min(size)));
+ }
}
}
// Builds a flat reader that merge sources from all parts.
+ let channel_size =
+ compute_parallel_channel_size(min_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE));
let mut reader = SeqScan::build_flat_reader_from_sources(
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
+ channel_size,
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();
diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs
index 9aa6454f75..b23e73557d 100644
--- a/src/mito2/src/region/opener.rs
+++ b/src/mito2/src/region/opener.rs
@@ -269,7 +269,7 @@ impl RegionOpener {
// Sets the sst_format based on options or flat_format flag
let sst_format = if let Some(format) = options.sst_format {
format
- } else if config.default_experimental_flat_format {
+ } else if config.default_flat_format {
options.sst_format = Some(FormatType::Flat);
FormatType::Flat
} else {
@@ -309,7 +309,7 @@ impl RegionOpener {
debug!(
"Create region {} with options: {:?}, default_flat_format: {}",
- region_id, options, config.default_experimental_flat_format
+ region_id, options, config.default_flat_format
);
let version = VersionBuilder::new(metadata, mutable)
@@ -626,8 +626,10 @@ pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut R
manifest.sst_format,
manifest.metadata.region_id,
);
- options.sst_format = Some(manifest.sst_format);
}
+ // Always set sst_format from manifest to ensure it's explicitly stored,
+ // even when the default matches the manifest value.
+ options.sst_format = Some(manifest.sst_format);
if let Some(manifest_append_mode) = manifest.append_mode
&& options.append_mode != manifest_append_mode
{
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index 459aa8dd32..6fa560e90c 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -216,15 +216,6 @@ impl RegionWorkerLoop {
// If the format is unchanged, we also consider the option is altered.
if new_format != current_options.sst_format.unwrap_or_default() {
all_options_altered = false;
-
- // Validates the format type.
- ensure!(
- new_format == FormatType::Flat,
- store_api::metadata::InvalidRegionRequestSnafu {
- region_id: region.region_id,
- err: "Only allow changing format type to flat",
- }
- );
}
}
SetRegionOption::AppendMode(new_append_mode) => {
@@ -274,8 +265,6 @@ fn new_region_options_on_empty_memtable(
SetRegionOption::Format(format_str) => {
// Safety: handle_alter_region_options_fast() has validated this.
let new_format = format_str.parse::().unwrap();
- assert_eq!(FormatType::Flat, new_format);
-
current_options.sst_format = Some(new_format);
}
SetRegionOption::AppendMode(new_append_mode) => {
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 21e707e4d0..29d4256864 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1566,12 +1566,11 @@ index_cache_percent = 20
enable_refill_cache_on_read = true
manifest_cache_size = "256MiB"
sst_write_buffer_size = "8MiB"
-parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
scan_memory_on_exhausted = "fail"
min_compaction_interval = "0s"
-default_experimental_flat_format = false
+default_flat_format = true
[region_engine.mito.index]
aux_path = ""
diff --git a/tests/cases/standalone/common/alter/alter_format.result b/tests/cases/standalone/common/alter/alter_format.result
index d38c63997d..a1019a8c93 100644
--- a/tests/cases/standalone/common/alter/alter_format.result
+++ b/tests/cases/standalone/common/alter/alter_format.result
@@ -42,6 +42,26 @@ ALTER TABLE test_alt_format SET 'sst_format' = 'flat';
Affected Rows: 0
+SHOW CREATE TABLE test_alt_format;
+
++-----------------+------------------------------------------------+
+| Table | Create Table |
++-----------------+------------------------------------------------+
+| test_alt_format | CREATE TABLE IF NOT EXISTS "test_alt_format" ( |
+| | "h" INT NULL, |
+| | "i" INT NULL DEFAULT 0, |
+| | "j" TIMESTAMP(3) NOT NULL, |
+| | "k" INT NULL, |
+| | TIME INDEX ("j"), |
+| | PRIMARY KEY ("h") |
+| | ) |
+| | |
+| | ENGINE=mito |
+| | WITH( |
+| | sst_format = 'flat' |
+| | ) |
++-----------------+------------------------------------------------+
+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
@@ -116,11 +136,68 @@ SELECT i, h FROM test_alt_format;
| 23 | 13 |
+----+----+
--- not allow to change from flat to primary_key
--- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
+-- allow to change from flat to primary_key
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
-Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat
+Affected Rows: 0
+
+SHOW CREATE TABLE test_alt_format;
+
++-----------------+------------------------------------------------+
+| Table | Create Table |
++-----------------+------------------------------------------------+
+| test_alt_format | CREATE TABLE IF NOT EXISTS "test_alt_format" ( |
+| | "h" INT NULL, |
+| | "i" INT NULL DEFAULT 0, |
+| | "j" TIMESTAMP(3) NOT NULL, |
+| | "k" INT NULL, |
+| | TIME INDEX ("j"), |
+| | PRIMARY KEY ("h") |
+| | ) |
+| | |
+| | ENGINE=mito |
+| | WITH( |
+| | sst_format = 'primary_key' |
+| | ) |
++-----------------+------------------------------------------------+
+
+INSERT INTO test_alt_format (h, j, i) VALUES (14, 4, 34);
+
+Affected Rows: 1
+
+-- SQLNESS SORT_RESULT 3 1
+SELECT * FROM test_alt_format;
+
++----+----+-------------------------+----+
+| h | i | j | k |
++----+----+-------------------------+----+
+| 10 | 0 | 1970-01-01T00:00:00 | |
+| 11 | 0 | 1970-01-01T00:00:00.001 | |
+| 12 | 0 | 1970-01-01T00:00:00.002 | |
+| 13 | 23 | 1970-01-01T00:00:00.003 | 33 |
+| 14 | 34 | 1970-01-01T00:00:00.004 | |
++----+----+-------------------------+----+
+
+ADMIN flush_table('test_alt_format');
+
++--------------------------------------+
+| ADMIN flush_table('test_alt_format') |
++--------------------------------------+
+| 0 |
++--------------------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+SELECT * FROM test_alt_format;
+
++----+----+-------------------------+----+
+| h | i | j | k |
++----+----+-------------------------+----+
+| 10 | 0 | 1970-01-01T00:00:00 | |
+| 11 | 0 | 1970-01-01T00:00:00.001 | |
+| 12 | 0 | 1970-01-01T00:00:00.002 | |
+| 13 | 23 | 1970-01-01T00:00:00.003 | 33 |
+| 14 | 34 | 1970-01-01T00:00:00.004 | |
++----+----+-------------------------+----+
DROP TABLE test_alt_format;
@@ -167,6 +244,27 @@ ALTER TABLE alt_format_phy SET 'sst_format' = 'flat';
Affected Rows: 0
+SHOW CREATE TABLE alt_format_phy;
+
++----------------+-----------------------------------------------+
+| Table | Create Table |
++----------------+-----------------------------------------------+
+| alt_format_phy | CREATE TABLE IF NOT EXISTS "alt_format_phy" ( |
+| | "ts" TIMESTAMP(3) NOT NULL, |
+| | "val" DOUBLE NULL, |
+| | "host" STRING NULL, |
+| | "k" STRING NULL, |
+| | TIME INDEX ("ts"), |
+| | PRIMARY KEY ("host", "k") |
+| | ) |
+| | |
+| | ENGINE=metric |
+| | WITH( |
+| | physical_metric_table = '', |
+| | sst_format = 'flat' |
+| | ) |
++----------------+-----------------------------------------------+
+
SELECT * FROM t1 ORDER BY ts ASC;
+-------------+---+---------------------+------+
@@ -202,11 +300,47 @@ SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
| example.com | 2022-01-02T00:00:00 | 4.56 |
+-------------+---------------------+------+
--- not allow to change from flat to primary_key
--- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
+-- allow to change from flat to primary_key
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
-Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat
+Affected Rows: 0
+
+SHOW CREATE TABLE alt_format_phy;
+
++----------------+-----------------------------------------------+
+| Table | Create Table |
++----------------+-----------------------------------------------+
+| alt_format_phy | CREATE TABLE IF NOT EXISTS "alt_format_phy" ( |
+| | "ts" TIMESTAMP(3) NOT NULL, |
+| | "val" DOUBLE NULL, |
+| | "host" STRING NULL, |
+| | "k" STRING NULL, |
+| | TIME INDEX ("ts"), |
+| | PRIMARY KEY ("host", "k") |
+| | ) |
+| | |
+| | ENGINE=metric |
+| | WITH( |
+| | physical_metric_table = '', |
+| | sst_format = 'primary_key' |
+| | ) |
++----------------+-----------------------------------------------+
+
+INSERT INTO t1 (ts, val, host) VALUES
+ ('2022-01-01 00:00:02', 5.0, 'example.com');
+
+Affected Rows: 1
+
+SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
+
++-------------+---------------------+------+
+| host | ts | val |
++-------------+---------------------+------+
+| example.com | 2022-01-01T00:00:00 | 1.23 |
+| example.com | 2022-01-01T00:00:01 | 3.0 |
+| example.com | 2022-01-01T00:00:02 | 5.0 |
+| example.com | 2022-01-02T00:00:00 | 4.56 |
++-------------+---------------------+------+
DROP TABLE t1;
diff --git a/tests/cases/standalone/common/alter/alter_format.sql b/tests/cases/standalone/common/alter/alter_format.sql
index e1472d28e1..c3b292875c 100644
--- a/tests/cases/standalone/common/alter/alter_format.sql
+++ b/tests/cases/standalone/common/alter/alter_format.sql
@@ -16,6 +16,8 @@ SELECT i, h FROM test_alt_format;
ALTER TABLE test_alt_format SET 'sst_format' = 'flat';
+SHOW CREATE TABLE test_alt_format;
+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
@@ -37,10 +39,21 @@ SELECT * FROM test_alt_format;
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
--- not allow to change from flat to primary_key
--- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
+-- allow to change from flat to primary_key
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
+SHOW CREATE TABLE test_alt_format;
+
+INSERT INTO test_alt_format (h, j, i) VALUES (14, 4, 34);
+
+-- SQLNESS SORT_RESULT 3 1
+SELECT * FROM test_alt_format;
+
+ADMIN flush_table('test_alt_format');
+
+-- SQLNESS SORT_RESULT 3 1
+SELECT * FROM test_alt_format;
+
DROP TABLE test_alt_format;
CREATE TABLE alt_format_phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "sst_format" = "primary_key");
@@ -62,6 +75,8 @@ SELECT * FROM t1 ORDER BY ts ASC;
ALTER TABLE alt_format_phy SET 'sst_format' = 'flat';
+SHOW CREATE TABLE alt_format_phy;
+
SELECT * FROM t1 ORDER BY ts ASC;
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
@@ -72,10 +87,16 @@ INSERT INTO t1 (ts, val, host) VALUES
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
--- not allow to change from flat to primary_key
--- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
+-- allow to change from flat to primary_key
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
+SHOW CREATE TABLE alt_format_phy;
+
+INSERT INTO t1 (ts, val, host) VALUES
+ ('2022-01-01 00:00:02', 5.0, 'example.com');
+
+SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
+
DROP TABLE t1;
DROP TABLE alt_format_phy;
diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template
index 3ec8a2f695..e68a76cc9a 100644
--- a/tests/conf/datanode-test.toml.template
+++ b/tests/conf/datanode-test.toml.template
@@ -6,7 +6,7 @@ rpc_runtime_size = 8
[[region_engine]]
[region_engine.mito]
{{ if enable_flat_format }}
-default_experimental_flat_format = true
+default_flat_format = true
{{ endif }}
[wal]
diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template
index 50c014e991..bcd263d0b5 100644
--- a/tests/conf/standalone-test.toml.template
+++ b/tests/conf/standalone-test.toml.template
@@ -5,7 +5,7 @@ require_lease_before_startup = true
[[region_engine]]
[region_engine.mito]
{{ if enable_flat_format }}
-default_experimental_flat_format = true
+default_flat_format = true
{{ endif }}
[wal]
diff --git a/tests/runner/src/cmd/bare.rs b/tests/runner/src/cmd/bare.rs
index e9a4ff8b79..58199f959e 100644
--- a/tests/runner/src/cmd/bare.rs
+++ b/tests/runner/src/cmd/bare.rs
@@ -103,7 +103,7 @@ pub struct BareCommand {
#[clap(long)]
extra_args: Vec,
- /// Enable flat format for storage engine (sets default_experimental_flat_format = true).
+ /// Enable flat format for storage engine (sets default_flat_format = true).
#[clap(long, default_value = "false")]
enable_flat_format: bool,
}