fix: complete partial index search results in cache (#6403)

* fix: complete partial index search results in cache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add initial tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* cover issue case

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* TestEnv new -> async

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-06-27 15:40:14 +08:00
committed by GitHub
parent 8473a34fc9
commit ff559b2688
40 changed files with 718 additions and 286 deletions

View File

@@ -475,7 +475,7 @@ mod test {
async fn region_alive_keeper() {
common_telemetry::init_default_ut_logging();
let mut region_server = mock_region_server();
let mut engine_env = TestEnv::with_prefix("region-alive-keeper");
let mut engine_env = TestEnv::with_prefix("region-alive-keeper").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
let engine = Arc::new(engine);
region_server.register_engine(engine.clone());

View File

@@ -278,7 +278,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("close-region");
let mut engine_env = TestEnv::with_prefix("close-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -326,7 +326,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-region");
let mut engine_env = TestEnv::with_prefix("open-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -374,7 +374,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("open-not-exists-region");
let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);
@@ -406,7 +406,7 @@ mod tests {
let mut region_server = mock_region_server();
let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
let mut engine_env = TestEnv::with_prefix("downgrade-region");
let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
let engine = engine_env.create_engine(MitoConfig::default()).await;
region_server.register_engine(Arc::new(engine));
let region_id = RegionId::new(1024, 1);

View File

@@ -59,7 +59,7 @@ impl TestEnv {
/// Returns a new env with specific `prefix` and `config` for test.
pub async fn with_prefix_and_config(prefix: &str, config: EngineConfig) -> Self {
let mut mito_env = MitoTestEnv::with_prefix(prefix);
let mut mito_env = MitoTestEnv::with_prefix(prefix).await;
let mito = mito_env.create_engine(MitoConfig::default()).await;
let metric = MetricEngine::try_new(mito.clone(), config).unwrap();
Self {

View File

@@ -245,7 +245,7 @@ mod test {
let blob = create_inverted_index_blob().await;
// Init a test range reader in local fs.
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let file_size = blob.len() as u64;
let store = env.init_object_store_manager();
let temp_path = "data";

View File

@@ -31,6 +31,11 @@ use crate::sst::parquet::row_selection::RowGroupSelection;
const INDEX_RESULT_TYPE: &str = "index_result";
/// Cache for storing index query results.
///
/// The `RowGroupSelection` is a collection of row groups that match the predicate.
///
/// Row groups can be partially searched: row groups that are not contained in the
/// `RowGroupSelection` have not been searched.
/// Users can retrieve the partial results and subsequently handle the uncontained
/// row groups required by the predicate.
pub struct IndexResultCache {
// Entries are keyed by the `(PredicateKey, FileId)` pair; values are shared via `Arc`.
cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
}
@@ -64,6 +69,8 @@ impl IndexResultCache {
}
/// Puts a query result into the cache.
///
/// Allows callers to put a partial result (one that does not contain all row groups) into the cache.
pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
let key = (key, file_id);
let size = Self::index_result_cache_weight(&key, &result);
@@ -74,6 +81,9 @@ impl IndexResultCache {
}
/// Gets a query result from the cache.
///
/// Note: the returned `RowGroupSelection` only contains the row groups that have been searched.
/// The caller should subsequently handle any uncontained row groups required by the predicate.
pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
let res = self.cache.get(&(key.clone(), file_id));
if res.is_some() {

View File

@@ -450,7 +450,7 @@ mod tests {
async fn test_write_and_upload_sst() {
// TODO(QuenKar): maybe find a way to create some object server for testing,
// and now just use local file system to mock.
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let mock_store = env.init_object_store_manager();
let path_provider = RegionFilePathFactory::new("test".to_string());
@@ -538,7 +538,7 @@ mod tests {
#[tokio::test]
async fn test_read_metadata_from_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();
@@ -607,7 +607,7 @@ mod tests {
#[tokio::test]
async fn test_write_cache_clean_tmp_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let data_home = env.data_home().display().to_string();
let mock_store = env.init_object_store_manager();

View File

@@ -115,7 +115,7 @@ fn check_region_version(
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -211,7 +211,7 @@ fn build_rows_for_tags(
#[tokio::test]
async fn test_put_after_alter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
@@ -316,7 +316,7 @@ async fn test_put_after_alter() {
async fn test_alter_region_retry() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -374,7 +374,7 @@ async fn test_alter_region_retry() {
async fn test_alter_on_flushing() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -478,7 +478,7 @@ async fn test_alter_on_flushing() {
async fn test_alter_column_fulltext_options() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -597,7 +597,7 @@ async fn test_alter_column_fulltext_options() {
async fn test_alter_column_set_inverted_index() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -707,7 +707,7 @@ async fn test_alter_column_set_inverted_index() {
async fn test_alter_region_ttl_options() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(AlterFlushListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -757,7 +757,7 @@ async fn test_alter_region_ttl_options() {
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyRegionChangeResultListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))

View File

@@ -31,7 +31,7 @@ use crate::test_util::{
async fn test_append_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -89,7 +89,7 @@ async fn test_append_mode_write_query() {
#[tokio::test]
async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
..Default::default()

View File

@@ -42,7 +42,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_new_stop() {
let mut env = TestEnv::with_prefix("engine-stop");
let mut env = TestEnv::with_prefix("engine-stop").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -69,7 +69,7 @@ async fn test_engine_new_stop() {
#[tokio::test]
async fn test_write_to_region() {
let mut env = TestEnv::with_prefix("write-to-region");
let mut env = TestEnv::with_prefix("write-to-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -97,7 +97,9 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("region-replay")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -173,7 +175,7 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
#[tokio::test]
async fn test_write_query_region() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -207,7 +209,7 @@ async fn test_write_query_region() {
#[tokio::test]
async fn test_different_order() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -268,7 +270,7 @@ async fn test_different_order() {
#[tokio::test]
async fn test_different_order_and_type() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -332,7 +334,7 @@ async fn test_different_order_and_type() {
async fn test_put_delete() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -384,7 +386,7 @@ async fn test_put_delete() {
#[tokio::test]
async fn test_delete_not_null_fields() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -433,7 +435,7 @@ async fn test_delete_not_null_fields() {
#[tokio::test]
async fn test_put_overwrite() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -493,7 +495,7 @@ async fn test_put_overwrite() {
#[tokio::test]
async fn test_absent_and_invalid_columns() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -541,7 +543,7 @@ async fn test_absent_and_invalid_columns() {
#[tokio::test]
async fn test_region_usage() {
let mut env = TestEnv::with_prefix("region_usage");
let mut env = TestEnv::with_prefix("region_usage").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -595,7 +597,7 @@ async fn test_region_usage() {
async fn test_engine_with_write_cache() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
let engine = env.create_engine(mito_config).await;
@@ -635,7 +637,7 @@ async fn test_engine_with_write_cache() {
#[tokio::test]
async fn test_cache_null_primary_key() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
vector_cache_size: ReadableSize::mb(32),

View File

@@ -39,8 +39,9 @@ async fn test_batch_open(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env =
TestEnv::with_prefix("open-batch-regions").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("open-batch-regions")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
@@ -160,8 +161,9 @@ async fn test_batch_open_err(factory: Option<LogStoreFactory>) {
let Some(factory) = factory else {
return;
};
let mut env =
TestEnv::with_prefix("open-batch-regions-err").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("open-batch-regions-err")
.await
.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
let mut options = HashMap::new();

View File

@@ -57,7 +57,9 @@ async fn test_catchup_with_last_entry_id(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::with_prefix("last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -175,8 +177,9 @@ async fn test_catchup_with_incorrect_last_entry_id(factory: Option<LogStoreFacto
return;
};
let mut env =
TestEnv::with_prefix("incorrect_last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("incorrect_last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -277,8 +280,9 @@ async fn test_catchup_without_last_entry_id(factory: Option<LogStoreFactory>) {
return;
};
let mut env =
TestEnv::with_prefix("without_last_entry_id").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("without_last_entry_id")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -380,8 +384,9 @@ async fn test_catchup_with_manifest_update(factory: Option<LogStoreFactory>) {
return;
};
let mut env =
TestEnv::with_prefix("without_manifest_update").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("without_manifest_update")
.await
.with_log_store_factory(factory.clone());
let topic = prepare_test_for_kafka_log_store(&factory).await;
let leader_engine = env.create_engine(MitoConfig::default()).await;
let follower_engine = env.create_follower_engine(MitoConfig::default()).await;
@@ -545,7 +550,9 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::with_prefix("local_catchup").with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("local_catchup")
.await
.with_log_store_factory(factory.clone());
let leader_engine = env.create_engine(MitoConfig::default()).await;
let Some(LogStoreImpl::RaftEngine(log_store)) = env.get_log_store() else {
unreachable!()
@@ -686,7 +693,7 @@ async fn test_local_catchup(factory: Option<LogStoreFactory>) {
#[tokio::test]
async fn test_catchup_not_exist() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);

View File

@@ -21,7 +21,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_engine_close_region() {
let mut env = TestEnv::with_prefix("close");
let mut env = TestEnv::with_prefix("close").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -136,7 +136,7 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
#[tokio::test]
async fn test_compaction_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -202,7 +202,7 @@ async fn test_compaction_region() {
#[tokio::test]
async fn test_infer_compaction_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -341,7 +341,7 @@ async fn test_infer_compaction_time_window() {
#[tokio::test]
async fn test_compaction_overlapping_files() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -402,7 +402,7 @@ async fn test_compaction_overlapping_files() {
#[tokio::test]
async fn test_compaction_region_with_overlapping() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -450,7 +450,7 @@ async fn test_compaction_region_with_overlapping() {
#[tokio::test]
async fn test_compaction_region_with_overlapping_delete_all() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -506,7 +506,7 @@ async fn test_compaction_region_with_overlapping_delete_all() {
#[tokio::test]
async fn test_readonly_during_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let listener = Arc::new(CompactionListener::default());
let engine = env
.create_engine_with(
@@ -590,7 +590,7 @@ async fn test_readonly_during_compaction() {
#[tokio::test]
async fn test_compaction_update_time_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -686,7 +686,7 @@ async fn test_compaction_update_time_window() {
#[tokio::test]
async fn test_change_region_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -811,7 +811,7 @@ async fn test_change_region_compaction_window() {
#[tokio::test]
async fn test_open_overwrite_compaction_window() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -26,7 +26,7 @@ use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder,
#[tokio::test]
async fn test_engine_create_new_region() {
let mut env = TestEnv::with_prefix("new-region");
let mut env = TestEnv::with_prefix("new-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -41,7 +41,7 @@ async fn test_engine_create_new_region() {
#[tokio::test]
async fn test_engine_create_existing_region() {
let mut env = TestEnv::with_prefix("create-existing");
let mut env = TestEnv::with_prefix("create-existing").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -61,7 +61,7 @@ async fn test_engine_create_existing_region() {
#[tokio::test]
async fn test_engine_create_close_create_region() {
// This test will trigger create_or_open function.
let mut env = TestEnv::with_prefix("create-close-create");
let mut env = TestEnv::with_prefix("create-close-create").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -91,7 +91,7 @@ async fn test_engine_create_close_create_region() {
#[tokio::test]
async fn test_engine_create_with_different_id() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -110,7 +110,7 @@ async fn test_engine_create_with_different_id() {
#[tokio::test]
async fn test_engine_create_with_different_schema() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -130,7 +130,7 @@ async fn test_engine_create_with_different_schema() {
#[tokio::test]
async fn test_engine_create_with_different_primary_key() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -150,7 +150,7 @@ async fn test_engine_create_with_different_primary_key() {
#[tokio::test]
async fn test_engine_create_with_options() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -172,7 +172,7 @@ async fn test_engine_create_with_options() {
#[tokio::test]
async fn test_engine_create_with_custom_store() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
@@ -204,7 +204,7 @@ async fn test_engine_create_with_custom_store() {
#[tokio::test]
async fn test_engine_create_with_memtable_opts() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -35,7 +35,7 @@ use crate::worker::DROPPING_MARKER_FILE;
async fn test_engine_drop_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("drop");
let mut env = TestEnv::with_prefix("drop").await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
@@ -143,7 +143,7 @@ async fn test_engine_drop_region_for_custom_store() {
put_rows(engine, region_id, rows).await;
flush_region(engine, region_id, None).await;
}
let mut env = TestEnv::with_prefix("drop");
let mut env = TestEnv::with_prefix("drop").await;
let listener = Arc::new(DropListener::new(Duration::from_millis(100)));
let engine = env
.create_engine_with_multiple_object_stores(

View File

@@ -33,7 +33,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_edit_region_schedule_compaction() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<RegionId>>>,
@@ -122,7 +122,7 @@ async fn test_edit_region_schedule_compaction() {
#[tokio::test]
async fn test_edit_region_fill_cache() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
struct EditRegionListener {
tx: Mutex<Option<oneshot::Sender<FileId>>>,
@@ -241,7 +241,7 @@ async fn test_edit_region_concurrently() {
}
}
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
// Suppress the compaction to not impede the speed of this kinda stress testing.

View File

@@ -28,7 +28,7 @@ use crate::test_util::{
async fn test_scan_without_filtering_deleted() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -41,7 +41,7 @@ use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
#[tokio::test]
async fn test_manual_flush() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -91,7 +91,7 @@ async fn test_manual_flush() {
#[tokio::test]
async fn test_flush_engine() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let engine = env
@@ -161,7 +161,7 @@ async fn test_flush_engine() {
#[tokio::test]
async fn test_write_stall() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(StallListener::default());
let engine = env
@@ -236,7 +236,7 @@ async fn test_write_stall() {
#[tokio::test]
async fn test_flush_empty() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let engine = env
.create_engine_with(
@@ -289,7 +289,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
return;
};
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::new().await.with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
@@ -396,7 +396,7 @@ impl MockTimeProvider {
#[tokio::test]
async fn test_auto_flush_engine() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let now = current_time_millis();
@@ -467,7 +467,7 @@ async fn test_auto_flush_engine() {
#[tokio::test]
async fn test_flush_workers() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let engine = env
@@ -554,7 +554,7 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::new().await.with_log_store_factory(factory.clone());
let engine = env
.create_engine_with(
MitoConfig::default(),

View File

@@ -31,7 +31,7 @@ use crate::test_util::{
async fn test_merge_mode_write_query() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -89,7 +89,7 @@ async fn test_merge_mode_write_query() {
async fn test_merge_mode_compaction() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
..Default::default()

View File

@@ -36,7 +36,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_open_empty() {
let mut env = TestEnv::with_prefix("open-empty");
let mut env = TestEnv::with_prefix("open-empty").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -63,7 +63,7 @@ async fn test_engine_open_empty() {
#[tokio::test]
async fn test_engine_open_existing() {
let mut env = TestEnv::with_prefix("open-exiting");
let mut env = TestEnv::with_prefix("open-exiting").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -90,7 +90,7 @@ async fn test_engine_open_existing() {
#[tokio::test]
async fn test_engine_reopen_region() {
let mut env = TestEnv::with_prefix("reopen-region");
let mut env = TestEnv::with_prefix("reopen-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -107,7 +107,7 @@ async fn test_engine_reopen_region() {
#[tokio::test]
async fn test_engine_open_readonly() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -150,7 +150,7 @@ async fn test_engine_open_readonly() {
#[tokio::test]
async fn test_engine_region_open_with_options() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -190,7 +190,7 @@ async fn test_engine_region_open_with_options() {
#[tokio::test]
async fn test_engine_region_open_with_custom_store() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env
.create_engine_with_multiple_object_stores(MitoConfig::default(), None, None, &["Gcs"])
.await;
@@ -244,7 +244,7 @@ async fn test_engine_region_open_with_custom_store() {
#[tokio::test]
async fn test_open_region_skip_wal_replay() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -344,7 +344,7 @@ async fn test_open_region_skip_wal_replay() {
#[tokio::test]
async fn test_open_region_wait_for_opening_region_ok() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok");
let mut env = TestEnv::with_prefix("wait-for-opening-region-ok").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
@@ -383,7 +383,7 @@ async fn test_open_region_wait_for_opening_region_ok() {
#[tokio::test]
async fn test_open_region_wait_for_opening_region_err() {
let mut env = TestEnv::with_prefix("wait-for-opening-region-err");
let mut env = TestEnv::with_prefix("wait-for-opening-region-err").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let worker = engine.inner.workers.worker(region_id);
@@ -428,7 +428,7 @@ async fn test_open_region_wait_for_opening_region_err() {
#[tokio::test]
async fn test_open_compaction_region() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let mut mito_config = MitoConfig::default();
mito_config
.sanitize(&env.data_home().display().to_string())

View File

@@ -73,7 +73,7 @@ async fn scan_in_parallel(
#[tokio::test]
async fn test_parallel_scan() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -53,7 +53,7 @@ fn build_rows_multi_tags_fields(
#[tokio::test]
async fn test_scan_projection() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -27,7 +27,7 @@ use crate::test_util::{
};
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -147,7 +147,7 @@ fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
#[tokio::test]
async fn test_prune_memtable() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -221,7 +221,7 @@ async fn test_prune_memtable() {
#[tokio::test]
async fn test_prune_memtable_complex_expr() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -274,7 +274,7 @@ async fn test_prune_memtable_complex_expr() {
#[tokio::test]
async fn test_mem_range_prune() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -25,7 +25,7 @@ use crate::test_util::{
};
async fn test_last_row(append_mode: bool) {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -27,7 +27,7 @@ use crate::test_util::{CreateRequestBuilder, TestEnv};
#[tokio::test]
async fn test_scan_with_min_sst_sequence() {
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence");
let mut env = TestEnv::with_prefix("test_scan_with_min_sst_sequence").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -153,7 +153,7 @@ async fn test_scan_with_min_sst_sequence() {
#[tokio::test]
async fn test_series_scan() {
let mut env = TestEnv::with_prefix("test_series_scan");
let mut env = TestEnv::with_prefix("test_series_scan").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -32,7 +32,7 @@ async fn test_set_role_state_gracefully() {
SettableRegionRoleState::DowngradingLeader,
];
for settable_role_state in settable_role_states {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
@@ -101,7 +101,7 @@ async fn test_set_role_state_gracefully() {
#[tokio::test]
async fn test_set_role_state_gracefully_not_exist() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let non_exist_region_id = RegionId::new(1, 1);
@@ -116,7 +116,7 @@ async fn test_set_role_state_gracefully_not_exist() {
#[tokio::test]
async fn test_write_downgrading_region() {
let mut env = TestEnv::with_prefix("write-to-downgrading-region");
let mut env = TestEnv::with_prefix("write-to-downgrading-region").await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -71,7 +71,7 @@ async fn scan_check(
#[tokio::test]
async fn test_sync_after_flush_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
@@ -163,7 +163,7 @@ async fn test_sync_after_flush_region() {
async fn test_sync_after_alter_region() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);

View File

@@ -33,7 +33,7 @@ use crate::test_util::{
#[tokio::test]
async fn test_engine_truncate_region_basic() {
let mut env = TestEnv::with_prefix("truncate-basic");
let mut env = TestEnv::with_prefix("truncate-basic").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -83,7 +83,7 @@ async fn test_engine_truncate_region_basic() {
#[tokio::test]
async fn test_engine_put_data_after_truncate() {
let mut env = TestEnv::with_prefix("truncate-put");
let mut env = TestEnv::with_prefix("truncate-put").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -146,7 +146,7 @@ async fn test_engine_put_data_after_truncate() {
#[tokio::test]
async fn test_engine_truncate_after_flush() {
let mut env = TestEnv::with_prefix("truncate-flush");
let mut env = TestEnv::with_prefix("truncate-flush").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -223,7 +223,7 @@ async fn test_engine_truncate_after_flush() {
#[tokio::test]
async fn test_engine_truncate_reopen() {
let mut env = TestEnv::with_prefix("truncate-reopen");
let mut env = TestEnv::with_prefix("truncate-reopen").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Create the region.
@@ -282,7 +282,7 @@ async fn test_engine_truncate_reopen() {
#[tokio::test]
async fn test_engine_truncate_during_flush() {
init_default_ut_logging();
let mut env = TestEnv::with_prefix("truncate-during-flush");
let mut env = TestEnv::with_prefix("truncate-during-flush").await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushTruncateListener::default());
let engine = env

View File

@@ -594,7 +594,7 @@ mod test {
#[tokio::test]
async fn create_manifest_manager() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let env = TestEnv::new().await;
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
@@ -606,7 +606,7 @@ mod test {
#[tokio::test]
async fn open_manifest_manager() {
let env = TestEnv::new();
let env = TestEnv::new().await;
// Try to open an empty manifest.
assert!(env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
@@ -637,7 +637,7 @@ mod test {
#[tokio::test]
async fn region_change_add_column() {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let env = TestEnv::new().await;
let mut manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
@@ -696,7 +696,7 @@ mod test {
let metadata = Arc::new(basic_region_metadata());
let data_home = create_temp_dir("");
let data_home_path = data_home.path().to_str().unwrap().to_string();
let env = TestEnv::with_data_home(data_home);
let env = TestEnv::with_data_home(data_home).await;
let manifest_dir = format!("{}/manifest", data_home_path);

View File

@@ -34,7 +34,7 @@ async fn build_manager(
compress_type: CompressionType,
) -> (TestEnv, RegionManifestManager) {
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new();
let env = TestEnv::new().await;
let manager = env
.create_manifest_manager(compress_type, checkpoint_distance, Some(metadata.clone()))
.await

View File

@@ -128,6 +128,9 @@ impl BloomFilterIndexApplier {
/// list of row group ranges that match the predicates.
///
/// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
///
/// A row group id present in the returned result means that the row group was searched.
/// An empty list of ranges means that the row group was searched but no rows were found.
pub async fn apply(
&self,
file_id: FileId,
@@ -195,7 +198,6 @@ impl BloomFilterIndexApplier {
range.end -= start;
}
}
output.retain(|(_, ranges)| !ranges.is_empty());
Ok(output)
}
@@ -386,6 +388,9 @@ mod tests {
.apply(file_id, None, row_groups.into_iter())
.await
.unwrap()
.into_iter()
.filter(|(_, ranges)| !ranges.is_empty())
.collect()
})
}
}

View File

@@ -231,6 +231,9 @@ impl FulltextIndexApplier {
impl FulltextIndexApplier {
/// Applies coarse-grained fulltext index to the specified SST file.
/// Returns (row group id -> ranges) that match the queries.
///
/// A row group id present in the returned result means that the row group was searched.
/// An empty list of ranges means that the row group was searched but no rows were found.
pub async fn apply_coarse(
&self,
file_id: FileId,
@@ -367,7 +370,7 @@ impl FulltextIndexApplier {
/// Adjusts the coarse output. Makes the output ranges based on row group start.
fn adjust_coarse_output(
input: Vec<(usize, Range<usize>)>,
output: &mut Vec<(usize, Vec<Range<usize>>)>,
output: &mut [(usize, Vec<Range<usize>>)],
) {
// adjust ranges to be based on row group
for ((_, output), (_, input)) in output.iter_mut().zip(input) {
@@ -377,7 +380,6 @@ impl FulltextIndexApplier {
range.end -= start;
}
}
output.retain(|(_, ranges)| !ranges.is_empty());
}
/// Converts terms to predicates.

View File

@@ -625,6 +625,7 @@ mod tests {
.unwrap();
resp.map(|r| {
r.into_iter()
.filter(|(_, ranges)| !ranges.is_empty())
.map(|(row_group_id, _)| row_group_id as RowId)
.collect()
})

View File

@@ -14,10 +14,8 @@
use common_telemetry::{debug, warn};
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use store_api::storage::ColumnId;
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::inverted_index::creator::InvertedIndexer;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{
@@ -113,13 +111,14 @@ impl Indexer {
return true;
};
let column_ids = indexer.column_ids().collect();
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_inverted_index_output(
&mut index_output.inverted_index,
row_count,
byte_count,
&indexer,
column_ids,
);
return true;
}
@@ -150,13 +149,14 @@ impl Indexer {
return true;
};
let column_ids = indexer.column_ids().collect();
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_fulltext_index_output(
&mut index_output.fulltext_index,
row_count,
byte_count,
&indexer,
column_ids,
);
return true;
}
@@ -187,13 +187,14 @@ impl Indexer {
return true;
};
let column_ids = indexer.column_ids().collect();
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_bloom_filter_output(
&mut index_output.bloom_filter,
row_count,
byte_count,
&indexer,
column_ids,
);
return true;
}
@@ -220,16 +221,16 @@ impl Indexer {
output: &mut InvertedIndexOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &InvertedIndexer,
column_ids: Vec<ColumnId>,
) {
debug!(
"Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
"Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
self.region_id, self.file_id, byte_count, row_count, column_ids
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
output.columns = column_ids;
}
fn fill_fulltext_index_output(
@@ -237,16 +238,16 @@ impl Indexer {
output: &mut FulltextIndexOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &FulltextIndexer,
column_ids: Vec<ColumnId>,
) {
debug!(
"Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
"Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
self.region_id, self.file_id, byte_count, row_count, column_ids
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
output.columns = column_ids;
}
fn fill_bloom_filter_output(
@@ -254,15 +255,15 @@ impl Indexer {
output: &mut BloomFilterOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &BloomFilterIndexer,
column_ids: Vec<ColumnId>,
) {
debug!(
"Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
"Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}, columns: {:?}",
self.region_id, self.file_id, byte_count, row_count, column_ids
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
output.columns = column_ids;
}
}

View File

@@ -88,11 +88,12 @@ pub struct SstInfo {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datafusion_expr::{col, lit, BinaryExpr, Expr, Operator};
use datatypes::arrow;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
@@ -104,12 +105,17 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::read::BatchReader;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
@@ -147,7 +153,7 @@ mod tests {
#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = FixedPathProvider {
@@ -205,7 +211,7 @@ mod tests {
#[tokio::test]
async fn test_read_with_cache() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -275,7 +281,7 @@ mod tests {
#[tokio::test]
async fn test_parquet_metadata_eq() {
// create test env
let mut env = crate::test_util::TestEnv::new();
let mut env = crate::test_util::TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -318,7 +324,7 @@ mod tests {
#[tokio::test]
async fn test_read_with_tag_filter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -370,7 +376,7 @@ mod tests {
#[tokio::test]
async fn test_read_empty_batch() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -407,7 +413,7 @@ mod tests {
#[tokio::test]
async fn test_read_with_field_filter() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
@@ -453,7 +459,7 @@ mod tests {
#[tokio::test]
async fn test_read_large_binary() {
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
@@ -544,7 +550,7 @@ mod tests {
async fn test_write_multiple_files() {
common_telemetry::init_default_ut_logging();
// create test env
let mut env = TestEnv::new();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let metadata = Arc::new(sst_region_metadata());
let batches = &[
@@ -593,4 +599,300 @@ mod tests {
}
assert_eq!(total_rows, rows_read);
}
/// End-to-end check of index-based pruning together with the index result cache.
///
/// Writes 200 rows with an inverted index and a bloom filter index, then runs
/// three reads that share one `CacheManager`. After each read the test asserts
/// the returned batches, the filter metrics, and which row groups are recorded
/// in the cached index result for the applier that was used.
#[tokio::test]
async fn test_write_read_with_index() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string());
let metadata = Arc::new(sst_region_metadata());
let row_group_size = 50;
// 200 rows in total: 20 + 20 + 20 + 40 + 100.
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 20),
new_batch_by_range(&["b", "d"], 0, 20),
new_batch_by_range(&["c", "d"], 0, 20),
new_batch_by_range(&["c", "f"], 0, 40),
new_batch_by_range(&["c", "h"], 100, 200),
]);
// Use a small row group size for test.
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let puffin_manager = env
.get_puffin_manager()
.build(object_store.clone(), file_path.clone());
let intermediate_manager = env.get_intermediate_manager();
let indexer_builder = IndexerBuilderImpl {
op_type: OperationType::Flush,
metadata: metadata.clone(),
row_group_size,
puffin_manager,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
segment_row_count: 1,
..Default::default()
},
},
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
)
.await;
// Only the first written SST is inspected.
let info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
// Both index kinds must have been built over all 200 rows.
assert!(info.index_metadata.inverted_index.index_size > 0);
assert_eq!(info.index_metadata.inverted_index.row_count, 200);
assert_eq!(info.index_metadata.inverted_index.columns, vec![0]);
assert!(info.index_metadata.bloom_filter.index_size > 0);
assert_eq!(info.index_metadata.bloom_filter.row_count, 200);
assert_eq!(info.index_metadata.bloom_filter.columns, vec![1]);
assert_eq!(
(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(199)
),
info.time_range
);
// Build a file handle from the write result so the reader can open the SST and its index file.
let handle = FileHandle::new(
FileMeta {
region_id: metadata.region_id,
file_id: info.file_id,
time_range: info.time_range,
level: 0,
file_size: info.file_size,
available_indexes: info.index_metadata.build_available_indexes(),
index_file_size: info.index_metadata.file_size,
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
},
Arc::new(NoopFilePurger),
);
// One cache instance is shared by all three reads below, so a later read can
// observe index results stored by an earlier one.
let cache = Arc::new(
CacheManager::builder()
.index_result_cache_size(1024 * 1024)
.index_metadata_size(1024 * 1024)
.index_content_page_size(1024 * 1024)
.index_content_size(1024 * 1024)
.puffin_metadata_size(1024 * 1024)
.build(),
);
let index_result_cache = cache.index_result_cache().unwrap();
// Helpers that build an applier for a given predicate list; each returns an `Option`.
let build_inverted_index_applier = |exprs: &[Expr]| {
InvertedIndexApplierBuilder::new(
FILE_DIR.to_string(),
object_store.clone(),
&metadata,
HashSet::from_iter([0]),
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_inverted_index_cache(cache.inverted_index_cache().cloned())
.build(exprs)
.unwrap()
.map(Arc::new)
};
let build_bloom_filter_applier = |exprs: &[Expr]| {
BloomFilterIndexApplierBuilder::new(
FILE_DIR.to_string(),
object_store.clone(),
&metadata,
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
.build(exprs)
.unwrap()
.map(Arc::new)
};
// Expected row group layout (50 rows each, assuming rows are packed in write order):
//   row group 0: [a, d] 0-20, [b, d] 0-20, [c, d] 0-10
//   row group 1: [c, d] 10-20, [c, f] 0-40
//   row group 2: [c, h] 100-150
//   row group 3: [c, h] 150-200
//
// Case 1: predicate on `tag_0`.
//
// Data: ts       [tag_0, tag_1]
//       0-20     [a, d]
//       0-20     [b, d]
//       0-20     [c, d]
//       0-40     [c, f]
//       100-200  [c, h]
//
// Pred: tag_0 = "b"
//
// Row groups & rows pruning:
//
// Row Groups:
// - min-max: filter out row groups 1..=3
//
// Rows:
// - inverted index: hit row group 0, hit 20 rows
let preds = vec![col("tag_0").eq(lit("b"))];
let inverted_index_applier = build_inverted_index_applier(&preds);
let bloom_filter_applier = build_bloom_filter_applier(&preds);
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3);
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
// Row group 0 holds 50 rows and 20 of them match, so 30 rows are filtered.
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 30);
let cached = index_result_cache
.get(
inverted_index_applier.unwrap().predicate_key(),
handle.file_id(),
)
.unwrap();
// The inverted index searches all row groups, so the cached result must cover
// every one of them, including those already pruned by min-max.
assert!(cached.contains_row_group(0));
assert!(cached.contains_row_group(1));
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
// Case 2: time range plus predicate on `tag_1`.
//
// Data: ts       [tag_0, tag_1]
//       0-20     [a, d]
//       0-20     [b, d]
//       0-20     [c, d]
//       0-40     [c, f]
//       100-200  [c, h]
//
// Pred: 50 <= ts && ts < 200 && tag_1 = "d"
//
// Row groups & rows pruning:
//
// Row Groups:
// - min-max: filter out row groups 0..=1
// - bloom filter: filter out row groups 2..=3
let preds = vec![
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
col("tag_1").eq(lit("d")),
];
let inverted_index_applier = build_inverted_index_applier(&preds);
let bloom_filter_applier = build_bloom_filter_applier(&preds);
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
check_reader_result(&mut reader, &[]).await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 2);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
// Row groups 2 and 3 hold 50 rows each and none of them match.
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id(),
)
.unwrap();
// Only the row groups that survived min-max pruning (2 and 3) are recorded,
// so the cached bloom filter result is partial at this point.
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
assert!(!cached.contains_row_group(0));
assert!(!cached.contains_row_group(1));
// Case 3: remove the pred of `ts`, continue to use the pred of `tag_1`
// to test if cache works.
// NOTE(review): presumably this yields the same bloom filter predicate key as
// case 2, so the read starts from the partial cached result - confirm.
//
// Data: ts       [tag_0, tag_1]
//       0-20     [a, d]
//       0-20     [b, d]
//       0-20     [c, d]
//       0-40     [c, f]
//       100-200  [c, h]
//
// Pred: tag_1 = "d"
//
// Row groups & rows pruning:
//
// Row Groups:
// - bloom filter: filter out row groups 2..=3
//
// Rows:
// - bloom filter: hit row group 0, hit 50 rows
//                 hit row group 1, hit 10 rows
let preds = vec![col("tag_1").eq(lit("d"))];
let inverted_index_applier = build_inverted_index_applier(&preds);
let bloom_filter_applier = build_bloom_filter_applier(&preds);
let builder =
ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store.clone())
.predicate(Some(Predicate::new(preds)))
.inverted_index_applier(inverted_index_applier.clone())
.bloom_filter_index_applier(bloom_filter_applier.clone())
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
let mut reader = ParquetReader::new(Arc::new(context), selection)
.await
.unwrap();
// The [c, d] batch is expected in two pieces: one per row group.
check_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 20),
new_batch_by_range(&["b", "d"], 0, 20),
new_batch_by_range(&["c", "d"], 0, 10),
new_batch_by_range(&["c", "d"], 10, 20),
],
)
.await;
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 2);
// 100 rows from row groups 2..=3 plus the 40 non-matching rows of row group 1.
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 140);
let cached = index_result_cache
.get(
bloom_filter_applier.unwrap().predicate_key(),
handle.file_id(),
)
.unwrap();
// After this read the cached result must be complete: it covers all four row groups.
assert!(cached.contains_row_group(0));
assert!(cached.contains_row_group(1));
assert!(cached.contains_row_group(2));
assert!(cached.contains_row_group(3));
}
}

View File

@@ -378,14 +378,24 @@ impl ParquetReaderBuilder {
}
let fulltext_filtered = self
.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
.prune_row_groups_by_fulltext_index(
row_group_size,
num_row_groups,
&mut output,
metrics,
)
.await;
if output.is_empty() {
return output;
}
self.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics)
.await;
self.prune_row_groups_by_inverted_index(
row_group_size,
num_row_groups,
&mut output,
metrics,
)
.await;
if output.is_empty() {
return output;
}
@@ -412,7 +422,7 @@ impl ParquetReaderBuilder {
async fn prune_row_groups_by_fulltext_index(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
@@ -425,14 +435,15 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_FULLTEXT,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
return true;
}
}
// Slow path: apply the index from the file.
@@ -441,9 +452,7 @@ impl ParquetReaderBuilder {
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(Some(res)) => {
RowGroupSelection::from_row_ids(res, row_group_size, parquet_meta.num_row_groups())
}
Ok(Some(res)) => RowGroupSelection::from_row_ids(res, row_group_size, num_row_groups),
Ok(None) => return false,
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
@@ -470,6 +479,7 @@ impl ParquetReaderBuilder {
async fn prune_row_groups_by_inverted_index(
&self,
row_group_size: usize,
num_row_groups: usize,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
) -> bool {
@@ -482,14 +492,15 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_INVERTED,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
return true;
}
}
// Slow path: apply the index from the file.
@@ -498,9 +509,11 @@ impl ParquetReaderBuilder {
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await;
let selection = match apply_res {
Ok(output) => {
RowGroupSelection::from_inverted_index_apply_output(row_group_size, output)
}
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
return false;
@@ -534,27 +547,34 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_BLOOM,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
return true;
}
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimization: only search the row groups that are required by `output` and not already stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let selection = match apply_res {
let mut selection = match apply_res {
Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
@@ -562,6 +582,11 @@ impl ParquetReaderBuilder {
}
};
// Newly searched row groups are in `selection`; concat the `cached` ones into it.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id(),
@@ -589,27 +614,34 @@ impl ParquetReaderBuilder {
let predicate_key = index_applier.predicate_key();
// Fast path: return early if the result is in the cache.
if self.index_result_cache_get(
predicate_key,
self.file_handle.file_id(),
output,
metrics,
INDEX_TYPE_FULLTEXT,
) {
return true;
let cached = self
.cache_strategy
.index_result_cache()
.and_then(|cache| cache.get(predicate_key, self.file_handle.file_id()));
if let Some(result) = cached.as_ref() {
if all_required_row_groups_searched(output, result) {
apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
return true;
}
}
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let rgs = parquet_meta
.row_groups()
.iter()
.enumerate()
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
(
rg.num_rows() as usize,
// Optimization: only search the row groups that are required by `output` and not already stored in `cached`.
output.contains_non_empty_row_group(i)
&& cached
.as_ref()
.map(|c| !c.contains_row_group(i))
.unwrap_or(true),
)
});
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
.await;
let selection = match apply_res {
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
RowGroupSelection::from_row_ranges(apply_output, row_group_size)
}
@@ -620,6 +652,11 @@ impl ParquetReaderBuilder {
}
};
// Newly searched row groups are in `selection`; concatenate them with `cached`.
if let Some(cached) = cached.as_ref() {
selection.concat(cached);
}
self.apply_index_result_and_update_cache(
predicate_key,
self.file_handle.file_id(),
@@ -674,24 +711,6 @@ impl ParquetReaderBuilder {
true
}
fn index_result_cache_get(
&self,
predicate_key: &PredicateKey,
file_id: FileId,
output: &mut RowGroupSelection,
metrics: &mut ReaderFilterMetrics,
index_type: &str,
) -> bool {
if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
let result = index_result_cache.get(predicate_key, file_id);
if let Some(result) = result {
apply_selection_and_update_metrics(output, &result, metrics, index_type);
return true;
}
}
false
}
fn apply_index_result_and_update_cache(
&self,
predicate_key: &PredicateKey,
@@ -725,6 +744,18 @@ fn apply_selection_and_update_metrics(
*output = intersection;
}
/// Tells whether `cached_row_groups` already covers everything `required_row_groups` needs.
///
/// A row group of `required_row_groups` is covered when it has no rows selected (nothing
/// to search) or when `cached_row_groups` contains an entry for it.
fn all_required_row_groups_searched(
    required_row_groups: &RowGroupSelection,
    cached_row_groups: &RowGroupSelection,
) -> bool {
    for (rg_id, _) in required_row_groups.iter() {
        // Only row groups that still select rows have to be searched.
        let needs_search = required_row_groups.contains_non_empty_row_group(*rg_id);
        if needs_search && !cached_row_groups.contains_row_group(*rg_id) {
            // A required row group is missing from the cached result.
            return false;
        }
    }
    true
}
/// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
@@ -1131,7 +1162,10 @@ impl Drop for ParquetReader {
impl ParquetReader {
/// Creates a new reader.
async fn new(context: FileRangeContextRef, mut selection: RowGroupSelection) -> Result<Self> {
pub(crate) async fn new(
context: FileRangeContextRef,
mut selection: RowGroupSelection,
) -> Result<Self> {
// No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context

View File

@@ -31,7 +31,7 @@ pub struct RowGroupSelection {
}
/// A row selection with its count.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
struct RowSelectionWithCount {
/// Row selection.
selection: RowSelection,
@@ -95,6 +95,7 @@ impl RowGroupSelection {
/// * The last row group may have fewer rows than `row_group_size`
pub fn from_inverted_index_apply_output(
row_group_size: usize,
num_row_groups: usize,
apply_output: ApplyOutput,
) -> Self {
// Step 1: Convert segment IDs to row ranges within row groups
@@ -116,7 +117,7 @@ impl RowGroupSelection {
// Step 2: Group ranges by row group ID and create row selections
let mut total_row_count = 0;
let mut total_selector_len = 0;
let selection_in_rg = row_group_ranges
let mut selection_in_rg = row_group_ranges
.chunk_by(|(row_group_id, _)| *row_group_id)
.into_iter()
.map(|(row_group_id, group)| {
@@ -141,7 +142,9 @@ impl RowGroupSelection {
},
)
})
.collect();
.collect::<BTreeMap<_, _>>();
Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
Self {
selection_in_rg,
@@ -173,7 +176,7 @@ impl RowGroupSelection {
// Step 2: Create row selections for each row group
let mut total_row_count = 0;
let mut total_selector_len = 0;
let selection_in_rg = row_group_to_row_ids
let mut selection_in_rg = row_group_to_row_ids
.into_iter()
.map(|(row_group_id, row_ids)| {
let selection =
@@ -191,7 +194,9 @@ impl RowGroupSelection {
},
)
})
.collect();
.collect::<BTreeMap<_, _>>();
Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
Self {
selection_in_rg,
@@ -324,19 +329,26 @@ impl RowGroupSelection {
}
/// Returns the first row group in the selection.
///
/// Skip the row group if the row count is 0.
pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
let (
while let Some((
row_group_id,
RowSelectionWithCount {
selection,
row_count,
selector_len,
},
) = self.selection_in_rg.pop_first()?;
)) = self.selection_in_rg.pop_first()
{
if row_count > 0 {
self.row_count -= row_count;
self.selector_len -= selector_len;
return Some((row_group_id, selection));
}
}
self.row_count -= row_count;
self.selector_len -= selector_len;
Some((row_group_id, selection))
None
}
/// Removes a row group from the selection.
@@ -363,6 +375,14 @@ impl RowGroupSelection {
self.selection_in_rg.contains_key(&row_group_id)
}
/// Checks whether `row_group_id` is present in the selection with at least one selected row.
///
/// Returns `false` both when the row group is absent and when its row count is 0.
pub fn contains_non_empty_row_group(&self, row_group_id: usize) -> bool {
    matches!(
        self.selection_in_rg.get(&row_group_id),
        Some(entry) if entry.row_count > 0
    )
}
/// Returns an iterator over the row groups in the selection.
pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
self.selection_in_rg
@@ -375,6 +395,32 @@ impl RowGroupSelection {
self.selector_len * size_of::<RowSelector>()
+ self.selection_in_rg.len() * size_of::<RowSelectionWithCount>()
}
/// Merges every row group of `other` into `self`, adding its row count and selector
/// length to the totals of `self`.
///
/// The two selections must cover disjoint sets of row groups.
///
/// # Panics
///
/// Panics on the first row group of `other` that is already present in `self`. Row groups
/// merged before that point stay in `self`.
pub fn concat(&mut self, other: &Self) {
    use std::collections::btree_map::Entry;

    for (rg_id, incoming) in &other.selection_in_rg {
        match self.selection_in_rg.entry(*rg_id) {
            Entry::Occupied(_) => panic!("row group {} is already in `self`", rg_id),
            Entry::Vacant(slot) => {
                slot.insert(incoming.clone());
            }
        }
        self.row_count += incoming.row_count;
        self.selector_len += incoming.selector_len;
    }
}
/// Ensures that every row group ID in `0..num_row_groups` has an entry in `selection_in_rg`.
///
/// IDs without an entry receive a default (empty) selection; existing entries are left
/// untouched. An empty entry records that the row group was searched and matched no rows.
fn fill_missing_row_groups(
    selection_in_rg: &mut BTreeMap<usize, RowSelectionWithCount>,
    num_row_groups: usize,
) {
    (0..num_row_groups).for_each(|rg_id| {
        selection_in_rg.entry(rg_id).or_default();
    });
}
}
/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
@@ -678,15 +724,14 @@ mod tests {
let selection =
RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 0);
assert_eq!(selection.row_group_count(), 0);
assert!(selection.get(0).is_none());
assert_eq!(selection.row_group_count(), 3);
// Test with consecutive row IDs
let consecutive_row_ids: BTreeSet<u32> = vec![5, 6, 7, 8, 9].into_iter().collect();
let selection =
RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups);
assert_eq!(selection.row_count(), 5);
assert_eq!(selection.row_group_count(), 1);
assert_eq!(selection.row_group_count(), 3);
let row_selection = selection.get(0).unwrap();
assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9
@@ -1047,4 +1092,25 @@ mod tests {
let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
assert!(!empty_selection.contains_row_group(0));
}
/// Concatenating two selections over disjoint row groups sums their rows and row groups.
#[test]
fn test_concat() {
    let row_group_size = 100;

    // Every listed row group selects rows 5..15, which lies within [0, 100).
    let first_half = vec![(0, vec![5..15]), (1, vec![5..15])];
    let second_half = vec![(2, vec![5..15]), (3, vec![5..15])];

    let mut merged = RowGroupSelection::from_row_ranges(first_half, row_group_size);
    let addition = RowGroupSelection::from_row_ranges(second_half, row_group_size);
    merged.concat(&addition);

    // 4 row groups with 10 selected rows each.
    assert_eq!(merged.row_count(), 40);
    assert_eq!(merged.row_group_count(), 4);
}
}

View File

@@ -202,6 +202,8 @@ pub(crate) enum LogStoreImpl {
pub struct TestEnv {
/// Path to store data.
data_home: TempDir,
intermediate_manager: IntermediateManager,
puffin_manager: PuffinManagerFactory,
log_store: Option<LogStoreImpl>,
log_store_factory: LogStoreFactory,
object_store_manager: Option<ObjectStoreManagerRef>,
@@ -209,44 +211,33 @@ pub struct TestEnv {
kv_backend: KvBackendRef,
}
impl Default for TestEnv {
fn default() -> Self {
TestEnv::new()
}
}
impl TestEnv {
/// Returns a new env with empty prefix for test.
pub fn new() -> TestEnv {
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
TestEnv {
data_home: create_temp_dir(""),
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager,
kv_backend,
}
pub async fn new() -> TestEnv {
Self::with_prefix("").await
}
/// Returns a new env with specific `prefix` for test.
pub fn with_prefix(prefix: &str) -> TestEnv {
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
TestEnv {
data_home: create_temp_dir(prefix),
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager,
kv_backend,
}
pub async fn with_prefix(prefix: &str) -> TestEnv {
Self::with_data_home(create_temp_dir(prefix)).await
}
/// Returns a new env with specific `data_home` for test.
pub fn with_data_home(data_home: TempDir) -> TestEnv {
pub async fn with_data_home(data_home: TempDir) -> TestEnv {
let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
let index_aux_path = data_home.path().join("index_aux");
let puffin_manager = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intermediate_manager = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
TestEnv {
data_home,
intermediate_manager,
puffin_manager,
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
@@ -587,17 +578,15 @@ impl TestEnv {
local_store: ObjectStore,
capacity: ReadableSize,
) -> WriteCacheRef {
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr)
.await
.unwrap();
let write_cache = WriteCache::new(
local_store,
capacity,
None,
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
)
.await
.unwrap();
Arc::new(write_cache)
}
@@ -608,17 +597,15 @@ impl TestEnv {
path: &str,
capacity: ReadableSize,
) -> WriteCacheRef {
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let write_cache = WriteCache::new_fs(path, capacity, None, puffin_mgr, intm_mgr)
.await
.unwrap();
let write_cache = WriteCache::new_fs(
path,
capacity,
None,
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
)
.await
.unwrap();
Arc::new(write_cache)
}
@@ -634,6 +621,14 @@ impl TestEnv {
/// Returns a clone of the log store held by this env, or `None` if none is set.
pub(crate) fn get_log_store(&self) -> Option<LogStoreImpl> {
    self.log_store.clone()
}
/// Returns a clone of the [PuffinManagerFactory] held by this env.
pub fn get_puffin_manager(&self) -> PuffinManagerFactory {
self.puffin_manager.clone()
}
/// Returns a clone of the [IntermediateManager] held by this env.
pub fn get_intermediate_manager(&self) -> IntermediateManager {
self.intermediate_manager.clone()
}
}
/// Builder to mock a [RegionCreateRequest].

View File

@@ -20,7 +20,7 @@ use api::v1::{OpType, SemanticType};
use common_time::Timestamp;
use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
use datatypes::value::ValueRef;
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
use parquet::file::metadata::ParquetMetaData;
@@ -57,7 +57,12 @@ pub fn sst_region_metadata() -> RegionMetadata {
"tag_1".to_string(),
ConcreteDataType::string_datatype(),
true,
),
)
.with_skipping_options(SkippingIndexOptions {
granularity: 1,
..Default::default()
})
.unwrap(),
semantic_type: SemanticType::Tag,
column_id: 1,
})

View File

@@ -1142,7 +1142,7 @@ mod tests {
#[tokio::test]
async fn test_worker_group_start_stop() {
let env = TestEnv::with_prefix("group-stop");
let env = TestEnv::with_prefix("group-stop").await;
let group = env
.create_worker_group(MitoConfig {
num_workers: 4,