mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
test: run engine unit tests for flat format (#7119)
* test: support flat in basic_test Signed-off-by: evenyag <realevenyag@gmail.com> * test: support flat in alter_test Signed-off-by: evenyag <realevenyag@gmail.com> * test: support flat for append_mode_test Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update bump_committed_sequence_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update close_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update compaction_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update create_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update edit_region_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update merge_mode_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update parallel_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update projection_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update prune_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update row_selector_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update scan_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update drop_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update filter_deleted_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update sync_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update set_role_state_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update staging_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update truncate_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update catchup_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update flush_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update open_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: update batch_open_test to test both formats Signed-off-by: evenyag <realevenyag@gmail.com> * test: fix all flat format tests Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -129,10 +129,20 @@ fn assert_column_metadatas(column_name: &[(&str, ColumnId)], column_metadatas: &
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region() {
|
||||
test_alter_region_with_format(false).await;
|
||||
test_alter_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_alter_region_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -197,7 +207,15 @@ async fn test_alter_region() {
|
||||
);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -239,8 +257,18 @@ fn build_rows_for_tags(
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_after_alter() {
|
||||
test_put_after_alter_with_format(false).await;
|
||||
test_put_after_alter_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_put_after_alter_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
@@ -284,7 +312,15 @@ async fn test_put_after_alter() {
|
||||
scan_check_after_alter(&engine, region_id, expected).await;
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -339,10 +375,20 @@ async fn test_put_after_alter() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region_retry() {
|
||||
test_alter_region_retry_with_format(false).await;
|
||||
test_alter_region_retry_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_alter_region_retry_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -397,12 +443,25 @@ async fn test_alter_region_retry() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_on_flushing() {
|
||||
test_alter_on_flushing_with_format(false).await;
|
||||
test_alter_on_flushing_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_alter_on_flushing_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let listener = Arc::new(AlterFlushListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None)
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -501,12 +560,25 @@ async fn test_alter_on_flushing() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_column_fulltext_options() {
|
||||
test_alter_column_fulltext_options_with_format(false).await;
|
||||
test_alter_column_fulltext_options_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_alter_column_fulltext_options_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let listener = Arc::new(AlterFlushListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None)
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -603,7 +675,15 @@ async fn test_alter_column_fulltext_options() {
|
||||
check_region_version(&engine, region_id, 1, 3, 1, 3);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -624,12 +704,25 @@ async fn test_alter_column_fulltext_options() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_column_set_inverted_index() {
|
||||
test_alter_column_set_inverted_index_with_format(false).await;
|
||||
test_alter_column_set_inverted_index_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_alter_column_set_inverted_index_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let listener = Arc::new(AlterFlushListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None)
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -717,7 +810,15 @@ async fn test_alter_column_set_inverted_index() {
|
||||
check_region_version(&engine, region_id, 1, 3, 1, 3);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -738,12 +839,25 @@ async fn test_alter_column_set_inverted_index() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_alter_region_ttl_options() {
|
||||
test_alter_region_ttl_options_with_format(false).await;
|
||||
test_alter_region_ttl_options_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let listener = Arc::new(AlterFlushListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None)
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -788,12 +902,25 @@ async fn test_alter_region_ttl_options() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_stall_on_altering() {
|
||||
test_write_stall_on_altering_with_format(false).await;
|
||||
test_write_stall_on_altering_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_write_stall_on_altering_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let listener = Arc::new(NotifyRegionChangeResultListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None)
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -29,10 +29,20 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_append_mode_write_query() {
|
||||
test_append_mode_write_query_with_format(false).await;
|
||||
test_append_mode_write_query_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_append_mode_write_query_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -89,9 +99,15 @@ async fn test_append_mode_write_query() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_append_mode_compaction() {
|
||||
test_append_mode_compaction_with_format(false).await;
|
||||
test_append_mode_compaction_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_append_mode_compaction_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
@@ -190,6 +206,7 @@ async fn test_append_mode_compaction() {
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
|
||||
@@ -48,8 +48,18 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_new_stop() {
|
||||
test_engine_new_stop_with_format(false).await;
|
||||
test_engine_new_stop_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_new_stop_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("engine-stop").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -75,8 +85,18 @@ async fn test_engine_new_stop() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_to_region() {
|
||||
test_write_to_region_with_format(false).await;
|
||||
test_write_to_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_write_to_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("write-to-region").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -97,8 +117,12 @@ async fn test_write_to_region() {
|
||||
}
|
||||
|
||||
#[apply(multiple_log_store_factories)]
|
||||
|
||||
async fn test_region_replay(factory: Option<LogStoreFactory>) {
|
||||
test_region_replay_with_format(factory.clone(), false).await;
|
||||
test_region_replay_with_format(factory, true).await;
|
||||
}
|
||||
|
||||
async fn test_region_replay_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
|
||||
use common_wal::options::{KafkaWalOptions, WalOptions};
|
||||
|
||||
common_telemetry::init_default_ut_logging();
|
||||
@@ -108,7 +132,12 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
|
||||
let mut env = TestEnv::with_prefix("region-replay")
|
||||
.await
|
||||
.with_log_store_factory(factory.clone());
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let topic = prepare_test_for_kafka_log_store(&factory).await;
|
||||
@@ -136,7 +165,15 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut options = HashMap::new();
|
||||
if let Some(topic) = &topic {
|
||||
@@ -189,8 +226,18 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_query_region() {
|
||||
test_write_query_region_with_format(false).await;
|
||||
test_write_query_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_write_query_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -223,8 +270,18 @@ async fn test_write_query_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_different_order() {
|
||||
test_different_order_with_format(false).await;
|
||||
test_different_order_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_different_order_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().tag_num(2).field_num(2).build();
|
||||
@@ -274,8 +331,18 @@ async fn test_different_order() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_different_order_and_type() {
|
||||
test_different_order_and_type_with_format(false).await;
|
||||
test_different_order_and_type_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_different_order_and_type_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
// tag_0, tag_1, field_0, field_1, ts,
|
||||
@@ -326,10 +393,20 @@ async fn test_different_order_and_type() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_delete() {
|
||||
test_put_delete_with_format(false).await;
|
||||
test_put_delete_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_put_delete_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -380,8 +457,18 @@ async fn test_put_delete() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_not_null_fields() {
|
||||
test_delete_not_null_fields_with_format(false).await;
|
||||
test_delete_not_null_fields_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_delete_not_null_fields_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().all_not_null(true).build();
|
||||
@@ -429,8 +516,18 @@ async fn test_delete_not_null_fields() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_overwrite() {
|
||||
test_put_overwrite_with_format(false).await;
|
||||
test_put_overwrite_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_put_overwrite_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -489,8 +586,18 @@ async fn test_put_overwrite() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_absent_and_invalid_columns() {
|
||||
test_absent_and_invalid_columns_with_format(false).await;
|
||||
test_absent_and_invalid_columns_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_absent_and_invalid_columns_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
// tag_0, field_0, field_1, ts,
|
||||
@@ -531,8 +638,18 @@ async fn test_absent_and_invalid_columns() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_region_usage() {
|
||||
test_region_usage_with_format(false).await;
|
||||
test_region_usage_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_region_usage_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("region_usage").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -583,11 +700,20 @@ async fn test_region_usage() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_with_write_cache() {
|
||||
test_engine_with_write_cache_with_format(false).await;
|
||||
test_engine_with_write_cache_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_with_write_cache_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let path = env.data_home().to_str().unwrap().to_string();
|
||||
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
|
||||
let mito_config = MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
}
|
||||
.enable_write_cache(path, ReadableSize::mb(512), None);
|
||||
let engine = env.create_engine(mito_config).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -625,9 +751,15 @@ async fn test_engine_with_write_cache() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cache_null_primary_key() {
|
||||
test_cache_null_primary_key_with_format(false).await;
|
||||
test_cache_null_primary_key_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_cache_null_primary_key_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
vector_cache_size: ReadableSize::mb(32),
|
||||
..Default::default()
|
||||
})
|
||||
@@ -726,8 +858,40 @@ async fn test_cache_null_primary_key() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_ssts() {
|
||||
test_list_ssts_with_format(false, r#"
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"# ,r#"
|
||||
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
|
||||
test_list_ssts_with_format(true, r#"
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2855, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#"
|
||||
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
|
||||
}
|
||||
|
||||
async fn test_list_ssts_with_format(
|
||||
flat_format: bool,
|
||||
expected_manifest_ssts: &str,
|
||||
expected_storage_ssts: &str,
|
||||
) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
// Create 3 regions and write some rows to each of them
|
||||
let region_ids = vec![
|
||||
@@ -784,13 +948,7 @@ async fn test_list_ssts() {
|
||||
.sorted()
|
||||
.collect::<Vec<_>>()
|
||||
.join("");
|
||||
assert_eq!(
|
||||
debug_format,
|
||||
r#"
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2531, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#
|
||||
);
|
||||
assert_eq!(debug_format, expected_manifest_ssts,);
|
||||
|
||||
// list from storage
|
||||
let storage_entries = engine
|
||||
@@ -808,26 +966,37 @@ ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id:
|
||||
.sorted()
|
||||
.collect::<Vec<_>>()
|
||||
.join("");
|
||||
assert_eq!(
|
||||
debug_format,
|
||||
r#"
|
||||
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000002/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#
|
||||
);
|
||||
assert_eq!(debug_format, expected_storage_ssts,);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_all_index_metas_list_all_types() {
|
||||
test_all_index_metas_list_all_types_with_format(false, r#"
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
|
||||
test_all_index_metas_list_all_types_with_format(true, r#"
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6144), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6144), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6144), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6144), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 92, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":66,\"inverted_index_size\":92,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6144), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
|
||||
}
|
||||
|
||||
async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expect_format: &str) {
|
||||
use datatypes::schema::{
|
||||
FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
|
||||
};
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
// One region with both fulltext backends and inverted index enabled, plus bloom skipping index
|
||||
let region_id = RegionId::new(11, 1);
|
||||
@@ -956,13 +1125,5 @@ async fn test_all_index_metas_list_all_types() {
|
||||
.map(|entry| format!("\n{:?}", entry))
|
||||
.collect::<String>();
|
||||
|
||||
assert_eq!(
|
||||
debug_format,
|
||||
r#"
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "bloom_filter", target_type: "column", target_key: "3", target_json: "{\"column\":3}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "fulltext_bloom", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 87, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "fulltext_tantivy", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 1104, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "0", target_json: "{\"column\":0}", blob_size: 70, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":44,\"inverted_index_size\":70,\"null_bitmap_size\":8,\"relative_fst_offset\":26,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6032), index_type: "inverted", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#
|
||||
);
|
||||
assert_eq!(debug_format, expect_format);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,11 @@ use crate::test_util::{
|
||||
|
||||
#[apply(multiple_log_store_factories)]
|
||||
async fn test_batch_open(factory: Option<LogStoreFactory>) {
|
||||
test_batch_open_with_format(factory.clone(), false).await;
|
||||
test_batch_open_with_format(factory, true).await;
|
||||
}
|
||||
|
||||
async fn test_batch_open_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let Some(factory) = factory else {
|
||||
return;
|
||||
@@ -42,7 +47,12 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
|
||||
let mut env = TestEnv::with_prefix("open-batch-regions")
|
||||
.await
|
||||
.with_log_store_factory(factory.clone());
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let topic = prepare_test_for_kafka_log_store(&factory).await;
|
||||
|
||||
let num_regions = 3u32;
|
||||
@@ -143,7 +153,15 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
|
||||
));
|
||||
|
||||
// Reopen engine.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let mut results = engine
|
||||
.handle_batch_open_requests(4, requests)
|
||||
.await
|
||||
@@ -161,6 +179,11 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
|
||||
|
||||
#[apply(multiple_log_store_factories)]
|
||||
async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
|
||||
test_batch_open_err_with_format(factory.clone(), false).await;
|
||||
test_batch_open_err_with_format(factory, true).await;
|
||||
}
|
||||
|
||||
async fn test_batch_open_err_with_format(factory: Option<LogStoreFactory>, flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let Some(factory) = factory else {
|
||||
return;
|
||||
@@ -168,7 +191,12 @@ async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
|
||||
let mut env = TestEnv::with_prefix("open-batch-regions-err")
|
||||
.await
|
||||
.with_log_store_factory(factory.clone());
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let topic = prepare_test_for_kafka_log_store(&factory).await;
|
||||
let mut options = HashMap::new();
|
||||
if let Some(topic) = &topic {
|
||||
|
||||
@@ -26,9 +26,19 @@ use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_bump_committed_sequence() {
|
||||
test_bump_committed_sequence_with_format(false).await;
|
||||
test_bump_committed_sequence_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_bump_committed_sequence_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -83,7 +93,15 @@ async fn test_bump_committed_sequence() {
|
||||
assert_eq!(region.version_control.committed_sequence(), 43);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -114,7 +132,15 @@ async fn test_bump_committed_sequence() {
|
||||
assert_eq!(region.version_control.current().version.flushed_sequence, 0);
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
|
||||
@@ -692,8 +692,18 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_catchup_not_exist() {
|
||||
test_catchup_not_exist_with_format(false).await;
|
||||
test_catchup_not_exist_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_catchup_not_exist_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let non_exist_region_id = RegionId::new(1, 1);
|
||||
|
||||
|
||||
@@ -21,8 +21,18 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_close_region() {
|
||||
test_engine_close_region_with_format(false).await;
|
||||
test_engine_close_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_close_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("close").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
// It's okay to close a region doesn't exist.
|
||||
|
||||
@@ -135,9 +135,19 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_region() {
|
||||
test_compaction_region_with_format(false).await;
|
||||
test_compaction_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_compaction_region_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -201,9 +211,19 @@ async fn test_compaction_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_infer_compaction_time_window() {
|
||||
test_infer_compaction_time_window_with_format(false).await;
|
||||
test_infer_compaction_time_window_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_infer_compaction_time_window_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -342,9 +362,19 @@ async fn test_infer_compaction_time_window() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_overlapping_files() {
|
||||
test_compaction_overlapping_files_with_format(false).await;
|
||||
test_compaction_overlapping_files_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_compaction_overlapping_files_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -403,9 +433,19 @@ async fn test_compaction_overlapping_files() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_region_with_overlapping() {
|
||||
test_compaction_region_with_overlapping_with_format(false).await;
|
||||
test_compaction_region_with_overlapping_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_compaction_region_with_overlapping_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -451,9 +491,19 @@ async fn test_compaction_region_with_overlapping() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
test_compaction_region_with_overlapping_delete_all_with_format(false).await;
|
||||
test_compaction_region_with_overlapping_delete_all_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_compaction_region_with_overlapping_delete_all_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -507,12 +557,18 @@ async fn test_compaction_region_with_overlapping_delete_all() {
|
||||
// For issue https://github.com/GreptimeTeam/greptimedb/issues/3633
|
||||
#[tokio::test]
|
||||
async fn test_readonly_during_compaction() {
|
||||
test_readonly_during_compaction_with_format(false).await;
|
||||
test_readonly_during_compaction_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_readonly_during_compaction_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let listener = Arc::new(CompactionListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
// Ensure there is only one background worker for purge task.
|
||||
max_background_purges: 1,
|
||||
..Default::default()
|
||||
@@ -592,9 +648,19 @@ async fn test_readonly_during_compaction() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compaction_update_time_window() {
|
||||
test_compaction_update_time_window_with_format(false).await;
|
||||
test_compaction_update_time_window_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_compaction_update_time_window_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -688,9 +754,19 @@ async fn test_compaction_update_time_window() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_change_region_compaction_window() {
|
||||
test_change_region_compaction_window_with_format(false).await;
|
||||
test_change_region_compaction_window_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_change_region_compaction_window_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -785,7 +861,15 @@ async fn test_change_region_compaction_window() {
|
||||
}
|
||||
|
||||
// Reopen region.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -815,9 +899,19 @@ async fn test_change_region_compaction_window() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_overwrite_compaction_window() {
|
||||
test_open_overwrite_compaction_window_with_format(false).await;
|
||||
test_open_overwrite_compaction_window_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_open_overwrite_compaction_window_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -869,7 +963,15 @@ async fn test_open_overwrite_compaction_window() {
|
||||
("compaction.type".to_string(), "twcs".to_string()),
|
||||
("compaction.twcs.time_window".to_string(), "2h".to_string()),
|
||||
]);
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
|
||||
@@ -28,8 +28,18 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_new_region() {
|
||||
test_engine_create_new_region_with_format(false).await;
|
||||
test_engine_create_new_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_new_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("new-region").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -43,8 +53,18 @@ async fn test_engine_create_new_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_existing_region() {
|
||||
test_engine_create_existing_region_with_format(false).await;
|
||||
test_engine_create_existing_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_existing_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("create-existing").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
@@ -62,9 +82,19 @@ async fn test_engine_create_existing_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_close_create_region() {
|
||||
test_engine_create_close_create_region_with_format(false).await;
|
||||
test_engine_create_close_create_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_close_create_region_with_format(flat_format: bool) {
|
||||
// This test will trigger create_or_open function.
|
||||
let mut env = TestEnv::with_prefix("create-close-create").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
@@ -93,8 +123,18 @@ async fn test_engine_create_close_create_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_different_id() {
|
||||
test_engine_create_with_different_id_with_format(false).await;
|
||||
test_engine_create_with_different_id_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_with_different_id_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
@@ -112,8 +152,18 @@ async fn test_engine_create_with_different_id() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_different_schema() {
|
||||
test_engine_create_with_different_schema_with_format(false).await;
|
||||
test_engine_create_with_different_schema_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_with_different_schema_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
@@ -132,8 +182,18 @@ async fn test_engine_create_with_different_schema() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_different_primary_key() {
|
||||
test_engine_create_with_different_primary_key_with_format(false).await;
|
||||
test_engine_create_with_different_primary_key_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_with_different_primary_key_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let builder = CreateRequestBuilder::new().tag_num(2);
|
||||
@@ -152,8 +212,18 @@ async fn test_engine_create_with_different_primary_key() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_options() {
|
||||
test_engine_create_with_options_with_format(false).await;
|
||||
test_engine_create_with_options_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_with_options_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -174,9 +244,22 @@ async fn test_engine_create_with_options() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_custom_store() {
|
||||
test_engine_create_with_custom_store_with_format(false).await;
|
||||
test_engine_create_with_custom_store_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_with_custom_store_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
|
||||
.create_engine_with_multiple_object_stores(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
None,
|
||||
&["Gcs"],
|
||||
)
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -210,8 +293,18 @@ async fn test_engine_create_with_custom_store() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_create_with_memtable_opts() {
|
||||
test_engine_create_with_memtable_opts_with_format(false).await;
|
||||
test_engine_create_with_memtable_opts_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_create_with_memtable_opts_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -252,8 +345,18 @@ async fn test_engine_create_with_memtable_opts() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_with_partition_expr_persists_manifest() {
|
||||
create_with_partition_expr_persists_manifest_with_format(false).await;
|
||||
create_with_partition_expr_persists_manifest_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn create_with_partition_expr_persists_manifest_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let expr_json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
|
||||
|
||||
@@ -33,12 +33,25 @@ use crate::worker::DROPPING_MARKER_FILE;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_drop_region() {
|
||||
test_engine_drop_region_with_format(false).await;
|
||||
test_engine_drop_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_drop_region_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::with_prefix("drop").await;
|
||||
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
|
||||
let engine = env
|
||||
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()), None)
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -107,6 +120,11 @@ async fn test_engine_drop_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_drop_region_for_custom_store() {
|
||||
test_engine_drop_region_for_custom_store_with_format(false).await;
|
||||
test_engine_drop_region_for_custom_store_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_drop_region_for_custom_store_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
async fn setup(
|
||||
engine: &MitoEngine,
|
||||
@@ -148,7 +166,10 @@ async fn test_engine_drop_region_for_custom_store() {
|
||||
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
|
||||
let engine = env
|
||||
.create_engine_with_multiple_object_stores(
|
||||
MitoConfig::default(),
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
Some(listener.clone()),
|
||||
&["Gcs"],
|
||||
|
||||
@@ -33,6 +33,11 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_edit_region_schedule_compaction() {
|
||||
test_edit_region_schedule_compaction_with_format(false).await;
|
||||
test_edit_region_schedule_compaction_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_edit_region_schedule_compaction_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
|
||||
struct EditRegionListener {
|
||||
@@ -49,6 +54,7 @@ async fn test_edit_region_schedule_compaction() {
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let config = MitoConfig {
|
||||
min_compaction_interval: Duration::from_secs(60 * 60),
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
};
|
||||
let time_provider = Arc::new(MockTimeProvider::new(current_time_millis()));
|
||||
@@ -124,6 +130,11 @@ async fn test_edit_region_schedule_compaction() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_edit_region_fill_cache() {
|
||||
test_edit_region_fill_cache_with_format(false).await;
|
||||
test_edit_region_fill_cache_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_edit_region_fill_cache_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
|
||||
struct EditRegionListener {
|
||||
@@ -143,6 +154,7 @@ async fn test_edit_region_fill_cache() {
|
||||
MitoConfig {
|
||||
// Write cache must be enabled to download the ingested SST file.
|
||||
enable_write_cache: true,
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
@@ -200,6 +212,11 @@ async fn test_edit_region_fill_cache() {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn test_edit_region_concurrently() {
|
||||
test_edit_region_concurrently_with_format(false).await;
|
||||
test_edit_region_concurrently_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_edit_region_concurrently_with_format(flat_format: bool) {
|
||||
const EDITS_PER_TASK: usize = 10;
|
||||
let tasks_count = 10;
|
||||
|
||||
@@ -251,6 +268,7 @@ async fn test_edit_region_concurrently() {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
// Suppress the compaction to not impede the speed of this kinda stress testing.
|
||||
min_compaction_interval: Duration::from_secs(60 * 60),
|
||||
..Default::default()
|
||||
|
||||
@@ -26,10 +26,20 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_without_filtering_deleted() {
|
||||
test_scan_without_filtering_deleted_with_format(false).await;
|
||||
test_scan_without_filtering_deleted_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_scan_without_filtering_deleted_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
|
||||
@@ -41,8 +41,18 @@ use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manual_flush() {
|
||||
test_manual_flush_with_format(false).await;
|
||||
test_manual_flush_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_manual_flush_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -91,12 +101,20 @@ async fn test_manual_flush() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_flush_engine() {
|
||||
test_flush_engine_with_format(false).await;
|
||||
test_flush_engine_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_flush_engine_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let listener = Arc::new(FlushListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig::default(),
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager.clone()),
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
@@ -162,12 +180,20 @@ async fn test_flush_engine() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_stall() {
|
||||
test_write_stall_with_format(false).await;
|
||||
test_write_stall_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_write_stall_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let listener = Arc::new(StallListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig::default(),
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager.clone()),
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
@@ -238,11 +264,19 @@ async fn test_write_stall() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_flush_empty() {
|
||||
test_flush_empty_with_format(false).await;
|
||||
test_flush_empty_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_flush_empty_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig::default(),
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager.clone()),
|
||||
None,
|
||||
None,
|
||||
@@ -399,6 +433,11 @@ impl MockTimeProvider {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_auto_flush_engine() {
|
||||
test_auto_flush_engine_with_format(false).await;
|
||||
test_auto_flush_engine_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_auto_flush_engine_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let listener = Arc::new(FlushListener::default());
|
||||
@@ -408,6 +447,7 @@ async fn test_auto_flush_engine() {
|
||||
.create_engine_with_time(
|
||||
MitoConfig {
|
||||
auto_flush_interval: Duration::from_secs(60 * 5),
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager.clone()),
|
||||
@@ -470,6 +510,11 @@ async fn test_auto_flush_engine() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_flush_workers() {
|
||||
test_flush_workers_with_format(false).await;
|
||||
test_flush_workers_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_flush_workers_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let listener = Arc::new(FlushListener::default());
|
||||
@@ -477,6 +522,7 @@ async fn test_flush_workers() {
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
num_workers: 2,
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager.clone()),
|
||||
|
||||
@@ -29,10 +29,20 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_mode_write_query() {
|
||||
test_merge_mode_write_query_with_format(false).await;
|
||||
test_merge_mode_write_query_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_merge_mode_write_query_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -87,11 +97,17 @@ async fn test_merge_mode_write_query() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_mode_compaction() {
|
||||
test_merge_mode_compaction_with_format(false).await;
|
||||
test_merge_mode_compaction_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_merge_mode_compaction_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
@@ -204,6 +220,7 @@ async fn test_merge_mode_compaction() {
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
|
||||
@@ -40,8 +40,18 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_open_empty() {
|
||||
test_engine_open_empty_with_format(false).await;
|
||||
test_engine_open_empty_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_open_empty_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("open-empty").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let err = engine
|
||||
@@ -69,8 +79,18 @@ async fn test_engine_open_empty() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_open_existing() {
|
||||
test_engine_open_existing_with_format(false).await;
|
||||
test_engine_open_existing_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_open_existing_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("open-exiting").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -98,8 +118,18 @@ async fn test_engine_open_existing() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_reopen_region() {
|
||||
test_engine_reopen_region_with_format(false).await;
|
||||
test_engine_reopen_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_reopen_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("reopen-region").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -115,8 +145,18 @@ async fn test_engine_reopen_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_open_readonly() {
|
||||
test_engine_open_readonly_with_format(false).await;
|
||||
test_engine_open_readonly_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_open_readonly_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -158,8 +198,18 @@ async fn test_engine_open_readonly() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_region_open_with_options() {
|
||||
test_engine_region_open_with_options_with_format(false).await;
|
||||
test_engine_region_open_with_options_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_region_open_with_options_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -200,9 +250,22 @@ async fn test_engine_region_open_with_options() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_region_open_with_custom_store() {
|
||||
test_engine_region_open_with_custom_store_with_format(false).await;
|
||||
test_engine_region_open_with_custom_store_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_region_open_with_custom_store_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
|
||||
.create_engine_with_multiple_object_stores(
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
None,
|
||||
&["Gcs"],
|
||||
)
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -260,8 +323,18 @@ async fn test_engine_region_open_with_custom_store() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_region_skip_wal_replay() {
|
||||
test_open_region_skip_wal_replay_with_format(false).await;
|
||||
test_open_region_skip_wal_replay_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_open_region_skip_wal_replay_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -298,7 +371,15 @@ async fn test_open_region_skip_wal_replay() {
|
||||
};
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
// Skip the WAL replay .
|
||||
engine
|
||||
.handle_request(
|
||||
@@ -329,7 +410,15 @@ async fn test_open_region_skip_wal_replay() {
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
|
||||
// Replay the WAL.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
// Open the region again with options.
|
||||
engine
|
||||
.handle_request(
|
||||
@@ -364,8 +453,18 @@ async fn test_open_region_skip_wal_replay() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_region_wait_for_opening_region_ok() {
|
||||
test_open_region_wait_for_opening_region_ok_with_format(false).await;
|
||||
test_open_region_wait_for_opening_region_ok_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_open_region_wait_for_opening_region_ok_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let worker = engine.inner.workers.worker(region_id);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
@@ -405,8 +504,18 @@ async fn test_open_region_wait_for_opening_region_ok() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_region_wait_for_opening_region_err() {
|
||||
test_open_region_wait_for_opening_region_err_with_format(false).await;
|
||||
test_open_region_wait_for_opening_region_err_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_open_region_wait_for_opening_region_err_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let worker = engine.inner.workers.worker(region_id);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
@@ -452,8 +561,16 @@ async fn test_open_region_wait_for_opening_region_err() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_compaction_region() {
|
||||
test_open_compaction_region_with_format(false).await;
|
||||
test_open_compaction_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_open_compaction_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let mut mito_config = MitoConfig::default();
|
||||
let mut mito_config = MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
};
|
||||
mito_config
|
||||
.sanitize(&env.data_home().display().to_string())
|
||||
.unwrap();
|
||||
|
||||
@@ -34,9 +34,11 @@ async fn scan_in_parallel(
|
||||
table_dir: &str,
|
||||
parallelism: usize,
|
||||
channel_size: usize,
|
||||
flat_format: bool,
|
||||
) {
|
||||
let engine = env
|
||||
.open_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
parallel_scan_channel_size: channel_size,
|
||||
..Default::default()
|
||||
})
|
||||
@@ -75,8 +77,18 @@ async fn scan_in_parallel(
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_parallel_scan() {
|
||||
test_parallel_scan_with_format(false).await;
|
||||
test_parallel_scan_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_parallel_scan_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
@@ -134,15 +146,15 @@ async fn test_parallel_scan() {
|
||||
|
||||
engine.stop().await.unwrap();
|
||||
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 0, 1).await;
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 0, 1, flat_format).await;
|
||||
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 1, 1).await;
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 1, 1, flat_format).await;
|
||||
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 2, 1).await;
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 2, 1, flat_format).await;
|
||||
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 2, 8).await;
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 2, 8, flat_format).await;
|
||||
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 4, 8).await;
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 4, 8, flat_format).await;
|
||||
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 8, 2).await;
|
||||
scan_in_parallel(&mut env, region_id, &table_dir, 8, 2, flat_format).await;
|
||||
}
|
||||
|
||||
@@ -53,8 +53,18 @@ fn build_rows_multi_tags_fields(
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_projection() {
|
||||
test_scan_projection_with_format(false).await;
|
||||
test_scan_projection_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_scan_projection_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
// [tag_0, tag_1, field_0, field_1, ts]
|
||||
|
||||
@@ -26,9 +26,14 @@ use crate::test_util::{
|
||||
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, rows_schema,
|
||||
};
|
||||
|
||||
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
|
||||
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str, flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -67,6 +72,11 @@ async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_read_parquet_stats() {
|
||||
test_read_parquet_stats_with_format(false).await;
|
||||
test_read_parquet_stats_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_read_parquet_stats_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
check_prune_row_groups(
|
||||
@@ -88,12 +98,18 @@ async fn test_read_parquet_stats() {
|
||||
| 8 | 8.0 | 1970-01-01T00:00:08 |
|
||||
| 9 | 9.0 | 1970-01-01T00:00:09 |
|
||||
+-------+---------+---------------------+",
|
||||
flat_format,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prune_tag() {
|
||||
test_prune_tag_with_format(false).await;
|
||||
test_prune_tag_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_prune_tag_with_format(flat_format: bool) {
|
||||
// prune result: only row group 1&2
|
||||
check_prune_row_groups(
|
||||
vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))],
|
||||
@@ -107,12 +123,18 @@ async fn test_prune_tag() {
|
||||
| 8 | 8.0 | 1970-01-01T00:00:08 |
|
||||
| 9 | 9.0 | 1970-01-01T00:00:09 |
|
||||
+-------+---------+---------------------+",
|
||||
flat_format,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prune_tag_and_field() {
|
||||
test_prune_tag_and_field_with_format(false).await;
|
||||
test_prune_tag_and_field_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_prune_tag_and_field_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
// prune result: only row group 1
|
||||
check_prune_row_groups(
|
||||
@@ -128,6 +150,7 @@ async fn test_prune_tag_and_field() {
|
||||
| 6 | 6.0 | 1970-01-01T00:00:06 |
|
||||
| 7 | 7.0 | 1970-01-01T00:00:07 |
|
||||
+-------+---------+---------------------+",
|
||||
flat_format,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -147,8 +170,18 @@ fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prune_memtable() {
|
||||
test_prune_memtable_with_format(false).await;
|
||||
test_prune_memtable_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_prune_memtable_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -221,8 +254,18 @@ async fn test_prune_memtable() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prune_memtable_complex_expr() {
|
||||
test_prune_memtable_complex_expr_with_format(false).await;
|
||||
test_prune_memtable_complex_expr_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_prune_memtable_complex_expr_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -274,8 +317,18 @@ async fn test_prune_memtable_complex_expr() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_range_prune() {
|
||||
test_mem_range_prune_with_format(false).await;
|
||||
test_mem_range_prune_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_mem_range_prune_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
@@ -29,8 +29,18 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_with_min_sst_sequence() {
|
||||
test_scan_with_min_sst_sequence_with_format(false).await;
|
||||
test_scan_with_min_sst_sequence_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_scan_with_min_sst_sequence_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -155,8 +165,14 @@ async fn test_scan_with_min_sst_sequence() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_max_concurrent_scan_files() {
|
||||
test_max_concurrent_scan_files_with_format(false).await;
|
||||
test_max_concurrent_scan_files_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_max_concurrent_scan_files_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("test_max_concurrent_scan_files").await;
|
||||
let config = MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
max_concurrent_scan_files: 2,
|
||||
..Default::default()
|
||||
};
|
||||
@@ -206,9 +222,14 @@ async fn test_max_concurrent_scan_files() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_series_scan() {
|
||||
async fn test_series_scan_primarykey() {
|
||||
let mut env = TestEnv::with_prefix("test_series_scan").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: false,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
@@ -345,3 +366,149 @@ async fn test_series_scan() {
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_series_scan_flat() {
|
||||
let mut env = TestEnv::with_prefix("test_series_scan").await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new()
|
||||
.insert_option("compaction.type", "twcs")
|
||||
.insert_option("compaction.twcs.time_window", "1h")
|
||||
.build();
|
||||
let column_schemas = test_util::rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let put_flush_rows = async |start, end| {
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(start, end),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
test_util::flush_region(&engine, region_id, None).await;
|
||||
};
|
||||
// generates 3 SST files
|
||||
put_flush_rows(0, 3).await;
|
||||
put_flush_rows(2, 6).await;
|
||||
put_flush_rows(3600, 3603).await;
|
||||
// Put to memtable.
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: test_util::build_rows(7200, 7203),
|
||||
};
|
||||
test_util::put_rows(&engine, region_id, rows).await;
|
||||
|
||||
let request = ScanRequest {
|
||||
distribution: Some(TimeSeriesDistribution::PerSeries),
|
||||
..Default::default()
|
||||
};
|
||||
let scanner = engine.scanner(region_id, request).await.unwrap();
|
||||
let Scanner::Series(mut scanner) = scanner else {
|
||||
panic!("Scanner should be series scan");
|
||||
};
|
||||
// 3 partition ranges for 3 time window.
|
||||
assert_eq!(
|
||||
3,
|
||||
scanner.properties().partitions[0].len(),
|
||||
"unexpected ranges: {:?}",
|
||||
scanner.properties().partitions
|
||||
);
|
||||
let raw_ranges: Vec<_> = scanner
|
||||
.properties()
|
||||
.partitions
|
||||
.iter()
|
||||
.flatten()
|
||||
.cloned()
|
||||
.collect();
|
||||
let mut new_ranges = Vec::with_capacity(3);
|
||||
for range in raw_ranges {
|
||||
new_ranges.push(vec![range]);
|
||||
}
|
||||
scanner
|
||||
.prepare(PrepareRequest {
|
||||
ranges: Some(new_ranges),
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let metrics_set = ExecutionPlanMetricsSet::default();
|
||||
|
||||
let mut partition_batches = vec![vec![]; 3];
|
||||
let mut streams: Vec<_> = (0..3)
|
||||
.map(|partition| {
|
||||
let stream = scanner
|
||||
.scan_partition(&Default::default(), &metrics_set, partition)
|
||||
.unwrap();
|
||||
Some(stream)
|
||||
})
|
||||
.collect();
|
||||
let mut num_done = 0;
|
||||
let mut schema = None;
|
||||
// Pull streams in round-robin fashion to get the consistent output from the sender.
|
||||
while num_done < 3 {
|
||||
if schema.is_none() {
|
||||
schema = Some(streams[0].as_ref().unwrap().schema().clone());
|
||||
}
|
||||
for i in 0..3 {
|
||||
let Some(mut stream) = streams[i].take() else {
|
||||
continue;
|
||||
};
|
||||
let Some(rb) = stream.try_next().await.unwrap() else {
|
||||
num_done += 1;
|
||||
continue;
|
||||
};
|
||||
partition_batches[i].push(rb);
|
||||
streams[i] = Some(stream);
|
||||
}
|
||||
}
|
||||
|
||||
let mut check_result = |expected| {
|
||||
let batches =
|
||||
RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap();
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
};
|
||||
|
||||
// Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 0 | 0.0 | 1970-01-01T00:00:00 |
|
||||
| 1 | 1.0 | 1970-01-01T00:00:01 |
|
||||
| 2 | 2.0 | 1970-01-01T00:00:02 |
|
||||
| 3 | 3.0 | 1970-01-01T00:00:03 |
|
||||
| 7200 | 7200.0 | 1970-01-01T02:00:00 |
|
||||
| 7201 | 7201.0 | 1970-01-01T02:00:01 |
|
||||
| 7202 | 7202.0 | 1970-01-01T02:00:02 |
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 3600 | 3600.0 | 1970-01-01T01:00:00 |
|
||||
| 3601 | 3601.0 | 1970-01-01T01:00:01 |
|
||||
| 3602 | 3602.0 | 1970-01-01T01:00:02 |
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 4 | 4.0 | 1970-01-01T00:00:04 |
|
||||
| 5 | 5.0 | 1970-01-01T00:00:05 |
|
||||
+-------+---------+---------------------+";
|
||||
check_result(expected);
|
||||
}
|
||||
|
||||
@@ -57,13 +57,23 @@ fn assert_invalid_transition_response(response: &SetRegionRoleStateResponse) {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_role_state_gracefully() {
|
||||
test_set_role_state_gracefully_with_format(false).await;
|
||||
test_set_role_state_gracefully_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_set_role_state_gracefully_with_format(flat_format: bool) {
|
||||
let settable_role_states = [
|
||||
SettableRegionRoleState::Follower,
|
||||
SettableRegionRoleState::DowngradingLeader,
|
||||
];
|
||||
for settable_role_state in settable_role_states {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -122,8 +132,18 @@ async fn test_set_role_state_gracefully() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_role_state_gracefully_not_exist() {
|
||||
test_set_role_state_gracefully_not_exist_with_format(false).await;
|
||||
test_set_role_state_gracefully_not_exist_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_set_role_state_gracefully_not_exist_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let non_exist_region_id = RegionId::new(1, 1);
|
||||
|
||||
@@ -137,8 +157,18 @@ async fn test_set_role_state_gracefully_not_exist() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_downgrading_region() {
|
||||
test_write_downgrading_region_with_format(false).await;
|
||||
test_write_downgrading_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_write_downgrading_region_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("write-to-downgrading-region").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -180,8 +210,18 @@ async fn test_write_downgrading_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_unified_state_transitions() {
|
||||
test_unified_state_transitions_with_format(false).await;
|
||||
test_unified_state_transitions_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_unified_state_transitions_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -279,8 +319,18 @@ async fn test_unified_state_transitions() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_restricted_state_transitions() {
|
||||
test_restricted_state_transitions_with_format(false).await;
|
||||
test_restricted_state_transitions_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_restricted_state_transitions_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
@@ -31,8 +31,18 @@ use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_staging_state_integration() {
|
||||
test_staging_state_integration_with_format(false).await;
|
||||
test_staging_state_integration_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_staging_state_integration_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -79,8 +89,18 @@ async fn test_staging_state_integration() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_staging_blocks_alter_operations() {
|
||||
test_staging_blocks_alter_operations_with_format(false).await;
|
||||
test_staging_blocks_alter_operations_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_staging_blocks_alter_operations_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -110,8 +130,18 @@ async fn test_staging_blocks_alter_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_staging_blocks_truncate_operations() {
|
||||
test_staging_blocks_truncate_operations_with_format(false).await;
|
||||
test_staging_blocks_truncate_operations_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_staging_blocks_truncate_operations_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -186,8 +216,18 @@ async fn test_staging_state_validation_patterns() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_staging_manifest_directory() {
|
||||
test_staging_manifest_directory_with_format(false).await;
|
||||
test_staging_manifest_directory_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_staging_manifest_directory_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1024, 0);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -267,8 +307,18 @@ async fn test_staging_manifest_directory() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_staging_exit_success_with_manifests() {
|
||||
test_staging_exit_success_with_manifests_with_format(false).await;
|
||||
test_staging_exit_success_with_manifests_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1024, 0);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
@@ -71,9 +71,19 @@ async fn scan_check(
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sync_after_flush_region() {
|
||||
test_sync_after_flush_region_with_format(false).await;
|
||||
test_sync_after_flush_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_sync_after_flush_region_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
let region_id = RegionId::new(1, 1);
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
@@ -100,7 +110,12 @@ async fn test_sync_after_flush_region() {
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
// Open the region on the follower engine
|
||||
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
|
||||
let follower_engine = env
|
||||
.create_follower_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
follower_engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -164,10 +179,20 @@ async fn test_sync_after_flush_region() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sync_after_alter_region() {
|
||||
test_sync_after_alter_region_with_format(false).await;
|
||||
test_sync_after_alter_region_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_sync_after_alter_region_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
@@ -197,7 +222,12 @@ async fn test_sync_after_alter_region() {
|
||||
put_rows(&engine, region_id, rows).await;
|
||||
|
||||
// Open the region on the follower engine
|
||||
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
|
||||
let follower_engine = env
|
||||
.create_follower_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
follower_engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
|
||||
@@ -33,8 +33,18 @@ use crate::test_util::{
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_truncate_region_basic() {
|
||||
test_engine_truncate_region_basic_with_format(false).await;
|
||||
test_engine_truncate_region_basic_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_truncate_region_basic_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("truncate-basic").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
// Create the region.
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -86,8 +96,18 @@ async fn test_engine_truncate_region_basic() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_put_data_after_truncate() {
|
||||
test_engine_put_data_after_truncate_with_format(false).await;
|
||||
test_engine_put_data_after_truncate_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_put_data_after_truncate_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("truncate-put").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
// Create the region.
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -152,8 +172,18 @@ async fn test_engine_put_data_after_truncate() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_truncate_after_flush() {
|
||||
test_engine_truncate_after_flush_with_format(false).await;
|
||||
test_engine_truncate_after_flush_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_truncate_after_flush_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("truncate-flush").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
// Create the region.
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -232,8 +262,18 @@ async fn test_engine_truncate_after_flush() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_truncate_reopen() {
|
||||
test_engine_truncate_reopen_with_format(false).await;
|
||||
test_engine_truncate_reopen_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_truncate_reopen_with_format(flat_format: bool) {
|
||||
let mut env = TestEnv::with_prefix("truncate-reopen").await;
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
// Create the region.
|
||||
let region_id = RegionId::new(1, 1);
|
||||
@@ -266,7 +306,15 @@ async fn test_engine_truncate_reopen() {
|
||||
.unwrap();
|
||||
|
||||
// Reopen the region again.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
@@ -295,13 +343,21 @@ async fn test_engine_truncate_reopen() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_engine_truncate_during_flush() {
|
||||
test_engine_truncate_during_flush_with_format(false).await;
|
||||
test_engine_truncate_during_flush_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_engine_truncate_during_flush_with_format(flat_format: bool) {
|
||||
init_default_ut_logging();
|
||||
let mut env = TestEnv::with_prefix("truncate-during-flush").await;
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let listener = Arc::new(FlushTruncateListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig::default(),
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager),
|
||||
Some(listener.clone()),
|
||||
None,
|
||||
@@ -376,7 +432,15 @@ async fn test_engine_truncate_during_flush() {
|
||||
assert_eq!(sequence, truncated_sequence);
|
||||
|
||||
// Reopen the engine.
|
||||
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
|
||||
let engine = env
|
||||
.reopen_engine(
|
||||
engine,
|
||||
MitoConfig {
|
||||
default_experimental_flat_format: flat_format,
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
engine
|
||||
.handle_request(
|
||||
region_id,
|
||||
|
||||
Reference in New Issue
Block a user