diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9cb7e6f13d..35fc838274 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -9,12 +9,7 @@ use clap::{App, Arg}; use daemonize::Daemonize; use fail::FailScenario; -use pageserver::{ - config::{defaults::*, PageServerConf}, - http, page_cache, page_service, profiling, tenant_mgr, thread_mgr, - thread_mgr::ThreadKind, - timelines, virtual_file, LOG_FILE_NAME, -}; +use pageserver::{LOG_FILE_NAME, config::{defaults::*, PageServerConf}, http, page_cache, page_service, profiling, tenant_jobs::gc::GC_POOL, tenant_mgr, thread_mgr, thread_mgr::ThreadKind, timelines, virtual_file}; use utils::{ auth::JwtAuth, http::endpoint, @@ -305,6 +300,36 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type), )?; + // Spawn GC workers + for i in 0..3 { + let name = format!("gc_worker_{}", i); + thread_mgr::spawn( + ThreadKind::GarbageCollectionWorker, + None, + None, + &name.clone(), + true, + move || { + GC_POOL.get().unwrap().worker_main(name.clone()) + }, + ).unwrap(); + } + + // Spawn compaction workers + for i in 0..3 { + let name = format!("compaction_worker_{}", i); + thread_mgr::spawn( + ThreadKind::CompactionWorker, + None, + None, + &name.clone(), + true, + move || { + GC_POOL.get().unwrap().worker_main(name.clone()) + }, + ).unwrap(); + } + signals.handle(|signal| match signal { Signal::Quit => { info!( diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index fc8824171f..0d26d22faa 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -92,7 +92,8 @@ impl Pool where J::ErrorType: Debug { } } - fn worker_main(&self, worker_name: String) -> anyhow::Result<()> { + // TODO listen for shutdown request? + pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> { let mut job_table = self.job_table.lock().unwrap(); loop { if let Some(Deadline {job, ..}) = job_table.pop_due() { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 238be6ab00..6387676e63 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -151,8 +151,8 @@ pub fn shutdown_all_tenants() { drop(m); thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None); - thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None); + thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollectionWorker), None, None); + thread_mgr::shutdown_threads(Some(ThreadKind::CompactionWorker), None, None); // Ok, no background threads running anymore. Flush any remaining data in // memory to disk. diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index b908f220ee..15735a8043 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -94,11 +94,11 @@ pub enum ThreadKind { // Thread that connects to a safekeeper to fetch WAL for one timeline. WalReceiver, - // Thread that handles compaction of all timelines for a tenant. - Compactor, + // Worker that does compaction jobs + CompactionWorker, - // Thread that handles GC of a tenant - GarbageCollector, + // Worker that does GC jobs + GarbageCollectionWorker, // Thread that flushes frozen in-memory layers to disk LayerFlushThread,