feat!: Divide flush and compaction job pool (#4871)

* feat: divide flush/compact job pool

* feat!: divide bg jobs config

* docs: update config examples

* test: fix tests
Yingwen
2024-10-26 07:36:16 +08:00
committed by GitHub
parent 2485f66077
commit 1008af5324
8 changed files with 103 additions and 48 deletions
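
The breaking change for operators: the single max_background_jobs setting is replaced by one limit per background job kind. Below is a minimal sketch of setting the new fields explicitly, mirroring the MitoConfig construction used in this commit's tests; the values are illustrative, and anything left unset keeps its CPU-derived default.

// Illustrative values only (not from this commit); unset fields keep their defaults.
let config = MitoConfig {
    max_background_flushes: 4,     // flush job pool (default: 1/2 of cpu cores)
    max_background_compactions: 2, // compaction job pool (default: 1/4 of cpu cores)
    max_background_purges: 8,      // purge scheduler (default: number of cpu cores)
    ..Default::default()
};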

View File

@@ -134,7 +134,7 @@ pub async fn open_compaction_region(
));
let file_purger = {
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
Arc::new(LocalFilePurger::new(
purge_scheduler.clone(),
access_layer.clone(),

View File

@@ -28,9 +28,6 @@ use crate::error::Result;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
@@ -69,8 +66,12 @@ pub struct MitoConfig {
pub compress_manifest: bool,
// Background job configs:
/// Max number of running background jobs (default 4).
pub max_background_jobs: usize,
/// Max number of running background flush jobs (default: 1/2 of cpu cores).
pub max_background_flushes: usize,
/// Max number of running background compaction jobs (default: 1/4 of cpu cores).
pub max_background_compactions: usize,
/// Max number of running background purge jobs (default: number of cpu cores).
pub max_background_purges: usize,
// Flush configs:
/// Interval to auto flush a region if it has not flushed yet (default 30 min).
@@ -137,7 +138,9 @@ impl Default for MitoConfig {
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
compress_manifest: false,
max_background_jobs: DEFAULT_MAX_BG_JOB,
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: common_config::utils::get_cpus(),
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
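
The new defaults above rely on a divide_num_cpus helper alongside common_config::utils::get_cpus(). The helper's body is not part of this diff; a plausible sketch, assuming it is a ceiling division of the detected core count that never returns zero:

// Sketch only, not from this commit: divide the core count by `divisor`,
// rounding up, so even a single-core machine gets at least one job slot.
fn divide_num_cpus(divisor: usize) -> usize {
    debug_assert!(divisor > 0);
    let cores = common_config::utils::get_cpus();
    (cores + divisor - 1) / divisor
}

Under this reading, an 8-core machine defaults to 4 flush jobs, 2 compaction jobs, and 8 purge jobs.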
@@ -185,9 +188,26 @@ impl MitoConfig {
self.worker_channel_size = 1;
}
if self.max_background_jobs == 0 {
warn!("Sanitize max background jobs 0 to {}", DEFAULT_MAX_BG_JOB);
self.max_background_jobs = DEFAULT_MAX_BG_JOB;
if self.max_background_flushes == 0 {
warn!(
"Sanitize max background flushes 0 to {}",
divide_num_cpus(2)
);
self.max_background_flushes = divide_num_cpus(2);
}
if self.max_background_compactions == 0 {
warn!(
"Sanitize max background compactions 0 to {}",
divide_num_cpus(4)
);
self.max_background_compactions = divide_num_cpus(4);
}
if self.max_background_purges == 0 {
warn!(
"Sanitize max background purges 0 to {}",
common_config::utils::get_cpus()
);
self.max_background_purges = common_config::utils::get_cpus();
}
if self.global_write_buffer_reject_size <= self.global_write_buffer_size {

View File

@@ -272,7 +272,7 @@ async fn test_readonly_during_compaction() {
.create_engine_with(
MitoConfig {
// Ensure there is only one background worker for purge tasks.
max_background_jobs: 1,
max_background_purges: 1,
..Default::default()
},
None,
@@ -310,7 +310,7 @@ async fn test_readonly_during_compaction() {
listener.wake();
let notify = Arc::new(Notify::new());
// We already sets max background jobs to 1, so we can submit a task to the
// We already set max background purges to 1, so we can submit a task to the
// purge scheduler to ensure all purge tasks are finished.
let job_notify = notify.clone();
engine
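
The comment above relies on a simple barrier trick: with max_background_purges set to 1, the purge scheduler runs jobs one at a time, so a freshly scheduled job can only run after every purge task queued before it has finished. A rough sketch of that pattern follows; purge_scheduler, the schedule signature, and the boxed-future job type are assumptions, since the exact SchedulerRef API is not shown in this diff.

// Hypothetical barrier over a single-worker scheduler.
let notify = Arc::new(Notify::new());
let job_notify = notify.clone();
purge_scheduler
    .schedule(Box::pin(async move {
        // With one worker, this runs only after all previously queued purge jobs.
        job_notify.notify_one();
    }))
    .unwrap();
// Waking up here means every earlier purge task has finished.
notify.notified().await;

This only works because the pool has a single worker; with more workers the sentinel job could run concurrently with still-unfinished purges.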

View File

@@ -114,8 +114,10 @@ pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
pub(crate) struct WorkerGroup {
/// Workers of the group.
workers: Vec<RegionWorker>,
/// Global background job scheduelr.
scheduler: SchedulerRef,
/// Flush background job pool.
flush_job_pool: SchedulerRef,
/// Compaction background job pool.
compact_job_pool: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
@@ -146,10 +148,10 @@ impl WorkerGroup {
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
// We use another scheduler to avoid purge jobs blocking other jobs.
// A purge job is cheaper than other background jobs so they share the same job limit.
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
@@ -178,7 +180,8 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
@@ -195,7 +198,8 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
flush_job_pool,
compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -205,8 +209,11 @@ impl WorkerGroup {
pub(crate) async fn stop(&self) -> Result<()> {
info!("Stop region worker group");
// TODO(yingwen): Do we need to stop gracefully?
// Stops the scheduler gracefully.
self.scheduler.stop(true).await?;
// Stops the compaction and flush job pools gracefully.
self.compact_job_pool.stop(true).await?;
self.flush_job_pool.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;
@@ -275,8 +282,9 @@ impl WorkerGroup {
.with_notifier(flush_sender.clone()),
)
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -310,7 +318,8 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
@@ -327,7 +336,8 @@ impl WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
flush_job_pool,
compact_job_pool,
purge_scheduler,
cache_manager,
})
@@ -382,7 +392,8 @@ struct WorkerStarter<S> {
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
compact_job_pool: SchedulerRef,
flush_job_pool: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
@@ -423,9 +434,9 @@ impl<S: LogStore> WorkerStarter<S> {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
self.scheduler,
self.compact_job_pool,
sender.clone(),
self.cache_manager.clone(),
self.config,