diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index d0fa4ea708..5549e55c44 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -56,7 +56,7 @@ pub(crate) struct RegionOpener { memtable_builder_provider: MemtableBuilderProvider, object_store_manager: ObjectStoreManagerRef, region_dir: String, - scheduler: SchedulerRef, + purge_scheduler: SchedulerRef, options: Option, cache_manager: Option, skip_wal_replay: bool, @@ -71,7 +71,7 @@ impl RegionOpener { region_dir: &str, memtable_builder_provider: MemtableBuilderProvider, object_store_manager: ObjectStoreManagerRef, - scheduler: SchedulerRef, + purge_scheduler: SchedulerRef, intermediate_manager: IntermediateManager, ) -> RegionOpener { RegionOpener { @@ -80,7 +80,7 @@ impl RegionOpener { memtable_builder_provider, object_store_manager, region_dir: normalize_dir(region_dir), - scheduler, + purge_scheduler, options: None, cache_manager: None, skip_wal_replay: false, @@ -204,7 +204,7 @@ impl RegionOpener { access_layer: access_layer.clone(), manifest_manager, file_purger: Arc::new(LocalFilePurger::new( - self.scheduler, + self.purge_scheduler, access_layer, self.cache_manager, )), @@ -277,7 +277,7 @@ impl RegionOpener { self.intermediate_manager.clone(), )); let file_purger = Arc::new(LocalFilePurger::new( - self.scheduler.clone(), + self.purge_scheduler.clone(), access_layer.clone(), self.cache_manager.clone(), )); diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a9aa652d86..c3fe64e83c 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -110,6 +110,8 @@ pub(crate) struct WorkerGroup { workers: Vec, /// Global background job scheduelr. scheduler: SchedulerRef, + /// Scheduler for file purgers. + purge_scheduler: SchedulerRef, /// Cache. cache_manager: CacheManagerRef, } @@ -131,6 +133,9 @@ impl WorkerGroup { .await? .with_buffer_size(Some(config.inverted_index.write_buffer_size.as_bytes() as _)); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + // We use another scheduler to avoid purge jobs blocking other jobs. + // A purge job is cheaper than other background jobs so they share the same job limit. + let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); let write_cache = write_cache_from_config( &config, object_store_manager.clone(), @@ -156,6 +161,7 @@ impl WorkerGroup { object_store_manager: object_store_manager.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), + purge_scheduler: purge_scheduler.clone(), listener: WorkerListener::default(), cache_manager: cache_manager.clone(), intermediate_manager: intermediate_manager.clone(), @@ -168,6 +174,7 @@ impl WorkerGroup { Ok(WorkerGroup { workers, scheduler, + purge_scheduler, cache_manager, }) } @@ -178,6 +185,8 @@ impl WorkerGroup { // Stops the scheduler gracefully. self.scheduler.stop(true).await?; + // Stops the purge scheduler gracefully. + self.purge_scheduler.stop(true).await?; try_join_all(self.workers.iter().map(|worker| worker.stop())).await?; @@ -238,6 +247,7 @@ impl WorkerGroup { )) }); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); + let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); let intermediate_manager = IntermediateManager::init_fs(&config.inverted_index.intermediate_path) .await? @@ -265,6 +275,7 @@ impl WorkerGroup { object_store_manager: object_store_manager.clone(), write_buffer_manager: write_buffer_manager.clone(), scheduler: scheduler.clone(), + purge_scheduler: purge_scheduler.clone(), listener: WorkerListener::new(listener.clone()), cache_manager: cache_manager.clone(), intermediate_manager: intermediate_manager.clone(), @@ -277,6 +288,7 @@ impl WorkerGroup { Ok(WorkerGroup { workers, scheduler, + purge_scheduler, cache_manager, }) } @@ -323,6 +335,7 @@ struct WorkerStarter { object_store_manager: ObjectStoreManagerRef, write_buffer_manager: WriteBufferManagerRef, scheduler: SchedulerRef, + purge_scheduler: SchedulerRef, listener: WorkerListener, cache_manager: CacheManagerRef, intermediate_manager: IntermediateManager, @@ -351,7 +364,7 @@ impl WorkerStarter { Some(self.write_buffer_manager.clone()), self.config, ), - scheduler: self.scheduler.clone(), + purge_scheduler: self.purge_scheduler.clone(), write_buffer_manager: self.write_buffer_manager, flush_scheduler: FlushScheduler::new(self.scheduler.clone()), compaction_scheduler: CompactionScheduler::new( @@ -507,8 +520,8 @@ struct RegionWorkerLoop { running: Arc, /// Memtable builder provider for each region. memtable_builder_provider: MemtableBuilderProvider, - /// Background job scheduler. - scheduler: SchedulerRef, + /// Background purge job scheduler. + purge_scheduler: SchedulerRef, /// Engine write buffer manager. write_buffer_manager: WriteBufferManagerRef, /// Schedules background flush requests. diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 4a9730dd7a..ed2c52c211 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -53,7 +53,7 @@ impl RegionWorkerLoop { region.region_dir(), self.memtable_builder_provider.clone(), self.object_store_manager.clone(), - self.scheduler.clone(), + self.purge_scheduler.clone(), self.intermediate_manager.clone(), ) .cache(Some(self.cache_manager.clone())) diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 6b54289637..f07a1f38a1 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -60,7 +60,7 @@ impl RegionWorkerLoop { &request.region_dir, self.memtable_builder_provider.clone(), self.object_store_manager.clone(), - self.scheduler.clone(), + self.purge_scheduler.clone(), self.intermediate_manager.clone(), ) .metadata(metadata) diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 884012473e..bcd050220e 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -67,7 +67,7 @@ impl RegionWorkerLoop { &request.region_dir, self.memtable_builder_provider.clone(), self.object_store_manager.clone(), - self.scheduler.clone(), + self.purge_scheduler.clone(), self.intermediate_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay)