From fb35e09072048853c8ed01d8098d0442b529ade8 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 13 Jun 2023 21:03:09 +0800 Subject: [PATCH] chore: fix compaction caused race condition (#1767) fix: unit tests. For real, this time. --- src/storage/src/flush/scheduler.rs | 19 +++++++++++++++++-- src/storage/src/region/tests.rs | 7 +++++++ src/storage/src/region/tests/compact.rs | 3 --- src/storage/src/region/tests/flush.rs | 5 +---- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs index fc3c219390..76d00c0bbe 100644 --- a/src/storage/src/flush/scheduler.rs +++ b/src/storage/src/flush/scheduler.rs @@ -173,6 +173,8 @@ pub struct FlushScheduler<S: LogStore> { scheduler: LocalScheduler<FlushRequest<S>>, /// Auto flush task. auto_flush_task: RepeatedTask, + #[cfg(test)] + pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>, } pub type FlushSchedulerRef<S> = Arc<FlushScheduler<S>>; @@ -197,15 +199,21 @@ impl FlushScheduler { auto_flush_task .start(common_runtime::bg_runtime()) .context(StartPickTaskSnafu)?; + #[cfg(test)] + let pending_tasks = Arc::new(tokio::sync::RwLock::new(vec![])); let handler = FlushHandler { compaction_scheduler, regions, picker, + #[cfg(test)] + pending_tasks: pending_tasks.clone(), }; Ok(Self { scheduler: LocalScheduler::new(config, handler), auto_flush_task, + #[cfg(test)] + pending_tasks, }) } @@ -247,6 +255,9 @@ impl FlushScheduler { .context(StopPickTaskSnafu)?; self.scheduler.stop(true).await?; + #[cfg(test)] + futures::future::join_all(self.pending_tasks.write().await.drain(..)).await; + Ok(()) } } @@ -255,6 +266,8 @@ struct FlushHandler<S: LogStore> { compaction_scheduler: CompactionSchedulerRef, regions: Arc<RegionMap<S>>, picker: FlushPicker, + #[cfg(test)] + pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>, } #[async_trait::async_trait] @@ -270,7 +283,7 @@ impl Handler for FlushHandler { let compaction_scheduler = self.compaction_scheduler.clone(); let region_map = self.regions.clone(); let picker = self.picker.clone(); -
common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_bg(async move { match req { FlushRequest::Engine => { let regions = region_map.list_regions(); @@ -287,6 +300,8 @@ impl Handler for FlushHandler { finish_notifier.notify_one(); }); + #[cfg(test)] + self.pending_tasks.write().await.push(_handle); Ok(()) } } @@ -299,7 +314,7 @@ async fn execute_flush_region( let mut flush_job = FlushJob::from(&req); if let Err(e) = flush_job.run().await { - logging::error!(e; "Failed to flush regoin {}", req.region_id()); + logging::error!(e; "Failed to flush region {}", req.region_id()); increment_counter!(FLUSH_ERRORS_TOTAL); diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index d5953e00fe..5665cd4037 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -94,6 +94,13 @@ impl TesterBase { } pub async fn close(&self) { + self.region.inner.flush_scheduler.stop().await.unwrap(); + self.region + .inner + .compaction_scheduler + .stop(true) + .await + .unwrap(); self.region.close(&CloseContext::default()).await.unwrap(); self.region.inner.wal.close().await.unwrap(); } diff --git a/src/storage/src/region/tests/compact.rs b/src/storage/src/region/tests/compact.rs index 2156427f05..7586ac034e 100644 --- a/src/storage/src/region/tests/compact.rs +++ b/src/storage/src/region/tests/compact.rs @@ -17,7 +17,6 @@ use std::env; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use common_telemetry::logging; use common_test_util::temp_dir::create_temp_dir; @@ -262,8 +261,6 @@ impl CompactionTester { MockFilePurgeHandler::default(), )); - // FIXME(hl): find out which component prevents logstore from being dropped. - tokio::time::sleep(Duration::from_millis(100)).await; let Some(region) = RegionImpl::open(REGION_NAME.to_string(), store_config, &OpenOptions::default()).await? 
else { return Ok(false); }; diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index d586c34698..fc4dc6fc0a 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -87,10 +87,9 @@ impl FlushTester { async fn reopen(&mut self) { self.regions.clear(); // Close the old region. - if let Some(base) = self.base.as_ref() { + if let Some(base) = self.base.take() { base.close().await; } - self.base = None; // Reopen the region. let mut store_config = config_util::new_store_config( REGION_NAME, @@ -102,8 +101,6 @@ impl FlushTester { ) .await; store_config.flush_strategy = self.flush_strategy.clone(); - // FIXME(hl): find out which component prevents logstore from being dropped. - tokio::time::sleep(Duration::from_millis(100)).await; let opts = OpenOptions::default(); let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts) .await