diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ec36e0a93d..d5c60946a6 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -242,9 +242,19 @@ pub async fn open_compaction_region( } impl CompactionRegion { + /// Get the file purger of the compaction region. pub fn file_purger(&self) -> Option> { self.file_purger.clone() } + + /// Stop the file purger scheduler of the compaction region. + pub async fn stop_purger_scheduler(&self) -> Result<()> { + if let Some(file_purger) = &self.file_purger { + file_purger.stop_scheduler().await + } else { + Ok(()) + } + } } /// `[MergeOutput]` represents the output of merging SST files. diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 130264f68a..34a367c039 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -20,6 +20,7 @@ use common_telemetry::{error, info}; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::{FileType, IndexKey}; use crate::cache::CacheManagerRef; +use crate::error::Result; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::FileMeta; @@ -76,6 +77,11 @@ impl LocalFilePurger { cache_manager, } } + + /// Stop the scheduler of the file purger. + pub async fn stop_scheduler(&self) -> Result<()> { + self.scheduler.stop(true).await + } } impl FilePurger for LocalFilePurger {