chore: fix compaction-caused race condition (#1767)

fix: unit tests. For real, this time.
This commit is contained in:
Lei, HUANG
2023-06-13 21:03:09 +08:00
committed by GitHub
parent 803940cfa4
commit fb35e09072
4 changed files with 25 additions and 9 deletions

View File

@@ -173,6 +173,8 @@ pub struct FlushScheduler<S: LogStore> {
scheduler: LocalScheduler<FlushRequest<S>>,
/// Auto flush task.
auto_flush_task: RepeatedTask<Error>,
#[cfg(test)]
pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
pub type FlushSchedulerRef<S> = Arc<FlushScheduler<S>>;
@@ -197,15 +199,21 @@ impl<S: LogStore> FlushScheduler<S> {
auto_flush_task
.start(common_runtime::bg_runtime())
.context(StartPickTaskSnafu)?;
#[cfg(test)]
let pending_tasks = Arc::new(tokio::sync::RwLock::new(vec![]));
let handler = FlushHandler {
compaction_scheduler,
regions,
picker,
#[cfg(test)]
pending_tasks: pending_tasks.clone(),
};
Ok(Self {
scheduler: LocalScheduler::new(config, handler),
auto_flush_task,
#[cfg(test)]
pending_tasks,
})
}
@@ -247,6 +255,9 @@ impl<S: LogStore> FlushScheduler<S> {
.context(StopPickTaskSnafu)?;
self.scheduler.stop(true).await?;
#[cfg(test)]
futures::future::join_all(self.pending_tasks.write().await.drain(..)).await;
Ok(())
}
}
@@ -255,6 +266,8 @@ struct FlushHandler<S: LogStore> {
compaction_scheduler: CompactionSchedulerRef<S>,
regions: Arc<RegionMap<S>>,
picker: FlushPicker,
#[cfg(test)]
pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
#[async_trait::async_trait]
@@ -270,7 +283,7 @@ impl<S: LogStore> Handler for FlushHandler<S> {
let compaction_scheduler = self.compaction_scheduler.clone();
let region_map = self.regions.clone();
let picker = self.picker.clone();
common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_bg(async move {
match req {
FlushRequest::Engine => {
let regions = region_map.list_regions();
@@ -287,6 +300,8 @@ impl<S: LogStore> Handler for FlushHandler<S> {
finish_notifier.notify_one();
});
#[cfg(test)]
self.pending_tasks.write().await.push(_handle);
Ok(())
}
}
@@ -299,7 +314,7 @@ async fn execute_flush_region<S: LogStore>(
let mut flush_job = FlushJob::from(&req);
if let Err(e) = flush_job.run().await {
logging::error!(e; "Failed to flush regoin {}", req.region_id());
logging::error!(e; "Failed to flush region {}", req.region_id());
increment_counter!(FLUSH_ERRORS_TOTAL);

View File

@@ -94,6 +94,13 @@ impl<S: LogStore> TesterBase<S> {
}
/// Shuts down the region held by this tester together with its attached components.
///
/// The steps run strictly in this order: the flush scheduler is stopped, the
/// compaction scheduler is stopped, the region is closed with a default
/// `CloseContext`, and finally the WAL is closed.
///
/// # Panics
///
/// Panics if any of the steps returns an error, because every result is unwrapped.
pub async fn close(&self) {
// NOTE(review): presumably both schedulers are stopped before the region is
// closed so that no background flush/compaction job still uses it — confirm.
self.region.inner.flush_scheduler.stop().await.unwrap();
// NOTE(review): the meaning of the `true` argument is not visible from here;
// it likely asks `stop` to wait for running tasks — confirm against the scheduler.
self.region
.inner
.compaction_scheduler
.stop(true)
.await
.unwrap();
// Close the region itself, then the write-ahead log it was using.
self.region.close(&CloseContext::default()).await.unwrap();
self.region.inner.wal.close().await.unwrap();
}

View File

@@ -17,7 +17,6 @@
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_telemetry::logging;
use common_test_util::temp_dir::create_temp_dir;
@@ -262,8 +261,6 @@ impl CompactionTester {
MockFilePurgeHandler::default(),
));
// FIXME(hl): find out which component prevents logstore from being dropped.
tokio::time::sleep(Duration::from_millis(100)).await;
let Some(region) = RegionImpl::open(REGION_NAME.to_string(), store_config, &OpenOptions::default()).await? else {
return Ok(false);
};

View File

@@ -87,10 +87,9 @@ impl FlushTester {
async fn reopen(&mut self) {
self.regions.clear();
// Close the old region.
if let Some(base) = self.base.as_ref() {
if let Some(base) = self.base.take() {
base.close().await;
}
self.base = None;
// Reopen the region.
let mut store_config = config_util::new_store_config(
REGION_NAME,
@@ -102,8 +101,6 @@ impl FlushTester {
)
.await;
store_config.flush_strategy = self.flush_strategy.clone();
// FIXME(hl): find out which component prevents logstore from being dropped.
tokio::time::sleep(Duration::from_millis(100)).await;
let opts = OpenOptions::default();
let region = RegionImpl::open(REGION_NAME.to_string(), store_config, &opts)
.await