feat!: switch default sst format to flat (#7909)

* feat: support alter from primary_key to flat

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: alter flat to primary_key

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: change default_experimental_flat_format to true

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: compute channel size from splitted batch size

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add tests for split and channel size

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: always set sst_format from manifest on region open

sanitize_region_options did not set options.sst_format when the
default (PrimaryKey) matched the manifest value, leaving it as None
after reopen. This caused the alter format change to appear lost.

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: show create table after alteration

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor!: rename default_experimental_flat_format to default_flat_format

The flat format is no longer experimental. Remove "experimental" from
the config field name, doc comments, and all references.

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-03 12:14:02 +08:00
committed by GitHub
parent a9256f0310
commit 233e35c0c9
50 changed files with 913 additions and 288 deletions

View File

@@ -157,13 +157,12 @@
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling cache on read operations (default: true).<br/>When disabled, cache refilling on read won't happen. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit. |
| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately.<br/>"fail" (default) fails fast and is the recommended option for most users.<br/>"wait" / "wait(<duration>)" waits for memory to become available. This is mainly<br/>for advanced tuning in bursty workloads where temporary contention is common and<br/>higher latency is acceptable.<br/>"wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.default_flat_format` | Bool | `true` | Whether to enable flat format as the default SST format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -550,13 +549,12 @@
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling cache on read operations (default: true).<br/>When disabled, cache refilling on read won't happen. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.scan_memory_limit` | String | `50%` | Memory limit for table scans across all queries.<br/>Supports absolute size (e.g., "2GB") or percentage of system memory (e.g., "20%").<br/>Setting it to 0 disables the limit. |
| `region_engine.mito.scan_memory_on_exhausted` | String | `fail` | Controls what happens when a scan cannot get memory immediately.<br/>"fail" (default) fails fast and is the recommended option for most users.<br/>"wait" / "wait(<duration>)" waits for memory to become available. This is mainly<br/>for advanced tuning in bursty workloads where temporary contention is common and<br/>higher latency is acceptable.<br/>"wait" means "wait(10s)", not unlimited waiting. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.default_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format as the default format. |
| `region_engine.mito.default_flat_format` | Bool | `true` | Whether to enable flat format as the default SST format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |

View File

@@ -520,9 +520,6 @@ manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 384
@@ -545,8 +542,8 @@ scan_memory_on_exhausted = "fail"
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## Whether to enable experimental flat format as the default format.
default_experimental_flat_format = false
## Whether to enable flat format as the default SST format.
default_flat_format = true
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -612,9 +612,6 @@ manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32
## Maximum number of SST files to scan concurrently.
max_concurrent_scan_files = 384
@@ -637,8 +634,8 @@ scan_memory_on_exhausted = "fail"
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## Whether to enable experimental flat format as the default format.
default_experimental_flat_format = false
## Whether to enable flat format as the default SST format.
default_flat_format = true
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -528,7 +528,7 @@ mod tests {
async fn test_bulk_insert_physical_region_passthrough() {
// Use flat format so that BulkMemtable is used (supports write_bulk).
let mito_config = MitoConfig {
default_experimental_flat_format: true,
default_flat_format: true,
..Default::default()
};
let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
@@ -585,7 +585,7 @@ mod tests {
async fn test_bulk_insert_physical_region_empty_batch() {
// Use flat format so that BulkMemtable is used (supports write_bulk).
let mito_config = MitoConfig {
default_experimental_flat_format: true,
default_flat_format: true,
..Default::default()
};
let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;

View File

@@ -121,6 +121,10 @@ mod tests {
.map(|path| path.replace(&e.file_id, "<file_id>"));
e.file_id = "<file_id>".to_string();
e.index_version = 0;
// Round down sizes to nearest 1000 to avoid exact size
// comparisons that break when the SST format changes.
e.file_size = e.file_size / 1000 * 1000;
e.index_file_size = e.index_file_size.map(|s| s / 1000 * 1000);
format!("\n{:?}", e)
})
.sorted()
@@ -129,12 +133,12 @@ mod tests {
assert_eq!(
debug_format,
r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3487, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3217, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3471, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 4000, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, num_series: Some(8), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8), origin_region_id: 47261417473(11, 16777217), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 47261417474(11, 16777218), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3000, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(0), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3000, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, num_series: Some(4), min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4), origin_region_id: 94506057770(22, 16777258), node_id: None, visible: true }"#,
);
// list from storage
let storage_entries = mito

View File

@@ -322,11 +322,7 @@ impl DefaultCompactor {
.region_options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(
compaction_region
.engine_config
.default_experimental_flat_format,
);
.unwrap_or(compaction_region.engine_config.default_flat_format);
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();

View File

@@ -33,8 +33,6 @@ use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
/// Default maximum number of SST files to scan concurrently.
pub(crate) const DEFAULT_MAX_CONCURRENT_SCAN_FILES: usize = 384;
@@ -142,8 +140,6 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently (default 384).
pub max_concurrent_scan_files: usize,
/// Whether to allow stale entries read during replay.
@@ -177,9 +173,9 @@ pub struct MitoConfig {
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
/// Whether to enable experimental flat format as the default format.
/// Whether to enable flat format as the default SST format.
/// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
pub default_experimental_flat_format: bool,
pub default_flat_format: bool,
pub gc: GcConfig,
}
@@ -217,7 +213,6 @@ impl Default for MitoConfig {
enable_refill_cache_on_read: true,
manifest_cache_size: ReadableSize::mb(256),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
allow_stale_entries: false,
scan_memory_limit: MemoryLimit::default(),
@@ -230,7 +225,7 @@ impl Default for MitoConfig {
vector_index: VectorIndexConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
default_experimental_flat_format: false,
default_flat_format: true,
gc: GcConfig::default(),
};
@@ -295,14 +290,6 @@ impl MitoConfig {
);
}
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(
"Sanitize scan channel size to {}",
self.parallel_scan_channel_size
);
}
// Sets write cache path if it is empty.
if self.write_cache_path.trim().is_empty() {
self.write_cache_path = data_home.to_string();

View File

@@ -1027,7 +1027,6 @@ impl EngineInner {
request,
CacheStrategy::EnableAll(cache_manager),
)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.config.max_concurrent_scan_files)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())

View File

@@ -141,7 +141,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -213,7 +213,7 @@ async fn test_alter_region_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -267,7 +267,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -318,7 +318,7 @@ async fn test_put_after_alter_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -387,7 +387,7 @@ async fn test_alter_region_retry_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -457,7 +457,7 @@ async fn test_alter_on_flushing_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -574,7 +574,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -681,7 +681,7 @@ async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -718,7 +718,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -816,7 +816,7 @@ async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -853,7 +853,7 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -916,7 +916,7 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -994,7 +994,7 @@ async fn test_alter_region_sst_format_with_flush() {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: false,
default_flat_format: false,
..Default::default()
})
.await;
@@ -1085,7 +1085,7 @@ async fn test_alter_region_sst_format_with_flush() {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: false,
default_flat_format: false,
..Default::default()
},
)
@@ -1118,7 +1118,7 @@ async fn test_alter_region_sst_format_without_flush() {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: false,
default_flat_format: false,
..Default::default()
})
.await;
@@ -1203,7 +1203,7 @@ async fn test_alter_region_sst_format_without_flush() {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: false,
default_flat_format: false,
..Default::default()
},
)
@@ -1231,6 +1231,250 @@ async fn test_alter_region_sst_format_without_flush() {
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_alter_region_sst_format_flat_to_pk_with_flush() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_flat_format: true,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let table_dir = request.table_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Inserts some data with flat format
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Flushes to create SST files with flat format
flush_region(&engine, region_id, None).await;
let expected_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_data, batches.pretty_print().unwrap());
// Alters sst_format from flat to primary_key
let alter_format_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Format("primary_key".to_string())],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_format_request))
.await
.unwrap();
// Inserts more data after alter
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(3, 6),
};
put_rows(&engine, region_id, rows).await;
// Flushes to create SST files with primary_key format
flush_region(&engine, region_id, None).await;
let expected_all_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
// Reopens region to verify format persists
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_flat_format: false,
..Default::default()
},
)
.await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
table_dir,
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_alter_region_sst_format_flat_to_pk_without_flush() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_flat_format: true,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let table_dir = request.table_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let check_format = |engine: &MitoEngine, expected: Option<FormatType>| {
let current_format = engine
.get_region(region_id)
.unwrap()
.version()
.options
.sst_format;
assert_eq!(current_format, expected);
};
check_format(&engine, Some(FormatType::Flat));
// Inserts some data with flat format
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Alters sst_format from flat to primary_key
let alter_format_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Format("primary_key".to_string())],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_format_request))
.await
.unwrap();
check_format(&engine, Some(FormatType::PrimaryKey));
// Inserts more data after alter
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(3, 6),
};
put_rows(&engine, region_id, rows).await;
let expected_all_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
// Reopens region to verify format persists
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_flat_format: false,
..Default::default()
},
)
.await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
table_dir,
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
check_format(&engine, Some(FormatType::PrimaryKey));
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_alter_region_append_mode_with_flush() {
common_telemetry::init_default_ut_logging();

View File

@@ -44,7 +44,7 @@ async fn test_append_mode_write_query_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -112,7 +112,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -211,7 +211,7 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -238,7 +238,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -329,7 +329,7 @@ async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool)
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -376,7 +376,7 @@ async fn test_put_single_range_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -474,7 +474,7 @@ async fn test_put_single_range_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)

View File

@@ -62,7 +62,7 @@ async fn test_apply_staging_manifest_invalid_region_state_with_format(flat_forma
let mut env = TestEnv::with_prefix("invalid-region-state").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -125,7 +125,7 @@ async fn test_apply_staging_manifest_mismatched_partition_expr_with_format(flat_
let mut env = TestEnv::with_prefix("mismatched-partition-expr").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -205,7 +205,7 @@ async fn test_apply_staging_manifest_success_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("success").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -406,7 +406,7 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma
let mut env = TestEnv::with_prefix("invalid-files-to-add").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -483,7 +483,7 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo
let mut env = TestEnv::with_prefix("apply-change-edit-different-columns").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -599,7 +599,7 @@ async fn test_apply_staging_manifest_preserves_unflushed_memtable_with_format(fl
let mut env = TestEnv::with_prefix("apply-preserve-memtable").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -56,7 +56,7 @@ async fn test_engine_new_stop_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -93,7 +93,7 @@ async fn test_write_to_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("write-to-region").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -134,7 +134,7 @@ async fn test_region_replay_with_format(factory: Option<LogStoreFactory>, flat_f
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -169,7 +169,7 @@ async fn test_region_replay_with_format(factory: Option<LogStoreFactory>, flat_f
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -234,7 +234,7 @@ async fn test_write_query_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -278,7 +278,7 @@ async fn test_different_order_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -339,7 +339,7 @@ async fn test_different_order_and_type_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -403,7 +403,7 @@ async fn test_put_delete_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -465,7 +465,7 @@ async fn test_delete_not_null_fields_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -524,7 +524,7 @@ async fn test_put_overwrite_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -594,7 +594,7 @@ async fn test_absent_and_invalid_columns_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -650,7 +650,7 @@ async fn test_region_usage_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("region_usage").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -716,7 +716,7 @@ async fn test_engine_with_write_cache_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
}
.enable_write_cache(path, ReadableSize::mb(512), None);
@@ -765,7 +765,7 @@ async fn test_cache_null_primary_key_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
vector_cache_size: ReadableSize::mb(32),
..Default::default()
})
@@ -896,7 +896,7 @@ async fn test_list_ssts_with_format(
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -1002,7 +1002,7 @@ async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expe
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -49,7 +49,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_f
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -135,7 +135,7 @@ async fn test_batch_catchup_with_format(factory: Option<LogStoreFactory>, flat_f
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -216,7 +216,7 @@ async fn test_batch_catchup_err_with_format(factory: Option<LogStoreFactory>, fl
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -49,7 +49,7 @@ async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_form
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -157,7 +157,7 @@ async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_form
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -193,7 +193,7 @@ async fn test_batch_open_err_with_format(factory: Option<LogStoreFactory>, flat_
.with_log_store_factory(factory.clone());
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -35,7 +35,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -97,7 +97,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -136,7 +136,7 @@ async fn test_bump_committed_sequence_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)

View File

@@ -701,7 +701,7 @@ async fn test_catchup_not_exist_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -29,7 +29,7 @@ async fn test_engine_close_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("close").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -147,7 +147,7 @@ async fn test_compaction_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -223,7 +223,7 @@ async fn test_infer_compaction_time_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -374,7 +374,7 @@ async fn test_compaction_overlapping_files_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -445,7 +445,7 @@ async fn test_compaction_region_with_overlapping_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -503,7 +503,7 @@ async fn test_compaction_region_with_overlapping_delete_all_with_format(flat_for
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -571,7 +571,7 @@ async fn test_readonly_during_compaction_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
// Ensure there is only one background worker for purge task.
max_background_purges: 1,
..Default::default()
@@ -730,7 +730,7 @@ async fn test_compaction_update_time_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -836,7 +836,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -938,7 +938,7 @@ async fn test_change_region_compaction_window_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -981,7 +981,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -1040,7 +1040,7 @@ async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)

View File

@@ -41,7 +41,7 @@ async fn test_engine_copy_region_from_with_format(flat_format: bool, with_index:
let mut env = TestEnv::with_prefix("copy-region-from").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -156,7 +156,7 @@ async fn test_engine_copy_region_failure_with_format(flat_format: bool) {
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -283,7 +283,7 @@ async fn test_engine_copy_region_invalid_args_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -328,7 +328,7 @@ async fn test_engine_copy_region_unexpected_state_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -36,7 +36,7 @@ async fn test_engine_create_new_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("new-region").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -61,7 +61,7 @@ async fn test_engine_create_existing_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("create-existing").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -91,7 +91,7 @@ async fn test_engine_create_close_create_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("create-close-create").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -131,7 +131,7 @@ async fn test_engine_create_with_different_id_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -160,7 +160,7 @@ async fn test_engine_create_with_different_schema_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -190,7 +190,7 @@ async fn test_engine_create_with_different_primary_key_with_format(flat_format:
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -220,7 +220,7 @@ async fn test_engine_create_with_options_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -253,7 +253,7 @@ async fn test_engine_create_with_custom_store_with_format(flat_format: bool) {
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -301,7 +301,7 @@ async fn test_engine_create_with_memtable_opts_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -353,7 +353,7 @@ async fn create_with_partition_expr_persists_manifest_with_format(flat_format: b
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -401,7 +401,7 @@ async fn test_engine_create_with_format_one_case(create_format: &str, default_fl
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: default_flat_format,
default_flat_format,
..Default::default()
})
.await;

View File

@@ -45,7 +45,7 @@ async fn test_engine_drop_region_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -175,7 +175,7 @@ async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool)
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,

View File

@@ -54,7 +54,7 @@ async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
let (tx, mut rx) = oneshot::channel();
let config = MitoConfig {
min_compaction_interval: Duration::from_secs(60 * 60),
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
};
let time_provider = Arc::new(MockTimeProvider::new(current_time_millis()));
@@ -154,7 +154,7 @@ async fn test_edit_region_fill_cache_with_format(flat_format: bool) {
MitoConfig {
// Write cache must be enabled to download the ingested SST file.
enable_write_cache: true,
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -268,7 +268,7 @@ async fn test_edit_region_concurrently_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
// Suppress the compaction to not impede the speed of this kinda stress testing.
min_compaction_interval: Duration::from_secs(60 * 60),
..Default::default()

View File

@@ -36,7 +36,7 @@ async fn test_scan_without_filtering_deleted_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -49,7 +49,7 @@ async fn test_manual_flush_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -112,7 +112,7 @@ async fn test_flush_engine_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -191,7 +191,7 @@ async fn test_write_stall_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -274,7 +274,7 @@ async fn test_flush_empty_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -447,7 +447,7 @@ async fn test_auto_flush_engine_with_format(flat_format: bool) {
.create_engine_with_time(
MitoConfig {
auto_flush_interval: Duration::from_secs(60 * 5),
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),
@@ -523,7 +523,7 @@ async fn test_flush_workers_with_format(flat_format: bool) {
.create_engine_with(
MitoConfig {
num_workers: 2,
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager.clone()),

View File

@@ -39,7 +39,7 @@ async fn test_merge_mode_write_query_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -107,7 +107,7 @@ async fn test_merge_mode_compaction_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -220,7 +220,7 @@ async fn test_merge_mode_compaction_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)

View File

@@ -48,7 +48,7 @@ async fn test_engine_open_empty_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("open-empty").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -87,7 +87,7 @@ async fn test_engine_open_existing_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("open-exiting").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -126,7 +126,7 @@ async fn test_engine_reopen_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("reopen-region").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -153,7 +153,7 @@ async fn test_engine_open_readonly_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -207,7 +207,7 @@ async fn test_engine_region_open_with_options_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -260,7 +260,7 @@ async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool
let engine = env
.create_engine_with_multiple_object_stores(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -332,7 +332,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -376,7 +376,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -415,7 +415,7 @@ async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -462,7 +462,7 @@ async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bo
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -513,7 +513,7 @@ async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: b
let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -569,7 +569,7 @@ async fn test_open_compaction_region() {
async fn test_open_compaction_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let mut mito_config = MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
};
mito_config

View File

@@ -33,13 +33,11 @@ async fn scan_in_parallel(
region_id: RegionId,
table_dir: &str,
parallelism: usize,
channel_size: usize,
flat_format: bool,
) {
let engine = env
.open_engine(MitoConfig {
default_experimental_flat_format: flat_format,
parallel_scan_channel_size: channel_size,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -85,7 +83,7 @@ async fn test_parallel_scan_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -146,15 +144,13 @@ async fn test_parallel_scan_with_format(flat_format: bool) {
engine.stop().await.unwrap();
scan_in_parallel(&mut env, region_id, &table_dir, 0, 1, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 0, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 1, 1, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 1, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 2, 1, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 2, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 2, 8, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 4, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 4, 8, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 8, 2, flat_format).await;
scan_in_parallel(&mut env, region_id, &table_dir, 8, flat_format).await;
}

View File

@@ -58,7 +58,7 @@ async fn test_partition_filter_basic_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -84,7 +84,7 @@ async fn test_scan_projection_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -141,7 +141,7 @@ async fn test_scan_projection_without_primary_key_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -32,7 +32,7 @@ async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str, flat_format: b
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -180,7 +180,7 @@ async fn test_prune_memtable_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -264,7 +264,7 @@ async fn test_prune_memtable_complex_expr_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -327,7 +327,7 @@ async fn test_mem_range_prune_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -392,7 +392,7 @@ async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -37,7 +37,7 @@ async fn test_remap_manifests_invalid_partition_expr_with_format(flat_format: bo
let mut env = TestEnv::with_prefix("invalid-partition-expr").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -83,7 +83,7 @@ async fn test_remap_manifests_invalid_region_state_with_format(flat_format: bool
let mut env = TestEnv::with_prefix("invalid-region-state").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -123,7 +123,7 @@ async fn test_remap_manifests_invalid_input_regions_with_format(flat_format: boo
let mut env = TestEnv::with_prefix("invalid-input-regions").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -166,7 +166,7 @@ async fn test_remap_manifests_success_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -41,7 +41,7 @@ async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -176,7 +176,7 @@ async fn test_max_concurrent_scan_files() {
async fn test_max_concurrent_scan_files_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
let config = MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
max_concurrent_scan_files: 2,
..Default::default()
};
@@ -235,7 +235,7 @@ async fn test_series_scan_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -70,7 +70,7 @@ async fn test_set_role_state_gracefully_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -141,7 +141,7 @@ async fn test_set_role_state_gracefully_not_exist_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -166,7 +166,7 @@ async fn test_write_downgrading_region_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("write-to-downgrading-region").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -220,7 +220,7 @@ async fn test_unified_state_transitions_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -329,7 +329,7 @@ async fn test_restricted_state_transitions_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -72,7 +72,7 @@ async fn test_staging_state_integration_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -130,7 +130,7 @@ async fn test_staging_blocks_alter_operations_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -171,7 +171,7 @@ async fn test_staging_blocks_truncate_operations_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -308,7 +308,7 @@ async fn test_staging_write_partition_expr_version_with_format(flat_format: bool
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -505,7 +505,7 @@ async fn test_staging_manifest_directory_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -657,7 +657,7 @@ async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool)
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -883,7 +883,7 @@ async fn test_enter_staging_writes_partition_expr_change_action_with_format(flat
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -947,7 +947,7 @@ async fn test_staging_exit_conflict_partition_expr_change_and_change_with_format
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -1032,7 +1032,7 @@ async fn test_write_stall_on_enter_staging_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
None,
@@ -1156,7 +1156,7 @@ async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let partition_expr = default_partition_expr();
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -80,7 +80,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -112,7 +112,7 @@ async fn test_sync_after_flush_region_with_format(flat_format: bool) {
// Open the region on the follower engine
let follower_engine = env
.create_follower_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -189,7 +189,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -224,7 +224,7 @@ async fn test_sync_after_alter_region_with_format(flat_format: bool) {
// Open the region on the follower engine
let follower_engine = env
.create_follower_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;

View File

@@ -41,7 +41,7 @@ async fn test_engine_truncate_region_basic_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-basic").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -104,7 +104,7 @@ async fn test_engine_put_data_after_truncate_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-put").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -180,7 +180,7 @@ async fn test_engine_truncate_after_flush_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-flush").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -270,7 +270,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
let mut env = TestEnv::with_prefix("truncate-reopen").await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
})
.await;
@@ -310,7 +310,7 @@ async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)
@@ -355,7 +355,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
let engine = env
.create_engine_with(
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
Some(write_buffer_manager),
@@ -436,7 +436,7 @@ async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
default_flat_format: flat_format,
..Default::default()
},
)

View File

@@ -634,7 +634,7 @@ impl RegionFlushTask {
.options
.sst_format
.map(|f| f == FormatType::Flat)
.unwrap_or(self.engine_config.default_experimental_flat_format);
.unwrap_or(self.engine_config.default_flat_format);
SstWriteRequest {
op_type: OperationType::Flush,
metadata: version.metadata.clone(),

View File

@@ -421,7 +421,7 @@ impl MemtableBuilderProvider {
let flat_format = options
.sst_format
.map(|format| format == FormatType::Flat)
.unwrap_or(self.config.default_experimental_flat_format);
.unwrap_or(self.config.default_flat_format);
if flat_format {
if options.memtable.is_some() {
common_telemetry::info!(

View File

@@ -46,7 +46,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
@@ -219,8 +219,6 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_strategy: CacheStrategy,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
max_concurrent_scan_files: usize,
/// Whether to ignore inverted index.
@@ -251,7 +249,6 @@ impl ScanRegion {
access_layer,
request,
cache_strategy,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
ignore_inverted_index: false,
ignore_fulltext_index: false,
@@ -263,16 +260,6 @@ impl ScanRegion {
}
}
/// Sets parallel scan task channel size.
#[must_use]
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
@@ -527,7 +514,6 @@ impl ScanRegion {
.with_inverted_index_appliers(inverted_index_appliers)
.with_bloom_filter_index_appliers(bloom_filter_appliers)
.with_fulltext_index_appliers(fulltext_index_appliers)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_max_concurrent_scan_files(self.max_concurrent_scan_files)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
@@ -814,8 +800,6 @@ pub struct ScanInput {
pub(crate) cache_strategy: CacheStrategy,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
/// Maximum number of SST files to scan concurrently.
pub(crate) max_concurrent_scan_files: usize,
/// Index appliers.
@@ -863,7 +847,6 @@ impl ScanInput {
files: Vec::new(),
cache_strategy: CacheStrategy::Disabled,
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
inverted_index_appliers: [None, None],
bloom_filter_index_appliers: [None, None],
@@ -928,16 +911,6 @@ impl ScanInput {
self
}
/// Sets scan task channel size.
#[must_use]
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
/// Sets maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
@@ -1072,6 +1045,7 @@ impl ScanInput {
&self,
sources: Vec<Source>,
semaphore: Arc<Semaphore>,
channel_size: usize,
) -> Result<Vec<Source>> {
if sources.len() <= 1 {
return Ok(sources);
@@ -1081,7 +1055,7 @@ impl ScanInput {
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
let (sender, receiver) = mpsc::channel(channel_size);
self.spawn_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
@@ -1256,6 +1230,7 @@ impl ScanInput {
&self,
sources: Vec<BoxedRecordBatchStream>,
semaphore: Arc<Semaphore>,
channel_size: usize,
) -> Result<Vec<BoxedRecordBatchStream>> {
if sources.len() <= 1 {
return Ok(sources);
@@ -1265,7 +1240,7 @@ impl ScanInput {
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
let (sender, receiver) = mpsc::channel(channel_size);
self.spawn_flat_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Box::pin(stream) as _

View File

@@ -48,11 +48,11 @@ use crate::sst::file::{FileTimeRange, RegionFileId};
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
/// Per-file scan metrics.
#[derive(Default, Clone)]
@@ -1231,15 +1231,19 @@ const NUM_SERIES_THRESHOLD: u64 = 10240;
/// 60 samples per hour.
const BATCH_SIZE_THRESHOLD: u64 = 50;
/// Returns true if splitting flat record batches may improve merge performance.
/// Returns the estimated rows per batch after splitting if splitting flat record batches
/// may improve merge performance. Returns `None` if splitting is not beneficial.
pub(crate) fn should_split_flat_batches_for_merge(
stream_ctx: &Arc<StreamContext>,
range_meta: &RangeMeta,
) -> bool {
) -> Option<usize> {
// Number of files to split and scan.
let mut num_files_to_split = 0;
let mut num_mem_rows = 0;
let mut num_mem_series = 0;
// Total rows and series for estimating batch size after splitting.
let mut total_rows: u64 = 0;
let mut total_series: u64 = 0;
// Checks each file range, returns early if any range is not splittable.
// For mem ranges, we collect the total number of rows and series because the number of rows in a
// mem range may be too small.
@@ -1261,23 +1265,49 @@ pub(crate) fn should_split_flat_batches_for_merge(
debug_assert!(file.meta_ref().num_rows > 0);
if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
// We can't split batches in a file.
return false;
return None;
} else {
num_files_to_split += 1;
total_rows += file.meta_ref().num_rows;
total_series += file.meta_ref().num_series;
}
}
// Skips non-file and non-mem ranges.
}
if num_files_to_split > 0 {
let should_split = if num_files_to_split > 0 {
// We mainly consider file ranges because they have enough data for sampling.
true
} else if num_mem_series > 0 && num_mem_rows > 0 {
// If we don't have files to scan, we check whether to split by the memtable.
can_split_series(num_mem_rows as u64, num_mem_series as u64)
} else if num_mem_series > 0
&& num_mem_rows > 0
&& can_split_series(num_mem_rows as u64, num_mem_series as u64)
{
total_rows += num_mem_rows as u64;
total_series += num_mem_series as u64;
true
} else {
false
};
if !should_split {
return None;
}
// Estimate rows per batch after splitting.
let estimated_batch_size = if total_series > 0 && total_rows > 0 {
((total_rows / total_series) as usize).clamp(1, DEFAULT_READ_BATCH_SIZE)
} else {
// No valid estimate available, use a conservative fallback.
DEFAULT_READ_BATCH_SIZE / 4
};
Some(estimated_batch_size)
}
/// Computes the channel size for parallel scan based on the estimated rows per batch.
/// The channel should buffer approximately `2 * DEFAULT_READ_BATCH_SIZE` rows.
pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> usize {
let size = 2 * DEFAULT_READ_BATCH_SIZE / estimated_rows_per_batch.max(1);
size.clamp(2, 64)
}
fn can_split_series(num_rows: u64, num_series: u64) -> bool {
@@ -1555,3 +1585,235 @@ pub(crate) fn split_record_batch(record_batch: RecordBatch, batches: &mut VecDeq
batches.push_back(record_batch.slice(start, rows_in_batch));
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Instant;
use common_time::Timestamp;
use smallvec::{SmallVec, smallvec};
use store_api::storage::RegionId;
use super::*;
use crate::cache::CacheStrategy;
use crate::memtable::{
BoxedBatchIterator, BoxedRecordBatchIterator, IterBuilder, MemtableRange,
MemtableRangeContext, MemtableStats,
};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{MemRangeBuilder, SourceIndex};
use crate::read::scan_region::ScanInput;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::test_util::memtable_util::metadata_for_test;
use crate::test_util::scheduler_util::SchedulerEnv;
struct EmptyIterBuilder;
impl IterBuilder for EmptyIterBuilder {
fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
Ok(Box::new(std::iter::empty()))
}
fn is_record_batch(&self) -> bool {
true
}
fn build_record_batch(
&self,
_time_range: Option<(Timestamp, Timestamp)>,
_metrics: Option<MemScanMetrics>,
) -> Result<BoxedRecordBatchIterator> {
Ok(Box::new(std::iter::empty()))
}
}
async fn new_test_stream_ctx(
files: Vec<FileHandle>,
memtables: Vec<MemRangeBuilder>,
) -> Arc<StreamContext> {
let env = SchedulerEnv::new().await;
let metadata = metadata_for_test();
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let input = ScanInput::new(env.access_layer.clone(), mapper)
.with_cache(CacheStrategy::Disabled)
.with_memtables(memtables)
.with_files(files);
Arc::new(StreamContext {
input,
ranges: Vec::new(),
scan_fingerprint: None,
query_start: Instant::now(),
})
}
fn new_test_file(num_rows: u64, num_series: u64) -> FileHandle {
let meta = FileMeta {
region_id: RegionId::new(123, 456),
file_id: Default::default(),
time_range: (
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
),
num_rows,
num_series,
..Default::default()
};
FileHandle::new(meta, Arc::new(NoopFilePurger))
}
fn new_test_memtable(num_rows: usize, series_count: usize) -> MemRangeBuilder {
let context = Arc::new(MemtableRangeContext::new(
0,
Box::new(EmptyIterBuilder),
Default::default(),
));
let stats = MemtableStats {
time_range: Some((
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
)),
num_rows,
num_ranges: 1,
series_count,
..Default::default()
};
let range = MemtableRange::new(context, stats.clone());
MemRangeBuilder::new(range, stats)
}
fn new_test_range_meta(row_group_indices: SmallVec<[RowGroupIndex; 2]>) -> RangeMeta {
let indices = row_group_indices
.iter()
.map(|row_group_index| SourceIndex {
index: row_group_index.index,
num_row_groups: 1,
})
.collect();
RangeMeta {
time_range: (
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
),
indices,
row_group_indices,
num_rows: 0,
}
}
#[tokio::test]
async fn test_should_split_flat_batches_for_merge_uses_splittable_file_rows_per_series() {
let num_rows = SPLIT_ROW_THRESHOLD * 2;
let num_series = (num_rows / 100).max(1);
let stream_ctx =
new_test_stream_ctx(vec![new_test_file(num_rows, num_series)], vec![]).await;
let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
}]);
assert_eq!(
Some((num_rows / num_series) as usize),
should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
);
}
#[tokio::test]
async fn test_should_split_flat_batches_for_merge_skips_small_or_unknown_series_files() {
let stream_ctx = new_test_stream_ctx(
vec![
new_test_file(SPLIT_ROW_THRESHOLD.saturating_sub(1), 1),
new_test_file(SPLIT_ROW_THRESHOLD * 2, 0),
],
vec![],
)
.await;
let range_meta = new_test_range_meta(smallvec![
RowGroupIndex {
index: 0,
row_group_index: 0,
},
RowGroupIndex {
index: 1,
row_group_index: 0,
}
]);
assert_eq!(
None,
should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
);
}
#[tokio::test]
async fn test_should_split_flat_batches_for_merge_returns_none_for_unsplittable_file() {
let num_series =
(SPLIT_ROW_THRESHOLD / (BATCH_SIZE_THRESHOLD - 1)).max(NUM_SERIES_THRESHOLD) + 1;
let stream_ctx =
new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD, num_series)], vec![]).await;
let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
}]);
assert_eq!(
None,
should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
);
}
#[tokio::test]
async fn test_should_split_flat_batches_for_merge_falls_back_to_memtables() {
let stream_ctx = new_test_stream_ctx(vec![], vec![new_test_memtable(5_000, 100)]).await;
let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
}]);
assert_eq!(
Some(50),
should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
);
}
#[tokio::test]
async fn test_should_split_flat_batches_for_merge_clamps_estimate() {
let stream_ctx =
new_test_stream_ctx(vec![new_test_file(SPLIT_ROW_THRESHOLD * 2, 1)], vec![]).await;
let range_meta = new_test_range_meta(smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
}]);
assert_eq!(
Some(DEFAULT_READ_BATCH_SIZE),
should_split_flat_batches_for_merge(&stream_ctx, &range_meta)
);
}
#[test]
fn test_compute_parallel_channel_size_clamps_to_max_for_small_batches() {
assert_eq!(64, compute_parallel_channel_size(0));
assert_eq!(64, compute_parallel_channel_size(1));
}
#[test]
fn test_compute_parallel_channel_size_returns_expected_mid_range_size() {
assert_eq!(
4,
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE / 2)
);
}
#[test]
fn test_compute_parallel_channel_size_clamps_to_min_for_large_batches() {
assert_eq!(2, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE));
assert_eq!(
2,
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
);
}
}

View File

@@ -43,8 +43,8 @@ use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::range::RangeMeta;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_flat_file_ranges,
scan_flat_mem_ranges, should_split_flat_batches_for_merge,
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
@@ -176,7 +176,14 @@ impl SeqScan {
partition_ranges.len(),
sources.len()
);
Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
Self::build_flat_reader_from_sources(
stream_ctx,
sources,
None,
None,
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
)
.await
}
/// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
@@ -187,13 +194,16 @@ impl SeqScan {
mut sources: Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
channel_size: usize,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
sources = stream_ctx
.input
.create_parallel_flat_sources(sources, semaphore.clone())?;
sources = stream_ctx.input.create_parallel_flat_sources(
sources,
semaphore.clone(),
channel_size,
)?;
}
}
@@ -322,7 +332,7 @@ impl SeqScan {
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
build_flat_sources(
let split_batch_size = build_flat_sources(
&stream_ctx,
&part_range,
compaction,
@@ -332,8 +342,11 @@ impl SeqScan {
file_scan_semaphore.clone(),
).await?;
let channel_size = compute_parallel_channel_size(
split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE),
);
let mut reader =
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size)
.await?;
let mut metrics = ScannerMetrics {
@@ -529,6 +542,7 @@ impl fmt::Debug for SeqScan {
}
/// Builds flat sources for the partition range and push them to the `sources` vector.
/// Returns the estimated rows per batch after splitting if splitting is applied, or `None`.
pub(crate) async fn build_flat_sources(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
@@ -537,7 +551,7 @@ pub(crate) async fn build_flat_sources(
partition_pruner: Arc<PartitionPruner>,
sources: &mut Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
) -> Result<Option<usize>> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
@@ -561,10 +575,11 @@ pub(crate) async fn build_flat_sources(
};
let num_indices = range_meta.row_group_indices.len();
if num_indices == 0 {
return Ok(());
return Ok(None);
}
let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
let should_split = split_batch_size.is_some();
sources.reserve(num_indices);
let mut ordered_sources = Vec::with_capacity(num_indices);
ordered_sources.resize_with(num_indices, || None);
@@ -642,7 +657,7 @@ pub(crate) async fn build_flat_sources(
);
}
Ok(())
Ok(split_batch_size)
}
#[cfg(test)]

View File

@@ -47,9 +47,12 @@ use crate::error::{
use crate::read::ScannerMetrics;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_parallel_channel_size,
};
use crate::read::seq_scan::{SeqScan, build_flat_sources};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
@@ -482,10 +485,11 @@ impl SeriesDistributor {
// Scans all parts.
let mut sources = Vec::with_capacity(self.partitions.len());
let mut min_batch_size: Option<usize> = None;
for partition in &self.partitions {
sources.reserve(partition.len());
for part_range in partition {
build_flat_sources(
let split_batch_size = build_flat_sources(
&self.stream_ctx,
part_range,
false,
@@ -495,15 +499,21 @@ impl SeriesDistributor {
self.semaphore.clone(),
)
.await?;
if let Some(size) = split_batch_size {
min_batch_size = Some(min_batch_size.map_or(size, |cur| cur.min(size)));
}
}
}
// Builds a flat reader that merge sources from all parts.
let channel_size =
compute_parallel_channel_size(min_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE));
let mut reader = SeqScan::build_flat_reader_from_sources(
&self.stream_ctx,
sources,
self.semaphore.clone(),
Some(&part_metrics),
channel_size,
)
.await?;
let mut metrics = SeriesDistributorMetrics::default();

View File

@@ -269,7 +269,7 @@ impl RegionOpener {
// Sets the sst_format based on options or flat_format flag
let sst_format = if let Some(format) = options.sst_format {
format
} else if config.default_experimental_flat_format {
} else if config.default_flat_format {
options.sst_format = Some(FormatType::Flat);
FormatType::Flat
} else {
@@ -309,7 +309,7 @@ impl RegionOpener {
debug!(
"Create region {} with options: {:?}, default_flat_format: {}",
region_id, options, config.default_experimental_flat_format
region_id, options, config.default_flat_format
);
let version = VersionBuilder::new(metadata, mutable)
@@ -626,8 +626,10 @@ pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut R
manifest.sst_format,
manifest.metadata.region_id,
);
options.sst_format = Some(manifest.sst_format);
}
// Always set sst_format from manifest to ensure it's explicitly stored,
// even when the default matches the manifest value.
options.sst_format = Some(manifest.sst_format);
if let Some(manifest_append_mode) = manifest.append_mode
&& options.append_mode != manifest_append_mode
{

View File

@@ -216,15 +216,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// If the format is unchanged, we also consider the option is altered.
if new_format != current_options.sst_format.unwrap_or_default() {
all_options_altered = false;
// Validates the format type.
ensure!(
new_format == FormatType::Flat,
store_api::metadata::InvalidRegionRequestSnafu {
region_id: region.region_id,
err: "Only allow changing format type to flat",
}
);
}
}
SetRegionOption::AppendMode(new_append_mode) => {
@@ -274,8 +265,6 @@ fn new_region_options_on_empty_memtable(
SetRegionOption::Format(format_str) => {
// Safety: handle_alter_region_options_fast() has validated this.
let new_format = format_str.parse::<FormatType>().unwrap();
assert_eq!(FormatType::Flat, new_format);
current_options.sst_format = Some(new_format);
}
SetRegionOption::AppendMode(new_append_mode) => {

View File

@@ -1566,12 +1566,11 @@ index_cache_percent = 20
enable_refill_cache_on_read = true
manifest_cache_size = "256MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
scan_memory_on_exhausted = "fail"
min_compaction_interval = "0s"
default_experimental_flat_format = false
default_flat_format = true
[region_engine.mito.index]
aux_path = ""

View File

@@ -42,6 +42,26 @@ ALTER TABLE test_alt_format SET 'sst_format' = 'flat';
Affected Rows: 0
SHOW CREATE TABLE test_alt_format;
+-----------------+------------------------------------------------+
| Table | Create Table |
+-----------------+------------------------------------------------+
| test_alt_format | CREATE TABLE IF NOT EXISTS "test_alt_format" ( |
| | "h" INT NULL, |
| | "i" INT NULL DEFAULT 0, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | "k" INT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("h") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | sst_format = 'flat' |
| | ) |
+-----------------+------------------------------------------------+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
@@ -116,11 +136,68 @@ SELECT i, h FROM test_alt_format;
| 23 | 13 |
+----+----+
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
-- allow to change from flat to primary_key
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat
Affected Rows: 0
SHOW CREATE TABLE test_alt_format;
+-----------------+------------------------------------------------+
| Table | Create Table |
+-----------------+------------------------------------------------+
| test_alt_format | CREATE TABLE IF NOT EXISTS "test_alt_format" ( |
| | "h" INT NULL, |
| | "i" INT NULL DEFAULT 0, |
| | "j" TIMESTAMP(3) NOT NULL, |
| | "k" INT NULL, |
| | TIME INDEX ("j"), |
| | PRIMARY KEY ("h") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | sst_format = 'primary_key' |
| | ) |
+-----------------+------------------------------------------------+
INSERT INTO test_alt_format (h, j, i) VALUES (14, 4, 34);
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
+----+----+-------------------------+----+
| h | i | j | k |
+----+----+-------------------------+----+
| 10 | 0 | 1970-01-01T00:00:00 | |
| 11 | 0 | 1970-01-01T00:00:00.001 | |
| 12 | 0 | 1970-01-01T00:00:00.002 | |
| 13 | 23 | 1970-01-01T00:00:00.003 | 33 |
| 14 | 34 | 1970-01-01T00:00:00.004 | |
+----+----+-------------------------+----+
ADMIN flush_table('test_alt_format');
+--------------------------------------+
| ADMIN flush_table('test_alt_format') |
+--------------------------------------+
| 0 |
+--------------------------------------+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
+----+----+-------------------------+----+
| h | i | j | k |
+----+----+-------------------------+----+
| 10 | 0 | 1970-01-01T00:00:00 | |
| 11 | 0 | 1970-01-01T00:00:00.001 | |
| 12 | 0 | 1970-01-01T00:00:00.002 | |
| 13 | 23 | 1970-01-01T00:00:00.003 | 33 |
| 14 | 34 | 1970-01-01T00:00:00.004 | |
+----+----+-------------------------+----+
DROP TABLE test_alt_format;
@@ -167,6 +244,27 @@ ALTER TABLE alt_format_phy SET 'sst_format' = 'flat';
Affected Rows: 0
SHOW CREATE TABLE alt_format_phy;
+----------------+-----------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------+
| alt_format_phy | CREATE TABLE IF NOT EXISTS "alt_format_phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "k" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host", "k") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '', |
| | sst_format = 'flat' |
| | ) |
+----------------+-----------------------------------------------+
SELECT * FROM t1 ORDER BY ts ASC;
+-------------+---+---------------------+------+
@@ -202,11 +300,47 @@ SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
| example.com | 2022-01-02T00:00:00 | 4.56 |
+-------------+---------------------+------+
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
-- allow to change from flat to primary_key
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat
Affected Rows: 0
SHOW CREATE TABLE alt_format_phy;
+----------------+-----------------------------------------------+
| Table | Create Table |
+----------------+-----------------------------------------------+
| alt_format_phy | CREATE TABLE IF NOT EXISTS "alt_format_phy" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "host" STRING NULL, |
| | "k" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host", "k") |
| | ) |
| | |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '', |
| | sst_format = 'primary_key' |
| | ) |
+----------------+-----------------------------------------------+
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:02', 5.0, 'example.com');
Affected Rows: 1
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
+-------------+---------------------+------+
| host | ts | val |
+-------------+---------------------+------+
| example.com | 2022-01-01T00:00:00 | 1.23 |
| example.com | 2022-01-01T00:00:01 | 3.0 |
| example.com | 2022-01-01T00:00:02 | 5.0 |
| example.com | 2022-01-02T00:00:00 | 4.56 |
+-------------+---------------------+------+
DROP TABLE t1;

View File

@@ -16,6 +16,8 @@ SELECT i, h FROM test_alt_format;
ALTER TABLE test_alt_format SET 'sst_format' = 'flat';
SHOW CREATE TABLE test_alt_format;
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
@@ -37,10 +39,21 @@ SELECT * FROM test_alt_format;
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
-- allow to change from flat to primary_key
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
SHOW CREATE TABLE test_alt_format;
INSERT INTO test_alt_format (h, j, i) VALUES (14, 4, 34);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
ADMIN flush_table('test_alt_format');
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
DROP TABLE test_alt_format;
CREATE TABLE alt_format_phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "sst_format" = "primary_key");
@@ -62,6 +75,8 @@ SELECT * FROM t1 ORDER BY ts ASC;
ALTER TABLE alt_format_phy SET 'sst_format' = 'flat';
SHOW CREATE TABLE alt_format_phy;
SELECT * FROM t1 ORDER BY ts ASC;
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
@@ -72,10 +87,16 @@ INSERT INTO t1 (ts, val, host) VALUES
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
-- allow to change from flat to primary_key
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
SHOW CREATE TABLE alt_format_phy;
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:02', 5.0, 'example.com');
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
DROP TABLE t1;
DROP TABLE alt_format_phy;

View File

@@ -6,7 +6,7 @@ rpc_runtime_size = 8
[[region_engine]]
[region_engine.mito]
{{ if enable_flat_format }}
default_experimental_flat_format = true
default_flat_format = true
{{ endif }}
[wal]

View File

@@ -5,7 +5,7 @@ require_lease_before_startup = true
[[region_engine]]
[region_engine.mito]
{{ if enable_flat_format }}
default_experimental_flat_format = true
default_flat_format = true
{{ endif }}
[wal]

View File

@@ -103,7 +103,7 @@ pub struct BareCommand {
#[clap(long)]
extra_args: Vec<String>,
/// Enable flat format for storage engine (sets default_experimental_flat_format = true).
/// Enable flat format for storage engine (sets default_flat_format = true).
#[clap(long, default_value = "false")]
enable_flat_format: bool,
}