From 03a29c6591e97de34196069d5c196e501016d3e1 Mon Sep 17 00:00:00 2001 From: Sicong Hu Date: Fri, 24 Oct 2025 11:24:13 +0800 Subject: [PATCH] fix: correct test_index_build_type_compact (#7137) Signed-off-by: SNC123 --- src/mito2/src/engine/index_build_test.rs | 17 +++--- src/mito2/src/engine/listener.rs | 63 +++++++++++++++----- src/mito2/src/sst/index.rs | 25 ++++++++ src/mito2/src/worker.rs | 11 +++- src/mito2/src/worker/handle_rebuild_index.rs | 6 +- 5 files changed, 91 insertions(+), 31 deletions(-) diff --git a/src/mito2/src/engine/index_build_test.rs b/src/mito2/src/engine/index_build_test.rs index 6fe27929e5..404aa3ad01 100644 --- a/src/mito2/src/engine/index_build_test.rs +++ b/src/mito2/src/engine/index_build_test.rs @@ -32,11 +32,6 @@ use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, reopen_region, rows_schema, }; -// wait listener receives enough success count. -async fn wait_finish(listener: &IndexBuildListener, times: usize) { - listener.wait_finish(times).await; -} - fn async_build_mode_config(is_create_on_flush: bool) -> MitoConfig { let mut config = MitoConfig::default(); config.index.build_mode = IndexBuildMode::Async; @@ -84,7 +79,7 @@ fn assert_listener_counts( expected_success_count: usize, ) { assert_eq!(listener.begin_count(), expected_begin_count); - assert_eq!(listener.success_count(), expected_success_count); + assert_eq!(listener.finish_count(), expected_success_count); } #[tokio::test] @@ -155,7 +150,7 @@ async fn test_index_build_type_flush() { flush_region(&engine, region_id, None).await; // After 2 index build task are finished, 2 index files should exist. - wait_finish(&listener, 2).await; + listener.wait_finish(2).await; let scanner = engine .scanner(region_id, ScanRequest::default()) .await @@ -204,6 +199,8 @@ async fn test_index_build_type_compact() { put_and_flush(&engine, region_id, &column_schemas, 15..25).await; put_and_flush(&engine, region_id, &column_schemas, 40..50).await; + // all index build tasks begin means flush tasks are all finished. + listener.wait_begin(4).await; // Before compaction is triggered, files should be 4, and not all index files are built. let scanner = engine .scanner(region_id, ScanRequest::default()) @@ -216,8 +213,8 @@ async fn test_index_build_type_compact() { // This explicit compaction call serves to make the process deterministic for the test. compact(&engine, region_id).await; + listener.wait_begin(5).await; // 4 flush + 1 compaction begin // Before compaction is triggered, files should be 2, and not all index files are built. - listener.clear_success_count(); let scanner = engine .scanner(region_id, ScanRequest::default()) .await @@ -226,7 +223,7 @@ async fn test_index_build_type_compact() { assert!(num_of_index_files(&engine, &scanner, region_id).await < 2); // Wait a while to make sure index build tasks are finished. - wait_finish(&listener, 2).await; + listener.wait_stop(5).await; // 4 flush + 1 compaction = some abort + some finish let scanner = engine .scanner(region_id, ScanRequest::default()) .await @@ -292,7 +289,7 @@ async fn test_index_build_type_schema_change() { .handle_request(region_id, RegionRequest::Alter(set_index_request)) .await .unwrap(); - wait_finish(&listener, 1).await; + listener.wait_finish(1).await; let scanner = engine .scanner(region_id, ScanRequest::default()) .await diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 317c3cdfd0..ebc20ac280 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -75,10 +75,13 @@ pub trait EventListener: Send + Sync { async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {} /// Notifies the listener that the index build task is executed successfully. - async fn on_index_build_success(&self, _region_file_id: RegionFileId) {} + async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {} /// Notifies the listener that the index build task is started. async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {} + + /// Notifies the listener that the index build task is aborted. + async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {} } pub type EventListenerRef = Arc; @@ -309,45 +312,75 @@ impl EventListener for NotifyRegionChangeResultListener { #[derive(Default)] pub struct IndexBuildListener { - notify: Notify, - success_count: AtomicUsize, - start_count: AtomicUsize, + begin_count: AtomicUsize, + begin_notify: Notify, + finish_count: AtomicUsize, + finish_notify: Notify, + abort_count: AtomicUsize, + abort_notify: Notify, + // stop means finished or aborted + stop_notify: Notify, } impl IndexBuildListener { /// Wait until index build is done for `times` times. pub async fn wait_finish(&self, times: usize) { - while self.success_count.load(Ordering::Relaxed) < times { - self.notify.notified().await; + while self.finish_count.load(Ordering::Relaxed) < times { + self.finish_notify.notified().await; + } + } + + /// Wait until index build is stopped for `times` times. + pub async fn wait_stop(&self, times: usize) { + while self.finish_count.load(Ordering::Relaxed) + self.abort_count.load(Ordering::Relaxed) + < times + { + self.stop_notify.notified().await; + } + } + + /// Wait until index build is begun for `times` times. + pub async fn wait_begin(&self, times: usize) { + while self.begin_count.load(Ordering::Relaxed) < times { + self.begin_notify.notified().await; } } /// Clears the success count. - pub fn clear_success_count(&self) { - self.success_count.store(0, Ordering::Relaxed); + pub fn clear_finish_count(&self) { + self.finish_count.store(0, Ordering::Relaxed); } /// Returns the success count. - pub fn success_count(&self) -> usize { - self.success_count.load(Ordering::Relaxed) + pub fn finish_count(&self) -> usize { + self.finish_count.load(Ordering::Relaxed) } /// Returns the start count. pub fn begin_count(&self) -> usize { - self.start_count.load(Ordering::Relaxed) + self.begin_count.load(Ordering::Relaxed) } } #[async_trait] impl EventListener for IndexBuildListener { - async fn on_index_build_success(&self, region_file_id: RegionFileId) { + async fn on_index_build_finish(&self, region_file_id: RegionFileId) { info!("Region {} index build successfully", region_file_id); - self.success_count.fetch_add(1, Ordering::Relaxed); - self.notify.notify_one(); + self.finish_count.fetch_add(1, Ordering::Relaxed); + self.finish_notify.notify_one(); + self.stop_notify.notify_one(); } async fn on_index_build_begin(&self, region_file_id: RegionFileId) { info!("Region {} index build begin", region_file_id); - self.start_count.fetch_add(1, Ordering::Relaxed); + self.begin_count.fetch_add(1, Ordering::Relaxed); + self.begin_notify.notify_one(); + } + + async fn on_index_build_abort(&self, region_file_id: RegionFileId) { + info!("Region {} index build aborted", region_file_id); + self.abort_count.fetch_add(1, Ordering::Relaxed); + self.abort_notify.notify_one(); + self.stop_notify.notify_one(); } } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 8ad7f6ef01..cc8469332a 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -62,6 +62,7 @@ use crate::sst::index::inverted_index::creator::InvertedIndexer; use crate::sst::parquet::SstInfo; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::PrimaryKeyArray; +use crate::worker::WorkerListener; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; @@ -451,6 +452,7 @@ pub struct IndexBuildTask { pub file_meta: FileMeta, pub reason: IndexBuildType, pub access_layer: AccessLayerRef, + pub(crate) listener: WorkerListener, pub(crate) manifest_ctx: ManifestContextRef, pub write_cache: Option, pub file_purger: FilePurgerRef, @@ -486,6 +488,12 @@ impl IndexBuildTask { } async fn do_index_build(&mut self, version_control: VersionControlRef) { + self.listener + .on_index_build_begin(RegionFileId::new( + self.file_meta.region_id, + self.file_meta.file_id, + )) + .await; match self.index_build(version_control).await { Ok(outcome) => self.on_success(outcome).await, Err(e) => { @@ -540,6 +548,12 @@ impl IndexBuildTask { if !self.check_sst_file_exists(&version_control).await { // Calls abort to clean up index files. indexer.abort().await; + self.listener + .on_index_build_abort(RegionFileId::new( + self.file_meta.region_id, + self.file_meta.file_id, + )) + .await; return Ok(IndexBuildOutcome::Aborted(format!( "SST file not found during index build, region: {}, file_id: {}", self.file_meta.region_id, self.file_meta.file_id @@ -575,6 +589,12 @@ impl IndexBuildTask { if !self.check_sst_file_exists(&version_control).await { // Calls abort to clean up index files. indexer.abort().await; + self.listener + .on_index_build_abort(RegionFileId::new( + self.file_meta.region_id, + self.file_meta.file_id, + )) + .await; return Ok(IndexBuildOutcome::Aborted(format!( "SST file not found during index build, region: {}, file_id: {}", self.file_meta.region_id, self.file_meta.file_id @@ -1192,6 +1212,7 @@ mod tests { }, reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), manifest_ctx, write_cache: None, file_purger, @@ -1242,6 +1263,7 @@ mod tests { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), manifest_ctx, write_cache: None, file_purger, @@ -1309,6 +1331,7 @@ mod tests { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), manifest_ctx, write_cache: None, file_purger, @@ -1405,6 +1428,7 @@ mod tests { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), manifest_ctx, write_cache: None, file_purger, @@ -1485,6 +1509,7 @@ mod tests { file_meta: file_meta.clone(), reason: IndexBuildType::Flush, access_layer: env.access_layer.clone(), + listener: WorkerListener::default(), manifest_ctx, write_cache: Some(write_cache.clone()), file_purger, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 87c25cd964..322141fd1b 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1220,10 +1220,10 @@ impl WorkerListener { } } - pub(crate) async fn on_index_build_success(&self, _region_file_id: RegionFileId) { + pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) { #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { - listener.on_index_build_success(_region_file_id).await; + listener.on_index_build_finish(_region_file_id).await; } } @@ -1233,6 +1233,13 @@ impl WorkerListener { listener.on_index_build_begin(_region_file_id).await; } } + + pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) { + #[cfg(any(test, feature = "test"))] + if let Some(listener) = &self.listener { + listener.on_index_build_abort(_region_file_id).await; + } + } } #[cfg(test)] diff --git a/src/mito2/src/worker/handle_rebuild_index.rs b/src/mito2/src/worker/handle_rebuild_index.rs index 71f9bc206f..38ca07f1a9 100644 --- a/src/mito2/src/worker/handle_rebuild_index.rs +++ b/src/mito2/src/worker/handle_rebuild_index.rs @@ -71,6 +71,7 @@ impl RegionWorkerLoop { file_meta: file.meta_ref().clone(), reason: build_type, access_layer: access_layer.clone(), + listener: self.listener.clone(), manifest_ctx: region.manifest_ctx.clone(), write_cache: self.cache_manager.write_cache().cloned(), file_purger: file.file_purger(), @@ -172,9 +173,6 @@ impl RegionWorkerLoop { let _ = self .index_build_scheduler .schedule_build(®ion.version_control, task); - self.listener - .on_index_build_begin(RegionFileId::new(region_id, file_handle.meta_ref().file_id)) - .await; } // Wait for all index build tasks to finish and notify the caller. common_runtime::spawn_global(async move { @@ -212,7 +210,7 @@ impl RegionWorkerLoop { ); for file_meta in &request.edit.files_to_add { self.listener - .on_index_build_success(RegionFileId::new(region_id, file_meta.file_id)) + .on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id)) .await; } }