mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 00:12:54 +00:00
Start workers
This commit is contained in:
@@ -9,12 +9,7 @@ use clap::{App, Arg};
|
||||
use daemonize::Daemonize;
|
||||
|
||||
use fail::FailScenario;
|
||||
use pageserver::{
|
||||
config::{defaults::*, PageServerConf},
|
||||
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
|
||||
thread_mgr::ThreadKind,
|
||||
timelines, virtual_file, LOG_FILE_NAME,
|
||||
};
|
||||
use pageserver::{LOG_FILE_NAME, config::{defaults::*, PageServerConf}, http, page_cache, page_service, profiling, tenant_jobs::gc::GC_POOL, tenant_mgr, thread_mgr, thread_mgr::ThreadKind, timelines, virtual_file};
|
||||
use utils::{
|
||||
auth::JwtAuth,
|
||||
http::endpoint,
|
||||
@@ -305,6 +300,36 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type),
|
||||
)?;
|
||||
|
||||
// Spawn GC workers
|
||||
for i in 0..3 {
|
||||
let name = format!("gc_worker_{}", i);
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::GarbageCollectionWorker,
|
||||
None,
|
||||
None,
|
||||
&name.clone(),
|
||||
true,
|
||||
move || {
|
||||
GC_POOL.get().unwrap().worker_main(name.clone())
|
||||
},
|
||||
).unwrap();
|
||||
}
|
||||
|
||||
// Spawn compaction workers
|
||||
for i in 0..3 {
|
||||
let name = format!("compaction_worker_{}", i);
|
||||
thread_mgr::spawn(
|
||||
ThreadKind::CompactionWorker,
|
||||
None,
|
||||
None,
|
||||
&name.clone(),
|
||||
true,
|
||||
move || {
|
||||
GC_POOL.get().unwrap().worker_main(name.clone())
|
||||
},
|
||||
).unwrap();
|
||||
}
|
||||
|
||||
signals.handle(|signal| match signal {
|
||||
Signal::Quit => {
|
||||
info!(
|
||||
|
||||
@@ -92,7 +92,8 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
|
||||
}
|
||||
}
|
||||
|
||||
fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
|
||||
// TODO listen for shutdown request?
|
||||
pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
|
||||
let mut job_table = self.job_table.lock().unwrap();
|
||||
loop {
|
||||
if let Some(Deadline {job, ..}) = job_table.pop_due() {
|
||||
|
||||
@@ -151,8 +151,8 @@ pub fn shutdown_all_tenants() {
|
||||
drop(m);
|
||||
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiver), None, None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollectionWorker), None, None);
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::CompactionWorker), None, None);
|
||||
|
||||
// Ok, no background threads running anymore. Flush any remaining data in
|
||||
// memory to disk.
|
||||
|
||||
@@ -94,11 +94,11 @@ pub enum ThreadKind {
|
||||
// Thread that connects to a safekeeper to fetch WAL for one timeline.
|
||||
WalReceiver,
|
||||
|
||||
// Thread that handles compaction of all timelines for a tenant.
|
||||
Compactor,
|
||||
// Worker that does compaction jobs
|
||||
CompactionWorker,
|
||||
|
||||
// Thread that handles GC of a tenant
|
||||
GarbageCollector,
|
||||
// Worker that does GC jobs
|
||||
GarbageCollectionWorker,
|
||||
|
||||
// Thread that flushes frozen in-memory layers to disk
|
||||
LayerFlushThread,
|
||||
|
||||
Reference in New Issue
Block a user