fix: wait for compaction task to finish (#1783)

This commit is contained in:
Lei, HUANG
2023-06-16 16:45:06 +08:00
committed by GitHub
parent 1eeb5b4330
commit 69854c07c5
3 changed files with 29 additions and 13 deletions

View File

@@ -354,7 +354,7 @@ impl Instance {
fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
let picker = SimplePicker::default();
let config = SchedulerConfig::from(opts);
let handler = CompactionHandler::new(picker);
let handler = CompactionHandler { picker };
let scheduler = LocalScheduler::new(config, handler);
Arc::new(scheduler)
}

View File

@@ -81,12 +81,8 @@ impl<S: LogStore> CompactionRequestImpl<S> {
pub struct CompactionHandler<P> {
pub picker: P,
}
impl<P> CompactionHandler<P> {
pub fn new(picker: P) -> Self {
Self { picker }
}
#[cfg(test)]
pub pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
#[async_trait::async_trait]
@@ -111,7 +107,7 @@ where
debug!("Compaction task, region: {:?}, task: {:?}", region_id, task);
// TODO(hl): we need to keep a track of task handle here to allow task cancellation.
common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_bg(async move {
if let Err(e) = task.run().await {
// TODO(hl): maybe resubmit compaction task on failure?
error!(e; "Failed to compact region: {:?}", region_id);
@@ -128,6 +124,9 @@ where
finish_notifier.notify_one();
});
#[cfg(test)]
self.pending_tasks.write().await.push(_handle);
Ok(())
}
}

View File

@@ -24,7 +24,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
use tokio::sync::Notify;
use tokio::sync::{Notify, RwLock};
use crate::compaction::{CompactionHandler, SimplePicker};
use crate::config::EngineConfig;
@@ -74,7 +74,11 @@ async fn create_region_for_compaction<
purge_handler: H,
flush_strategy: FlushStrategyRef,
s3_bucket: Option<String>,
) -> (RegionImpl<RaftEngineLogStore>, ObjectStore) {
) -> (
RegionImpl<RaftEngineLogStore>,
ObjectStore,
Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
) {
let metadata = tests::new_metadata(REGION_NAME);
let object_store = new_object_store(store_dir, s3_bucket);
@@ -90,7 +94,12 @@ async fn create_region_for_compaction<
store_config.flush_strategy = flush_strategy;
let picker = SimplePicker::default();
let handler = CompactionHandler::new(picker);
let pending_compaction_tasks = Arc::new(RwLock::new(vec![]));
let handler = CompactionHandler {
picker,
#[cfg(test)]
pending_tasks: pending_compaction_tasks.clone(),
};
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
@@ -104,6 +113,7 @@ async fn create_region_for_compaction<
(
RegionImpl::create(metadata, store_config).await.unwrap(),
object_store,
pending_compaction_tasks,
)
}
@@ -154,6 +164,7 @@ struct CompactionTester {
store_dir: String,
engine_config: EngineConfig,
flush_strategy: FlushStrategyRef,
pending_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}
impl CompactionTester {
@@ -164,7 +175,7 @@ impl CompactionTester {
s3_bucket: Option<String>,
) -> CompactionTester {
let purge_handler = MockFilePurgeHandler::default();
let (region, object_store) = create_region_for_compaction(
let (region, object_store, pending_tasks) = create_region_for_compaction(
store_dir,
engine_config.clone(),
purge_handler.clone(),
@@ -180,6 +191,7 @@ impl CompactionTester {
store_dir: store_dir.to_string(),
engine_config,
flush_strategy,
pending_tasks,
}
}
@@ -231,6 +243,7 @@ impl CompactionTester {
async fn reopen(&mut self) -> Result<bool> {
// Close the old region.
if let Some(base) = self.base.take() {
futures::future::join_all(self.pending_tasks.write().await.drain(..)).await;
base.close().await;
}
@@ -250,7 +263,11 @@ impl CompactionTester {
store_config.flush_strategy = self.flush_strategy.clone();
let picker = SimplePicker::default();
let handler = CompactionHandler::new(picker);
let handler = CompactionHandler {
picker,
#[cfg(test)]
pending_tasks: Arc::new(Default::default()),
};
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));