diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 268c1f4855..63e7c029ae 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -129,10 +129,20 @@ fn assert_column_metadatas(column_name: &[(&str, ColumnId)], column_metadatas: & #[tokio::test] async fn test_alter_region() { + test_alter_region_with_format(false).await; + test_alter_region_with_format(true).await; +} + +async fn test_alter_region_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -197,7 +207,15 @@ async fn test_alter_region() { ); // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -239,8 +257,18 @@ fn build_rows_for_tags( #[tokio::test] async fn test_put_after_alter() { + test_put_after_alter_with_format(false).await; + test_put_after_alter_with_format(true).await; +} + +async fn test_put_after_alter_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -284,7 +312,15 @@ async fn test_put_after_alter() { scan_check_after_alter(&engine, region_id, expected).await; // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -339,10 +375,20 @@ async fn test_put_after_alter() { #[tokio::test] async fn test_alter_region_retry() { + test_alter_region_retry_with_format(false).await; + test_alter_region_retry_with_format(true).await; +} + +async fn test_alter_region_retry_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -397,12 +443,25 @@ async fn test_alter_region_retry() { #[tokio::test] async fn test_alter_on_flushing() { + test_alter_on_flushing_with_format(false).await; + test_alter_on_flushing_with_format(true).await; +} + +async fn test_alter_on_flushing_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env - .create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None) + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) .await; let region_id = RegionId::new(1, 1); @@ -501,12 +560,25 @@ async fn test_alter_on_flushing() { #[tokio::test] async fn test_alter_column_fulltext_options() { + test_alter_column_fulltext_options_with_format(false).await; + test_alter_column_fulltext_options_with_format(true).await; +} + +async fn test_alter_column_fulltext_options_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env - .create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None) + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) .await; let region_id = RegionId::new(1, 1); @@ -603,7 +675,15 @@ async fn test_alter_column_fulltext_options() { check_region_version(&engine, region_id, 1, 3, 1, 3); // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -624,12 +704,25 @@ async fn test_alter_column_fulltext_options() { #[tokio::test] async fn test_alter_column_set_inverted_index() { + test_alter_column_set_inverted_index_with_format(false).await; + test_alter_column_set_inverted_index_with_format(true).await; +} + +async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env - .create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None) + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) .await; let region_id = RegionId::new(1, 1); @@ -717,7 +810,15 @@ async fn test_alter_column_set_inverted_index() { check_region_version(&engine, region_id, 1, 3, 1, 3); // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -738,12 +839,25 @@ async fn test_alter_column_set_inverted_index() { #[tokio::test] async fn test_alter_region_ttl_options() { + test_alter_region_ttl_options_with_format(false).await; + test_alter_region_ttl_options_with_format(true).await; +} + +async fn test_alter_region_ttl_options_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(AlterFlushListener::default()); let engine = env - .create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None) + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) .await; let region_id = RegionId::new(1, 1); @@ -788,12 +902,25 @@ async fn test_alter_region_ttl_options() { #[tokio::test] async fn test_write_stall_on_altering() { + test_write_stall_on_altering_with_format(false).await; + test_write_stall_on_altering_with_format(true).await; +} + +async fn test_write_stall_on_altering_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(NotifyRegionChangeResultListener::default()); let engine = env - .create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None) + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) .await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 3357887fd7..ccdcbb3372 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -29,10 +29,20 @@ use crate::test_util::{ #[tokio::test] async fn test_append_mode_write_query() { + test_append_mode_write_query_with_format(false).await; + test_append_mode_write_query_with_format(true).await; +} + +async fn test_append_mode_write_query_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -89,9 +99,15 @@ async fn test_append_mode_write_query() { #[tokio::test] async fn test_append_mode_compaction() { + test_append_mode_compaction_with_format(false).await; + test_append_mode_compaction_with_format(true).await; +} + +async fn test_append_mode_compaction_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, ..Default::default() }) .await; @@ -190,6 +206,7 @@ async fn test_append_mode_compaction() { .reopen_engine( engine, MitoConfig { + default_experimental_flat_format: flat_format, ..Default::default() }, ) diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index f88b01be58..38b4df7a9e 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -48,8 +48,18 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_new_stop() { + test_engine_new_stop_with_format(false).await; + test_engine_new_stop_with_format(true).await; +} + +async fn test_engine_new_stop_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("engine-stop").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -75,8 +85,18 @@ async fn test_engine_new_stop() { #[tokio::test] async fn test_write_to_region() { + test_write_to_region_with_format(false).await; + test_write_to_region_with_format(true).await; +} + +async fn test_write_to_region_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("write-to-region").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -97,8 +117,12 @@ async fn test_write_to_region() { } #[apply(multiple_log_store_factories)] - async fn test_region_replay(factory: Option) { + test_region_replay_with_format(factory.clone(), false).await; + test_region_replay_with_format(factory, true).await; +} + +async fn test_region_replay_with_format(factory: Option, flat_format: bool) { use common_wal::options::{KafkaWalOptions, WalOptions}; common_telemetry::init_default_ut_logging(); @@ -108,7 +132,12 @@ async fn test_region_replay(factory: Option) { let mut env = TestEnv::with_prefix("region-replay") .await .with_log_store_factory(factory.clone()); - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let topic = prepare_test_for_kafka_log_store(&factory).await; @@ -136,7 +165,15 @@ async fn test_region_replay(factory: Option) { }; put_rows(&engine, region_id, rows).await; - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; let mut options = HashMap::new(); if let Some(topic) = &topic { @@ -189,8 +226,18 @@ async fn test_region_replay(factory: Option) { #[tokio::test] async fn test_write_query_region() { + test_write_query_region_with_format(false).await; + test_write_query_region_with_format(true).await; +} + +async fn test_write_query_region_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -223,8 +270,18 @@ async fn test_write_query_region() { #[tokio::test] async fn test_different_order() { + test_different_order_with_format(false).await; + test_different_order_with_format(true).await; +} + +async fn test_different_order_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build(); @@ -274,8 +331,18 @@ async fn test_different_order() { #[tokio::test] async fn test_different_order_and_type() { + test_different_order_and_type_with_format(false).await; + test_different_order_and_type_with_format(true).await; +} + +async fn test_different_order_and_type_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); // tag_0, tag_1, field_0, field_1, ts, @@ -326,10 +393,20 @@ async fn test_different_order_and_type() { #[tokio::test] async fn test_put_delete() { + test_put_delete_with_format(false).await; + test_put_delete_with_format(true).await; +} + +async fn test_put_delete_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -380,8 +457,18 @@ async fn test_put_delete() { #[tokio::test] async fn test_delete_not_null_fields() { + test_delete_not_null_fields_with_format(false).await; + test_delete_not_null_fields_with_format(true).await; +} + +async fn test_delete_not_null_fields_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().all_not_null(true).build(); @@ -429,8 +516,18 @@ async fn test_delete_not_null_fields() { #[tokio::test] async fn test_put_overwrite() { + test_put_overwrite_with_format(false).await; + test_put_overwrite_with_format(true).await; +} + +async fn test_put_overwrite_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -489,8 +586,18 @@ async fn test_put_overwrite() { #[tokio::test] async fn test_absent_and_invalid_columns() { + test_absent_and_invalid_columns_with_format(false).await; + test_absent_and_invalid_columns_with_format(true).await; +} + +async fn test_absent_and_invalid_columns_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); // tag_0, field_0, field_1, ts, @@ -531,8 +638,18 @@ async fn test_absent_and_invalid_columns() { #[tokio::test] async fn test_region_usage() { + test_region_usage_with_format(false).await; + test_region_usage_with_format(true).await; +} + +async fn test_region_usage_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("region_usage").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -583,11 +700,20 @@ async fn test_region_usage() { #[tokio::test] async fn test_engine_with_write_cache() { + test_engine_with_write_cache_with_format(false).await; + test_engine_with_write_cache_with_format(true).await; +} + +async fn test_engine_with_write_cache_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let path = env.data_home().to_str().unwrap().to_string(); - let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None); + let mito_config = MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + } + .enable_write_cache(path, ReadableSize::mb(512), None); let engine = env.create_engine(mito_config).await; let region_id = RegionId::new(1, 1); @@ -625,9 +751,15 @@ async fn test_engine_with_write_cache() { #[tokio::test] async fn test_cache_null_primary_key() { + test_cache_null_primary_key_with_format(false).await; + test_cache_null_primary_key_with_format(true).await; +} + +async fn test_cache_null_primary_key_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, vector_cache_size: ReadableSize::mb(32), ..Default::default() }) @@ -726,8 +858,40 @@ async fn test_cache_null_primary_key() { #[tokio::test] async fn test_list_ssts() { + test_list_ssts_with_format(false, r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#" +StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; + test_list_ssts_with_format(true, r#" +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" +StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } +StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; +} + +async fn test_list_ssts_with_format( + flat_format: bool, + expected_manifest_ssts: &str, + expected_storage_ssts: &str, +) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; // Create 3 regions and write some rows to each of them let region_ids = vec![ @@ -784,13 +948,7 @@ async fn test_list_ssts() { .sorted() .collect::>() .join(""); - assert_eq!( - debug_format, - r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# - ); + assert_eq!(debug_format, expected_manifest_ssts,); // list from storage let storage_entries = engine @@ -808,26 +966,37 @@ ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: .sorted() .collect::>() .join(""); - assert_eq!( - debug_format, - r#" -StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } -StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } -StorageSstEntry { file_path: "test/11_0000000002/.parquet", file_size: None, last_modified_ms: None, node_id: None } -StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } -StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } -StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"# - ); + assert_eq!(debug_format, expected_storage_ssts,); } #[tokio::test] async fn test_all_index_metas_list_all_types() { + test_all_index_metas_list_all_types_with_format(false, r#" +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; + test_all_index_metas_list_all_types_with_format(true, r#" +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 92, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":66,\"inverted_index_size\":92,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } +PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6144), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await; +} + +async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expect_format: &str) { use datatypes::schema::{ FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions, SkippingIndexType, }; let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; // One region with both fulltext backends and inverted index enabled, plus bloom skipping index let region_id = RegionId::new(11, 1); @@ -956,13 +1125,5 @@ async fn test_all_index_metas_list_all_types() { .map(|entry| format!("\n{:?}", entry)) .collect::(); - assert_eq!( - debug_format, - r#" -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None } -PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"# - ); + assert_eq!(debug_format, expect_format); } diff --git a/src/mito2/src/engine/batch_open_test.rs b/src/mito2/src/engine/batch_open_test.rs index 17eead987c..c718ef248c 100644 --- a/src/mito2/src/engine/batch_open_test.rs +++ b/src/mito2/src/engine/batch_open_test.rs @@ -35,6 +35,11 @@ use crate::test_util::{ #[apply(multiple_log_store_factories)] async fn test_batch_open(factory: Option) { + test_batch_open_with_format(factory.clone(), false).await; + test_batch_open_with_format(factory, true).await; +} + +async fn test_batch_open_with_format(factory: Option, flat_format: bool) { common_telemetry::init_default_ut_logging(); let Some(factory) = factory else { return; @@ -42,7 +47,12 @@ async fn test_batch_open(factory: Option) { let mut env = TestEnv::with_prefix("open-batch-regions") .await .with_log_store_factory(factory.clone()); - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let topic = prepare_test_for_kafka_log_store(&factory).await; let num_regions = 3u32; @@ -143,7 +153,15 @@ async fn test_batch_open(factory: Option) { )); // Reopen engine. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; let mut results = engine .handle_batch_open_requests(4, requests) .await @@ -161,6 +179,11 @@ async fn test_batch_open(factory: Option) { #[apply(multiple_log_store_factories)] async fn test_batch_open_err(factory: Option) { + test_batch_open_err_with_format(factory.clone(), false).await; + test_batch_open_err_with_format(factory, true).await; +} + +async fn test_batch_open_err_with_format(factory: Option, flat_format: bool) { common_telemetry::init_default_ut_logging(); let Some(factory) = factory else { return; @@ -168,7 +191,12 @@ async fn test_batch_open_err(factory: Option) { let mut env = TestEnv::with_prefix("open-batch-regions-err") .await .with_log_store_factory(factory.clone()); - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let topic = prepare_test_for_kafka_log_store(&factory).await; let mut options = HashMap::new(); if let Some(topic) = &topic { diff --git a/src/mito2/src/engine/bump_committed_sequence_test.rs b/src/mito2/src/engine/bump_committed_sequence_test.rs index edf29deb6c..00d2c0f51c 100644 --- a/src/mito2/src/engine/bump_committed_sequence_test.rs +++ b/src/mito2/src/engine/bump_committed_sequence_test.rs @@ -26,9 +26,19 @@ use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows #[tokio::test] async fn test_bump_committed_sequence() { + test_bump_committed_sequence_with_format(false).await; + test_bump_committed_sequence_with_format(true).await; +} + +async fn test_bump_committed_sequence_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -83,7 +93,15 @@ async fn test_bump_committed_sequence() { assert_eq!(region.version_control.committed_sequence(), 43); // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -114,7 +132,15 @@ async fn test_bump_committed_sequence() { assert_eq!(region.version_control.current().version.flushed_sequence, 0); // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index 57e5551775..f0ee6e13f5 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -692,8 +692,18 @@ async fn test_local_catchup(factory: Option) { #[tokio::test] async fn test_catchup_not_exist() { + test_catchup_not_exist_with_format(false).await; + test_catchup_not_exist_with_format(true).await; +} + +async fn test_catchup_not_exist_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let non_exist_region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/close_test.rs b/src/mito2/src/engine/close_test.rs index 81c9c47b7c..965a4f6fff 100644 --- a/src/mito2/src/engine/close_test.rs +++ b/src/mito2/src/engine/close_test.rs @@ -21,8 +21,18 @@ use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_engine_close_region() { + test_engine_close_region_with_format(false).await; + test_engine_close_region_with_format(true).await; +} + +async fn test_engine_close_region_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("close").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); // It's okay to close a region doesn't exist. diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 57791221f9..a385d64fb5 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -135,9 +135,19 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec { #[tokio::test] async fn test_compaction_region() { + test_compaction_region_with_format(false).await; + test_compaction_region_with_format(true).await; +} + +async fn test_compaction_region_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -201,9 +211,19 @@ async fn test_compaction_region() { #[tokio::test] async fn test_infer_compaction_time_window() { + test_infer_compaction_time_window_with_format(false).await; + test_infer_compaction_time_window_with_format(true).await; +} + +async fn test_infer_compaction_time_window_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -342,9 +362,19 @@ async fn test_infer_compaction_time_window() { #[tokio::test] async fn test_compaction_overlapping_files() { + test_compaction_overlapping_files_with_format(false).await; + test_compaction_overlapping_files_with_format(true).await; +} + +async fn test_compaction_overlapping_files_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -403,9 +433,19 @@ async fn test_compaction_overlapping_files() { #[tokio::test] async fn test_compaction_region_with_overlapping() { + test_compaction_region_with_overlapping_with_format(false).await; + test_compaction_region_with_overlapping_with_format(true).await; +} + +async fn test_compaction_region_with_overlapping_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -451,9 +491,19 @@ async fn test_compaction_region_with_overlapping() { #[tokio::test] async fn test_compaction_region_with_overlapping_delete_all() { + test_compaction_region_with_overlapping_delete_all_with_format(false).await; + test_compaction_region_with_overlapping_delete_all_with_format(true).await; +} + +async fn test_compaction_region_with_overlapping_delete_all_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); @@ -507,12 +557,18 @@ async fn test_compaction_region_with_overlapping_delete_all() { // For issue https://github.com/GreptimeTeam/greptimedb/issues/3633 #[tokio::test] async fn test_readonly_during_compaction() { + test_readonly_during_compaction_with_format(false).await; + test_readonly_during_compaction_with_format(true).await; +} + +async fn test_readonly_during_compaction_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(CompactionListener::default()); let engine = env .create_engine_with( MitoConfig { + default_experimental_flat_format: flat_format, // Ensure there is only one background worker for purge task. max_background_purges: 1, ..Default::default() @@ -592,9 +648,19 @@ async fn test_readonly_during_compaction() { #[tokio::test] async fn test_compaction_update_time_window() { + test_compaction_update_time_window_with_format(false).await; + test_compaction_update_time_window_with_format(true).await; +} + +async fn test_compaction_update_time_window_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); @@ -688,9 +754,19 @@ async fn test_compaction_update_time_window() { #[tokio::test] async fn test_change_region_compaction_window() { + test_change_region_compaction_window_with_format(false).await; + test_change_region_compaction_window_with_format(true).await; +} + +async fn test_change_region_compaction_window_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); @@ -785,7 +861,15 @@ async fn test_change_region_compaction_window() { } // Reopen region. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -815,9 +899,19 @@ async fn test_change_region_compaction_window() { #[tokio::test] async fn test_open_overwrite_compaction_window() { + test_open_overwrite_compaction_window_with_format(false).await; + test_open_overwrite_compaction_window_with_format(true).await; +} + +async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); @@ -869,7 +963,15 @@ async fn test_open_overwrite_compaction_window() { ("compaction.type".to_string(), "twcs".to_string()), ("compaction.twcs.time_window".to_string(), "2h".to_string()), ]); - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index de4c199d53..7ba7aab225 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -28,8 +28,18 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_create_new_region() { + test_engine_create_new_region_with_format(false).await; + test_engine_create_new_region_with_format(true).await; +} + +async fn test_engine_create_new_region_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("new-region").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -43,8 +53,18 @@ async fn test_engine_create_new_region() { #[tokio::test] async fn test_engine_create_existing_region() { + test_engine_create_existing_region_with_format(false).await; + test_engine_create_existing_region_with_format(true).await; +} + +async fn test_engine_create_existing_region_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("create-existing").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); @@ -62,9 +82,19 @@ async fn test_engine_create_existing_region() { #[tokio::test] async fn test_engine_create_close_create_region() { + test_engine_create_close_create_region_with_format(false).await; + test_engine_create_close_create_region_with_format(true).await; +} + +async fn test_engine_create_close_create_region_with_format(flat_format: bool) { // This test will trigger create_or_open function. let mut env = TestEnv::with_prefix("create-close-create").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); @@ -93,8 +123,18 @@ async fn test_engine_create_close_create_region() { #[tokio::test] async fn test_engine_create_with_different_id() { + test_engine_create_with_different_id_with_format(false).await; + test_engine_create_with_different_id_with_format(true).await; +} + +async fn test_engine_create_with_different_id_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); @@ -112,8 +152,18 @@ async fn test_engine_create_with_different_id() { #[tokio::test] async fn test_engine_create_with_different_schema() { + test_engine_create_with_different_schema_with_format(false).await; + test_engine_create_with_different_schema_with_format(true).await; +} + +async fn test_engine_create_with_different_schema_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new(); @@ -132,8 +182,18 @@ async fn test_engine_create_with_different_schema() { #[tokio::test] async fn test_engine_create_with_different_primary_key() { + test_engine_create_with_different_primary_key_with_format(false).await; + test_engine_create_with_different_primary_key_with_format(true).await; +} + +async fn test_engine_create_with_different_primary_key_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let builder = CreateRequestBuilder::new().tag_num(2); @@ -152,8 +212,18 @@ async fn test_engine_create_with_different_primary_key() { #[tokio::test] async fn test_engine_create_with_options() { + test_engine_create_with_options_with_format(false).await; + test_engine_create_with_options_with_format(true).await; +} + +async fn test_engine_create_with_options_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -174,9 +244,22 @@ async fn test_engine_create_with_options() { #[tokio::test] async fn test_engine_create_with_custom_store() { + test_engine_create_with_custom_store_with_format(false).await; + test_engine_create_with_custom_store_with_format(true).await; +} + +async fn test_engine_create_with_custom_store_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let engine = env - .create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"]) + .create_engine_with_multiple_object_stores( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + None, + &["Gcs"], + ) .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -210,8 +293,18 @@ async fn test_engine_create_with_custom_store() { #[tokio::test] async fn test_engine_create_with_memtable_opts() { + test_engine_create_with_memtable_opts_with_format(false).await; + test_engine_create_with_memtable_opts_with_format(true).await; +} + +async fn test_engine_create_with_memtable_opts_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -252,8 +345,18 @@ async fn test_engine_create_with_memtable_opts() { #[tokio::test] async fn create_with_partition_expr_persists_manifest() { + create_with_partition_expr_persists_manifest_with_format(false).await; + create_with_partition_expr_persists_manifest_with_format(true).await; +} + +async fn create_with_partition_expr_persists_manifest_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let expr_json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#; diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 6cc38822af..b6231aa5a2 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -33,12 +33,25 @@ use crate::worker::DROPPING_MARKER_FILE; #[tokio::test] async fn test_engine_drop_region() { + test_engine_drop_region_with_format(false).await; + test_engine_drop_region_with_format(true).await; +} + +async fn test_engine_drop_region_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::with_prefix("drop").await; let listener = Arc::new(DropListener::new(Duration::from_millis(100))); let engine = env - .create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None) + .create_engine_with( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) .await; let region_id = RegionId::new(1, 1); @@ -107,6 +120,11 @@ async fn test_engine_drop_region() { #[tokio::test] async fn test_engine_drop_region_for_custom_store() { + test_engine_drop_region_for_custom_store_with_format(false).await; + test_engine_drop_region_for_custom_store_with_format(true).await; +} + +async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); async fn setup( engine: &MitoEngine, @@ -148,7 +166,10 @@ async fn test_engine_drop_region_for_custom_store() { let listener = Arc::new(DropListener::new(Duration::from_millis(100))); let engine = env .create_engine_with_multiple_object_stores( - MitoConfig::default(), + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, None, Some(listener.clone()), &["Gcs"], diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 63601b6cd6..01bdf60070 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -33,6 +33,11 @@ use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_edit_region_schedule_compaction() { + test_edit_region_schedule_compaction_with_format(false).await; + test_edit_region_schedule_compaction_with_format(true).await; +} + +async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) { let mut env = TestEnv::new().await; struct EditRegionListener { @@ -49,6 +54,7 @@ async fn test_edit_region_schedule_compaction() { let (tx, mut rx) = oneshot::channel(); let config = MitoConfig { min_compaction_interval: Duration::from_secs(60 * 60), + default_experimental_flat_format: flat_format, ..Default::default() }; let time_provider = Arc::new(MockTimeProvider::new(current_time_millis())); @@ -124,6 +130,11 @@ async fn test_edit_region_schedule_compaction() { #[tokio::test] async fn test_edit_region_fill_cache() { + test_edit_region_fill_cache_with_format(false).await; + test_edit_region_fill_cache_with_format(true).await; +} + +async fn test_edit_region_fill_cache_with_format(flat_format: bool) { let mut env = TestEnv::new().await; struct EditRegionListener { @@ -143,6 +154,7 @@ async fn test_edit_region_fill_cache() { MitoConfig { // Write cache must be enabled to download the ingested SST file. enable_write_cache: true, + default_experimental_flat_format: flat_format, ..Default::default() }, None, @@ -200,6 +212,11 @@ async fn test_edit_region_fill_cache() { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_edit_region_concurrently() { + test_edit_region_concurrently_with_format(false).await; + test_edit_region_concurrently_with_format(true).await; +} + +async fn test_edit_region_concurrently_with_format(flat_format: bool) { const EDITS_PER_TASK: usize = 10; let tasks_count = 10; @@ -251,6 +268,7 @@ async fn test_edit_region_concurrently() { let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, // Suppress the compaction to not impede the speed of this kinda stress testing. min_compaction_interval: Duration::from_secs(60 * 60), ..Default::default() diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index 5499e4e168..c40fc7ba02 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -26,10 +26,20 @@ use crate::test_util::{ #[tokio::test] async fn test_scan_without_filtering_deleted() { + test_scan_without_filtering_deleted_with_format(false).await; + test_scan_without_filtering_deleted_with_format(true).await; +} + +async fn test_scan_without_filtering_deleted_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index 39808b91bc..1224aac116 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -41,8 +41,18 @@ use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS; #[tokio::test] async fn test_manual_flush() { + test_manual_flush_with_format(false).await; + test_manual_flush_with_format(true).await; +} + +async fn test_manual_flush_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -91,12 +101,20 @@ async fn test_manual_flush() { #[tokio::test] async fn test_flush_engine() { + test_flush_engine_with_format(false).await; + test_flush_engine_with_format(true).await; +} + +async fn test_flush_engine_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); let engine = env .create_engine_with( - MitoConfig::default(), + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, Some(write_buffer_manager.clone()), Some(listener.clone()), None, @@ -162,12 +180,20 @@ async fn test_flush_engine() { #[tokio::test] async fn test_write_stall() { + test_write_stall_with_format(false).await; + test_write_stall_with_format(true).await; +} + +async fn test_write_stall_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(StallListener::default()); let engine = env .create_engine_with( - MitoConfig::default(), + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, Some(write_buffer_manager.clone()), Some(listener.clone()), None, @@ -238,11 +264,19 @@ async fn test_write_stall() { #[tokio::test] async fn test_flush_empty() { + test_flush_empty_with_format(false).await; + test_flush_empty_with_format(true).await; +} + +async fn test_flush_empty_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let engine = env .create_engine_with( - MitoConfig::default(), + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, Some(write_buffer_manager.clone()), None, None, @@ -399,6 +433,11 @@ impl MockTimeProvider { #[tokio::test] async fn test_auto_flush_engine() { + test_auto_flush_engine_with_format(false).await; + test_auto_flush_engine_with_format(true).await; +} + +async fn test_auto_flush_engine_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); @@ -408,6 +447,7 @@ async fn test_auto_flush_engine() { .create_engine_with_time( MitoConfig { auto_flush_interval: Duration::from_secs(60 * 5), + default_experimental_flat_format: flat_format, ..Default::default() }, Some(write_buffer_manager.clone()), @@ -470,6 +510,11 @@ async fn test_auto_flush_engine() { #[tokio::test] async fn test_flush_workers() { + test_flush_workers_with_format(false).await; + test_flush_workers_with_format(true).await; +} + +async fn test_flush_workers_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushListener::default()); @@ -477,6 +522,7 @@ async fn test_flush_workers() { .create_engine_with( MitoConfig { num_workers: 2, + default_experimental_flat_format: flat_format, ..Default::default() }, Some(write_buffer_manager.clone()), diff --git a/src/mito2/src/engine/merge_mode_test.rs b/src/mito2/src/engine/merge_mode_test.rs index 986edf8013..097d5e2b91 100644 --- a/src/mito2/src/engine/merge_mode_test.rs +++ b/src/mito2/src/engine/merge_mode_test.rs @@ -29,10 +29,20 @@ use crate::test_util::{ #[tokio::test] async fn test_merge_mode_write_query() { + test_merge_mode_write_query_with_format(false).await; + test_merge_mode_write_query_with_format(true).await; +} + +async fn test_merge_mode_write_query_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -87,11 +97,17 @@ async fn test_merge_mode_write_query() { #[tokio::test] async fn test_merge_mode_compaction() { + test_merge_mode_compaction_with_format(false).await; + test_merge_mode_compaction_with_format(true).await; +} + +async fn test_merge_mode_compaction_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, ..Default::default() }) .await; @@ -204,6 +220,7 @@ async fn test_merge_mode_compaction() { .reopen_engine( engine, MitoConfig { + default_experimental_flat_format: flat_format, ..Default::default() }, ) diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 0a4ef67e1f..52bd6cfda7 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -40,8 +40,18 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_open_empty() { + test_engine_open_empty_with_format(false).await; + test_engine_open_empty_with_format(true).await; +} + +async fn test_engine_open_empty_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("open-empty").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let err = engine @@ -69,8 +79,18 @@ async fn test_engine_open_empty() { #[tokio::test] async fn test_engine_open_existing() { + test_engine_open_existing_with_format(false).await; + test_engine_open_existing_with_format(true).await; +} + +async fn test_engine_open_existing_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("open-exiting").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -98,8 +118,18 @@ async fn test_engine_open_existing() { #[tokio::test] async fn test_engine_reopen_region() { + test_engine_reopen_region_with_format(false).await; + test_engine_reopen_region_with_format(true).await; +} + +async fn test_engine_reopen_region_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("reopen-region").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -115,8 +145,18 @@ async fn test_engine_reopen_region() { #[tokio::test] async fn test_engine_open_readonly() { + test_engine_open_readonly_with_format(false).await; + test_engine_open_readonly_with_format(true).await; +} + +async fn test_engine_open_readonly_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -158,8 +198,18 @@ async fn test_engine_open_readonly() { #[tokio::test] async fn test_engine_region_open_with_options() { + test_engine_region_open_with_options_with_format(false).await; + test_engine_region_open_with_options_with_format(true).await; +} + +async fn test_engine_region_open_with_options_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -200,9 +250,22 @@ async fn test_engine_region_open_with_options() { #[tokio::test] async fn test_engine_region_open_with_custom_store() { + test_engine_region_open_with_custom_store_with_format(false).await; + test_engine_region_open_with_custom_store_with_format(true).await; +} + +async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool) { let mut env = TestEnv::new().await; let engine = env - .create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"]) + .create_engine_with_multiple_object_stores( + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + None, + None, + &["Gcs"], + ) .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -260,8 +323,18 @@ async fn test_engine_region_open_with_custom_store() { #[tokio::test] async fn test_open_region_skip_wal_replay() { + test_open_region_skip_wal_replay_with_format(false).await; + test_open_region_skip_wal_replay_with_format(true).await; +} + +async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -298,7 +371,15 @@ async fn test_open_region_skip_wal_replay() { }; put_rows(&engine, region_id, rows).await; - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; // Skip the WAL replay . engine .handle_request( @@ -329,7 +410,15 @@ async fn test_open_region_skip_wal_replay() { assert_eq!(expected, batches.pretty_print().unwrap()); // Replay the WAL. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; // Open the region again with options. engine .handle_request( @@ -364,8 +453,18 @@ async fn test_open_region_skip_wal_replay() { #[tokio::test] async fn test_open_region_wait_for_opening_region_ok() { + test_open_region_wait_for_opening_region_ok_with_format(false).await; + test_open_region_wait_for_opening_region_ok_with_format(true).await; +} + +async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let worker = engine.inner.workers.worker(region_id); let (tx, rx) = oneshot::channel(); @@ -405,8 +504,18 @@ async fn test_open_region_wait_for_opening_region_ok() { #[tokio::test] async fn test_open_region_wait_for_opening_region_err() { + test_open_region_wait_for_opening_region_err_with_format(false).await; + test_open_region_wait_for_opening_region_err_with_format(true).await; +} + +async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let worker = engine.inner.workers.worker(region_id); let (tx, rx) = oneshot::channel(); @@ -452,8 +561,16 @@ async fn test_open_region_wait_for_opening_region_err() { #[tokio::test] async fn test_open_compaction_region() { + test_open_compaction_region_with_format(false).await; + test_open_compaction_region_with_format(true).await; +} + +async fn test_open_compaction_region_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let mut mito_config = MitoConfig::default(); + let mut mito_config = MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }; mito_config .sanitize(&env.data_home().display().to_string()) .unwrap(); diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index abdc0776a4..cf5b6491a7 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -34,9 +34,11 @@ async fn scan_in_parallel( table_dir: &str, parallelism: usize, channel_size: usize, + flat_format: bool, ) { let engine = env .open_engine(MitoConfig { + default_experimental_flat_format: flat_format, parallel_scan_channel_size: channel_size, ..Default::default() }) @@ -75,8 +77,18 @@ async fn scan_in_parallel( #[tokio::test] async fn test_parallel_scan() { + test_parallel_scan_with_format(false).await; + test_parallel_scan_with_format(true).await; +} + +async fn test_parallel_scan_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() @@ -134,15 +146,15 @@ async fn test_parallel_scan() { engine.stop().await.unwrap(); - scan_in_parallel(&mut env, region_id, &table_dir, 0, 1).await; + scan_in_parallel(&mut env, region_id, &table_dir, 0, 1, flat_format).await; - scan_in_parallel(&mut env, region_id, &table_dir, 1, 1).await; + scan_in_parallel(&mut env, region_id, &table_dir, 1, 1, flat_format).await; - scan_in_parallel(&mut env, region_id, &table_dir, 2, 1).await; + scan_in_parallel(&mut env, region_id, &table_dir, 2, 1, flat_format).await; - scan_in_parallel(&mut env, region_id, &table_dir, 2, 8).await; + scan_in_parallel(&mut env, region_id, &table_dir, 2, 8, flat_format).await; - scan_in_parallel(&mut env, region_id, &table_dir, 4, 8).await; + scan_in_parallel(&mut env, region_id, &table_dir, 4, 8, flat_format).await; - scan_in_parallel(&mut env, region_id, &table_dir, 8, 2).await; + scan_in_parallel(&mut env, region_id, &table_dir, 8, 2, flat_format).await; } diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index edae798560..9e9509b6c9 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -53,8 +53,18 @@ fn build_rows_multi_tags_fields( #[tokio::test] async fn test_scan_projection() { + test_scan_projection_with_format(false).await; + test_scan_projection_with_format(true).await; +} + +async fn test_scan_projection_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); // [tag_0, tag_1, field_0, field_1, ts] diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index ccac5428c9..b260024043 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -26,9 +26,14 @@ use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema, }; -async fn check_prune_row_groups(exprs: Vec, expected: &str) { +async fn check_prune_row_groups(exprs: Vec, expected: &str, flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -67,6 +72,11 @@ async fn check_prune_row_groups(exprs: Vec, expected: &str) { #[tokio::test] async fn test_read_parquet_stats() { + test_read_parquet_stats_with_format(false).await; + test_read_parquet_stats_with_format(true).await; +} + +async fn test_read_parquet_stats_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); check_prune_row_groups( @@ -88,12 +98,18 @@ async fn test_read_parquet_stats() { | 8 | 8.0 | 1970-01-01T00:00:08 | | 9 | 9.0 | 1970-01-01T00:00:09 | +-------+---------+---------------------+", + flat_format, ) .await; } #[tokio::test] async fn test_prune_tag() { + test_prune_tag_with_format(false).await; + test_prune_tag_with_format(true).await; +} + +async fn test_prune_tag_with_format(flat_format: bool) { // prune result: only row group 1&2 check_prune_row_groups( vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))], @@ -107,12 +123,18 @@ async fn test_prune_tag() { | 8 | 8.0 | 1970-01-01T00:00:08 | | 9 | 9.0 | 1970-01-01T00:00:09 | +-------+---------+---------------------+", + flat_format, ) .await; } #[tokio::test] async fn test_prune_tag_and_field() { + test_prune_tag_and_field_with_format(false).await; + test_prune_tag_and_field_with_format(true).await; +} + +async fn test_prune_tag_and_field_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); // prune result: only row group 1 check_prune_row_groups( @@ -128,6 +150,7 @@ async fn test_prune_tag_and_field() { | 6 | 6.0 | 1970-01-01T00:00:06 | | 7 | 7.0 | 1970-01-01T00:00:07 | +-------+---------+---------------------+", + flat_format, ) .await; } @@ -147,8 +170,18 @@ fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr { #[tokio::test] async fn test_prune_memtable() { + test_prune_memtable_with_format(false).await; + test_prune_memtable_with_format(true).await; +} + +async fn test_prune_memtable_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); @@ -221,8 +254,18 @@ async fn test_prune_memtable() { #[tokio::test] async fn test_prune_memtable_complex_expr() { + test_prune_memtable_complex_expr_with_format(false).await; + test_prune_memtable_complex_expr_with_format(true).await; +} + +async fn test_prune_memtable_complex_expr_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -274,8 +317,18 @@ async fn test_prune_memtable_complex_expr() { #[tokio::test] async fn test_mem_range_prune() { + test_mem_range_prune_with_format(false).await; + test_mem_range_prune_with_format(true).await; +} + +async fn test_mem_range_prune_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index 63e703a1c4..46f4cc6cf2 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -29,8 +29,18 @@ use crate::test_util::{CreateRequestBuilder, TestEnv}; #[tokio::test] async fn test_scan_with_min_sst_sequence() { + test_scan_with_min_sst_sequence_with_format(false).await; + test_scan_with_min_sst_sequence_with_format(true).await; +} + +async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -155,8 +165,14 @@ async fn test_scan_with_min_sst_sequence() { #[tokio::test] async fn test_max_concurrent_scan_files() { + test_max_concurrent_scan_files_with_format(false).await; + test_max_concurrent_scan_files_with_format(true).await; +} + +async fn test_max_concurrent_scan_files_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await; let config = MitoConfig { + default_experimental_flat_format: flat_format, max_concurrent_scan_files: 2, ..Default::default() }; @@ -206,9 +222,14 @@ async fn test_max_concurrent_scan_files() { } #[tokio::test] -async fn test_series_scan() { +async fn test_series_scan_primarykey() { let mut env = TestEnv::with_prefix("test_series_scan").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: false, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new() @@ -345,3 +366,149 @@ async fn test_series_scan() { +-------+---------+---------------------+"; check_result(expected); } + +#[tokio::test] +async fn test_series_scan_flat() { + let mut env = TestEnv::with_prefix("test_series_scan").await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: true, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.time_window", "1h") + .build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let put_flush_rows = async |start, end| { + let rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(start, end), + }; + test_util::put_rows(&engine, region_id, rows).await; + test_util::flush_region(&engine, region_id, None).await; + }; + // generates 3 SST files + put_flush_rows(0, 3).await; + put_flush_rows(2, 6).await; + put_flush_rows(3600, 3603).await; + // Put to memtable. + let rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(7200, 7203), + }; + test_util::put_rows(&engine, region_id, rows).await; + + let request = ScanRequest { + distribution: Some(TimeSeriesDistribution::PerSeries), + ..Default::default() + }; + let scanner = engine.scanner(region_id, request).await.unwrap(); + let Scanner::Series(mut scanner) = scanner else { + panic!("Scanner should be series scan"); + }; + // 3 partition ranges for 3 time window. + assert_eq!( + 3, + scanner.properties().partitions[0].len(), + "unexpected ranges: {:?}", + scanner.properties().partitions + ); + let raw_ranges: Vec<_> = scanner + .properties() + .partitions + .iter() + .flatten() + .cloned() + .collect(); + let mut new_ranges = Vec::with_capacity(3); + for range in raw_ranges { + new_ranges.push(vec![range]); + } + scanner + .prepare(PrepareRequest { + ranges: Some(new_ranges), + ..Default::default() + }) + .unwrap(); + + let metrics_set = ExecutionPlanMetricsSet::default(); + + let mut partition_batches = vec![vec![]; 3]; + let mut streams: Vec<_> = (0..3) + .map(|partition| { + let stream = scanner + .scan_partition(&Default::default(), &metrics_set, partition) + .unwrap(); + Some(stream) + }) + .collect(); + let mut num_done = 0; + let mut schema = None; + // Pull streams in round-robin fashion to get the consistent output from the sender. + while num_done < 3 { + if schema.is_none() { + schema = Some(streams[0].as_ref().unwrap().schema().clone()); + } + for i in 0..3 { + let Some(mut stream) = streams[i].take() else { + continue; + }; + let Some(rb) = stream.try_next().await.unwrap() else { + num_done += 1; + continue; + }; + partition_batches[i].push(rb); + streams[i] = Some(stream); + } + } + + let mut check_result = |expected| { + let batches = + RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap(); + assert_eq!(expected, batches.pretty_print().unwrap()); + }; + + // Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202 + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 7200 | 7200.0 | 1970-01-01T02:00:00 | +| 7201 | 7201.0 | 1970-01-01T02:00:01 | +| 7202 | 7202.0 | 1970-01-01T02:00:02 | ++-------+---------+---------------------+"; + check_result(expected); + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 3600 | 3600.0 | 1970-01-01T01:00:00 | +| 3601 | 3601.0 | 1970-01-01T01:00:01 | +| 3602 | 3602.0 | 1970-01-01T01:00:02 | ++-------+---------+---------------------+"; + check_result(expected); + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 4 | 4.0 | 1970-01-01T00:00:04 | +| 5 | 5.0 | 1970-01-01T00:00:05 | ++-------+---------+---------------------+"; + check_result(expected); +} diff --git a/src/mito2/src/engine/set_role_state_test.rs b/src/mito2/src/engine/set_role_state_test.rs index 30818caccd..53e1717ae6 100644 --- a/src/mito2/src/engine/set_role_state_test.rs +++ b/src/mito2/src/engine/set_role_state_test.rs @@ -57,13 +57,23 @@ fn assert_invalid_transition_response(response: &SetRegionRoleStateResponse) { #[tokio::test] async fn test_set_role_state_gracefully() { + test_set_role_state_gracefully_with_format(false).await; + test_set_role_state_gracefully_with_format(true).await; +} + +async fn test_set_role_state_gracefully_with_format(flat_format: bool) { let settable_role_states = [ SettableRegionRoleState::Follower, SettableRegionRoleState::DowngradingLeader, ]; for settable_role_state in settable_role_states { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -122,8 +132,18 @@ async fn test_set_role_state_gracefully() { #[tokio::test] async fn test_set_role_state_gracefully_not_exist() { + test_set_role_state_gracefully_not_exist_with_format(false).await; + test_set_role_state_gracefully_not_exist_with_format(true).await; +} + +async fn test_set_role_state_gracefully_not_exist_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let non_exist_region_id = RegionId::new(1, 1); @@ -137,8 +157,18 @@ async fn test_set_role_state_gracefully_not_exist() { #[tokio::test] async fn test_write_downgrading_region() { + test_write_downgrading_region_with_format(false).await; + test_write_downgrading_region_with_format(true).await; +} + +async fn test_write_downgrading_region_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("write-to-downgrading-region").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -180,8 +210,18 @@ async fn test_write_downgrading_region() { #[tokio::test] async fn test_unified_state_transitions() { + test_unified_state_transitions_with_format(false).await; + test_unified_state_transitions_with_format(true).await; +} + +async fn test_unified_state_transitions_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -279,8 +319,18 @@ async fn test_unified_state_transitions() { #[tokio::test] async fn test_restricted_state_transitions() { + test_restricted_state_transitions_with_format(false).await; + test_restricted_state_transitions_with_format(true).await; +} + +async fn test_restricted_state_transitions_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index b801617826..6d802a5d9d 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -31,8 +31,18 @@ use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows #[tokio::test] async fn test_staging_state_integration() { + test_staging_state_integration_with_format(false).await; + test_staging_state_integration_with_format(true).await; +} + +async fn test_staging_state_integration_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -79,8 +89,18 @@ async fn test_staging_state_integration() { #[tokio::test] async fn test_staging_blocks_alter_operations() { + test_staging_blocks_alter_operations_with_format(false).await; + test_staging_blocks_alter_operations_with_format(true).await; +} + +async fn test_staging_blocks_alter_operations_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -110,8 +130,18 @@ async fn test_staging_blocks_alter_operations() { #[tokio::test] async fn test_staging_blocks_truncate_operations() { + test_staging_blocks_truncate_operations_with_format(false).await; + test_staging_blocks_truncate_operations_with_format(true).await; +} + +async fn test_staging_blocks_truncate_operations_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -186,8 +216,18 @@ async fn test_staging_state_validation_patterns() { #[tokio::test] async fn test_staging_manifest_directory() { + test_staging_manifest_directory_with_format(false).await; + test_staging_manifest_directory_with_format(true).await; +} + +async fn test_staging_manifest_directory_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1024, 0); let request = CreateRequestBuilder::new().build(); @@ -267,8 +307,18 @@ async fn test_staging_manifest_directory() { #[tokio::test] async fn test_staging_exit_success_with_manifests() { + test_staging_exit_success_with_manifests_with_format(false).await; + test_staging_exit_success_with_manifests_with_format(true).await; +} + +async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) { let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1024, 0); let request = CreateRequestBuilder::new().build(); diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 8116d874a7..5d6d5802f2 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -71,9 +71,19 @@ async fn scan_check( #[tokio::test] async fn test_sync_after_flush_region() { + test_sync_after_flush_region_with_format(false).await; + test_sync_after_flush_region_with_format(true).await; +} + +async fn test_sync_after_flush_region_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); env.get_schema_metadata_manager() .register_region_table_info( @@ -100,7 +110,12 @@ async fn test_sync_after_flush_region() { put_rows(&engine, region_id, rows).await; // Open the region on the follower engine - let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + let follower_engine = env + .create_follower_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; follower_engine .handle_request( region_id, @@ -164,10 +179,20 @@ async fn test_sync_after_flush_region() { #[tokio::test] async fn test_sync_after_alter_region() { + test_sync_after_alter_region_with_format(false).await; + test_sync_after_alter_region_with_format(true).await; +} + +async fn test_sync_after_alter_region_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; let region_id = RegionId::new(1, 1); let request = CreateRequestBuilder::new().build(); @@ -197,7 +222,12 @@ async fn test_sync_after_alter_region() { put_rows(&engine, region_id, rows).await; // Open the region on the follower engine - let follower_engine = env.create_follower_engine(MitoConfig::default()).await; + let follower_engine = env + .create_follower_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; follower_engine .handle_request( region_id, diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index 0686db95c1..223cc2b488 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -33,8 +33,18 @@ use crate::test_util::{ #[tokio::test] async fn test_engine_truncate_region_basic() { + test_engine_truncate_region_basic_with_format(false).await; + test_engine_truncate_region_basic_with_format(true).await; +} + +async fn test_engine_truncate_region_basic_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("truncate-basic").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; // Create the region. let region_id = RegionId::new(1, 1); @@ -86,8 +96,18 @@ async fn test_engine_truncate_region_basic() { #[tokio::test] async fn test_engine_put_data_after_truncate() { + test_engine_put_data_after_truncate_with_format(false).await; + test_engine_put_data_after_truncate_with_format(true).await; +} + +async fn test_engine_put_data_after_truncate_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("truncate-put").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; // Create the region. let region_id = RegionId::new(1, 1); @@ -152,8 +172,18 @@ async fn test_engine_put_data_after_truncate() { #[tokio::test] async fn test_engine_truncate_after_flush() { + test_engine_truncate_after_flush_with_format(false).await; + test_engine_truncate_after_flush_with_format(true).await; +} + +async fn test_engine_truncate_after_flush_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("truncate-flush").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; // Create the region. let region_id = RegionId::new(1, 1); @@ -232,8 +262,18 @@ async fn test_engine_truncate_after_flush() { #[tokio::test] async fn test_engine_truncate_reopen() { + test_engine_truncate_reopen_with_format(false).await; + test_engine_truncate_reopen_with_format(true).await; +} + +async fn test_engine_truncate_reopen_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("truncate-reopen").await; - let engine = env.create_engine(MitoConfig::default()).await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; // Create the region. let region_id = RegionId::new(1, 1); @@ -266,7 +306,15 @@ async fn test_engine_truncate_reopen() { .unwrap(); // Reopen the region again. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id, @@ -295,13 +343,21 @@ async fn test_engine_truncate_reopen() { #[tokio::test] async fn test_engine_truncate_during_flush() { + test_engine_truncate_during_flush_with_format(false).await; + test_engine_truncate_during_flush_with_format(true).await; +} + +async fn test_engine_truncate_during_flush_with_format(flat_format: bool) { init_default_ut_logging(); let mut env = TestEnv::with_prefix("truncate-during-flush").await; let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); let listener = Arc::new(FlushTruncateListener::default()); let engine = env .create_engine_with( - MitoConfig::default(), + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, Some(write_buffer_manager), Some(listener.clone()), None, @@ -376,7 +432,15 @@ async fn test_engine_truncate_during_flush() { assert_eq!(sequence, truncated_sequence); // Reopen the engine. - let engine = env.reopen_engine(engine, MitoConfig::default()).await; + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }, + ) + .await; engine .handle_request( region_id,