fix: correct test_index_build_type_compact (#7137)

Signed-off-by: SNC123 <sinhco@outlook.com>
This commit is contained in:
Sicong Hu
2025-10-24 11:24:13 +08:00
committed by GitHub
parent a0e6bcbeb3
commit 03a29c6591
5 changed files with 91 additions and 31 deletions

View File

@@ -32,11 +32,6 @@ use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, reopen_region, rows_schema,
};
// wait listener receives enough success count.
async fn wait_finish(listener: &IndexBuildListener, times: usize) {
listener.wait_finish(times).await;
}
fn async_build_mode_config(is_create_on_flush: bool) -> MitoConfig {
let mut config = MitoConfig::default();
config.index.build_mode = IndexBuildMode::Async;
@@ -84,7 +79,7 @@ fn assert_listener_counts(
expected_success_count: usize,
) {
assert_eq!(listener.begin_count(), expected_begin_count);
assert_eq!(listener.success_count(), expected_success_count);
assert_eq!(listener.finish_count(), expected_success_count);
}
#[tokio::test]
@@ -155,7 +150,7 @@ async fn test_index_build_type_flush() {
flush_region(&engine, region_id, None).await;
// After 2 index build tasks have finished, 2 index files should exist.
wait_finish(&listener, 2).await;
listener.wait_finish(2).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
@@ -204,6 +199,8 @@ async fn test_index_build_type_compact() {
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
put_and_flush(&engine, region_id, &column_schemas, 40..50).await;
// Once all index build tasks have begun, all flush tasks are known to be finished.
listener.wait_begin(4).await;
// Before compaction is triggered, files should be 4, and not all index files are built.
let scanner = engine
.scanner(region_id, ScanRequest::default())
@@ -216,8 +213,8 @@ async fn test_index_build_type_compact() {
// This explicit compaction call serves to make the process deterministic for the test.
compact(&engine, region_id).await;
listener.wait_begin(5).await; // 4 flush + 1 compaction begin
// After compaction is triggered, files should be 2, and not all index files are built.
listener.clear_success_count();
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
@@ -226,7 +223,7 @@ async fn test_index_build_type_compact() {
assert!(num_of_index_files(&engine, &scanner, region_id).await < 2);
// Wait a while to make sure index build tasks are finished.
wait_finish(&listener, 2).await;
listener.wait_stop(5).await; // 4 flush + 1 compaction = some abort + some finish
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
@@ -292,7 +289,7 @@ async fn test_index_build_type_schema_change() {
.handle_request(region_id, RegionRequest::Alter(set_index_request))
.await
.unwrap();
wait_finish(&listener, 1).await;
listener.wait_finish(1).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await

View File

@@ -75,10 +75,13 @@ pub trait EventListener: Send + Sync {
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
/// Notifies the listener that the index build task is executed successfully.
async fn on_index_build_success(&self, _region_file_id: RegionFileId) {}
async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {}
/// Notifies the listener that the index build task is started.
async fn on_index_build_begin(&self, _region_file_id: RegionFileId) {}
/// Notifies the listener that the index build task is aborted.
async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {}
}
pub type EventListenerRef = Arc<dyn EventListener>;
@@ -309,45 +312,75 @@ impl EventListener for NotifyRegionChangeResultListener {
#[derive(Default)]
pub struct IndexBuildListener {
notify: Notify,
success_count: AtomicUsize,
start_count: AtomicUsize,
begin_count: AtomicUsize,
begin_notify: Notify,
finish_count: AtomicUsize,
finish_notify: Notify,
abort_count: AtomicUsize,
abort_notify: Notify,
// stop means finished or aborted
stop_notify: Notify,
}
impl IndexBuildListener {
/// Wait until index build is done for `times` times.
pub async fn wait_finish(&self, times: usize) {
while self.success_count.load(Ordering::Relaxed) < times {
self.notify.notified().await;
while self.finish_count.load(Ordering::Relaxed) < times {
self.finish_notify.notified().await;
}
}
/// Waits until index builds have stopped at least `times` times.
///
/// A build counts as stopped when it has either finished or been aborted, so
/// the sum of the finish counter and the abort counter is compared against
/// `times`. The counters are re-read every time `stop_notify` is signalled.
pub async fn wait_stop(&self, times: usize) {
    loop {
        let finished = self.finish_count.load(Ordering::Relaxed);
        let aborted = self.abort_count.load(Ordering::Relaxed);
        if finished + aborted >= times {
            return;
        }
        self.stop_notify.notified().await;
    }
}
/// Waits until index builds have begun at least `times` times.
///
/// The begin counter is re-read every time `begin_notify` is signalled, and
/// the method returns as soon as the counter reaches `times`.
pub async fn wait_begin(&self, times: usize) {
    loop {
        if self.begin_count.load(Ordering::Relaxed) >= times {
            break;
        }
        self.begin_notify.notified().await;
    }
}
/// Clears the success count.
pub fn clear_success_count(&self) {
self.success_count.store(0, Ordering::Relaxed);
pub fn clear_finish_count(&self) {
self.finish_count.store(0, Ordering::Relaxed);
}
/// Returns the success count.
pub fn success_count(&self) -> usize {
self.success_count.load(Ordering::Relaxed)
pub fn finish_count(&self) -> usize {
self.finish_count.load(Ordering::Relaxed)
}
/// Returns the begin count, i.e. how many index build tasks have begun.
pub fn begin_count(&self) -> usize {
self.start_count.load(Ordering::Relaxed)
self.begin_count.load(Ordering::Relaxed)
}
}
#[async_trait]
impl EventListener for IndexBuildListener {
async fn on_index_build_success(&self, region_file_id: RegionFileId) {
async fn on_index_build_finish(&self, region_file_id: RegionFileId) {
info!("Region {} index build successfully", region_file_id);
self.success_count.fetch_add(1, Ordering::Relaxed);
self.notify.notify_one();
self.finish_count.fetch_add(1, Ordering::Relaxed);
self.finish_notify.notify_one();
self.stop_notify.notify_one();
}
async fn on_index_build_begin(&self, region_file_id: RegionFileId) {
info!("Region {} index build begin", region_file_id);
self.start_count.fetch_add(1, Ordering::Relaxed);
self.begin_count.fetch_add(1, Ordering::Relaxed);
self.begin_notify.notify_one();
}
/// Records that an index build task was aborted.
///
/// Logs the event, increments the abort counter, and then signals both
/// `abort_notify` and `stop_notify` (in that order).
async fn on_index_build_abort(&self, region_file_id: RegionFileId) {
    info!("Region {} index build aborted", region_file_id);
    // The counter must be updated before signalling so that a woken waiter
    // observes the new value when it re-checks its condition.
    let _previous = self.abort_count.fetch_add(1, Ordering::Relaxed);
    for signal in [&self.abort_notify, &self.stop_notify] {
        signal.notify_one();
    }
}
}

View File

@@ -62,6 +62,7 @@ use crate::sst::index::inverted_index::creator::InvertedIndexer;
use crate::sst::parquet::SstInfo;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
use crate::worker::WorkerListener;
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
@@ -451,6 +452,7 @@ pub struct IndexBuildTask {
pub file_meta: FileMeta,
pub reason: IndexBuildType,
pub access_layer: AccessLayerRef,
pub(crate) listener: WorkerListener,
pub(crate) manifest_ctx: ManifestContextRef,
pub write_cache: Option<WriteCacheRef>,
pub file_purger: FilePurgerRef,
@@ -486,6 +488,12 @@ impl IndexBuildTask {
}
async fn do_index_build(&mut self, version_control: VersionControlRef) {
self.listener
.on_index_build_begin(RegionFileId::new(
self.file_meta.region_id,
self.file_meta.file_id,
))
.await;
match self.index_build(version_control).await {
Ok(outcome) => self.on_success(outcome).await,
Err(e) => {
@@ -540,6 +548,12 @@ impl IndexBuildTask {
if !self.check_sst_file_exists(&version_control).await {
// Calls abort to clean up index files.
indexer.abort().await;
self.listener
.on_index_build_abort(RegionFileId::new(
self.file_meta.region_id,
self.file_meta.file_id,
))
.await;
return Ok(IndexBuildOutcome::Aborted(format!(
"SST file not found during index build, region: {}, file_id: {}",
self.file_meta.region_id, self.file_meta.file_id
@@ -575,6 +589,12 @@ impl IndexBuildTask {
if !self.check_sst_file_exists(&version_control).await {
// Calls abort to clean up index files.
indexer.abort().await;
self.listener
.on_index_build_abort(RegionFileId::new(
self.file_meta.region_id,
self.file_meta.file_id,
))
.await;
return Ok(IndexBuildOutcome::Aborted(format!(
"SST file not found during index build, region: {}, file_id: {}",
self.file_meta.region_id, self.file_meta.file_id
@@ -1192,6 +1212,7 @@ mod tests {
},
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
manifest_ctx,
write_cache: None,
file_purger,
@@ -1242,6 +1263,7 @@ mod tests {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
manifest_ctx,
write_cache: None,
file_purger,
@@ -1309,6 +1331,7 @@ mod tests {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
manifest_ctx,
write_cache: None,
file_purger,
@@ -1405,6 +1428,7 @@ mod tests {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
manifest_ctx,
write_cache: None,
file_purger,
@@ -1485,6 +1509,7 @@ mod tests {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
manifest_ctx,
write_cache: Some(write_cache.clone()),
file_purger,

View File

@@ -1220,10 +1220,10 @@ impl WorkerListener {
}
}
pub(crate) async fn on_index_build_success(&self, _region_file_id: RegionFileId) {
pub(crate) async fn on_index_build_finish(&self, _region_file_id: RegionFileId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_index_build_success(_region_file_id).await;
listener.on_index_build_finish(_region_file_id).await;
}
}
@@ -1233,6 +1233,13 @@ impl WorkerListener {
listener.on_index_build_begin(_region_file_id).await;
}
}
/// Forwards an index-build-abort event to the wrapped listener, if one is set.
///
/// The forwarding statement is only compiled under
/// `cfg(any(test, feature = "test"))`; in other builds this method does nothing.
pub(crate) async fn on_index_build_abort(&self, _region_file_id: RegionFileId) {
    #[cfg(any(test, feature = "test"))]
    if let Some(inner) = self.listener.as_ref() {
        inner.on_index_build_abort(_region_file_id).await;
    }
}
}
#[cfg(test)]

View File

@@ -71,6 +71,7 @@ impl<S> RegionWorkerLoop<S> {
file_meta: file.meta_ref().clone(),
reason: build_type,
access_layer: access_layer.clone(),
listener: self.listener.clone(),
manifest_ctx: region.manifest_ctx.clone(),
write_cache: self.cache_manager.write_cache().cloned(),
file_purger: file.file_purger(),
@@ -172,9 +173,6 @@ impl<S> RegionWorkerLoop<S> {
let _ = self
.index_build_scheduler
.schedule_build(&region.version_control, task);
self.listener
.on_index_build_begin(RegionFileId::new(region_id, file_handle.meta_ref().file_id))
.await;
}
// Wait for all index build tasks to finish and notify the caller.
common_runtime::spawn_global(async move {
@@ -212,7 +210,7 @@ impl<S> RegionWorkerLoop<S> {
);
for file_meta in &request.edit.files_to_add {
self.listener
.on_index_build_success(RegionFileId::new(region_id, file_meta.file_id))
.on_index_build_finish(RegionFileId::new(region_id, file_meta.file_id))
.await;
}
}