From 07e6dd809dcc8382ca5dea4cda72e3e8791b8957 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Tue, 24 May 2022 14:25:15 -0400 Subject: [PATCH] Start jobs when tenant activates --- pageserver/src/lib.rs | 1 - pageserver/src/tenant_jobs/compaction.rs | 3 +- pageserver/src/tenant_jobs/gc.rs | 1 - pageserver/src/tenant_jobs/worker.rs | 2 +- pageserver/src/tenant_mgr.rs | 37 ++++------- pageserver/src/tenant_threads.rs | 79 ------------------------ 6 files changed, 14 insertions(+), 109 deletions(-) delete mode 100644 pageserver/src/tenant_threads.rs diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index afb1f3801a..bde54debd2 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -13,7 +13,6 @@ pub mod repository; pub mod storage_sync; pub mod tenant_config; pub mod tenant_mgr; -pub mod tenant_threads; pub mod thread_mgr; pub mod timelines; pub mod virtual_file; diff --git a/pageserver/src/tenant_jobs/compaction.rs b/pageserver/src/tenant_jobs/compaction.rs index 915305f2bc..5f80dd2775 100644 --- a/pageserver/src/tenant_jobs/compaction.rs +++ b/pageserver/src/tenant_jobs/compaction.rs @@ -30,7 +30,6 @@ impl Job for CompactionJob { } } -pub static COMPACTION_SCHEDULER: OnceCell> = OnceCell::new(); +pub static COMPACTION_POOL: OnceCell> = OnceCell::new(); // TODO spawn 20 worker threads -// TODO add tasks when tenant activates diff --git a/pageserver/src/tenant_jobs/gc.rs b/pageserver/src/tenant_jobs/gc.rs index c7ef63b53d..d8bfdd854a 100644 --- a/pageserver/src/tenant_jobs/gc.rs +++ b/pageserver/src/tenant_jobs/gc.rs @@ -33,4 +33,3 @@ impl Job for GcJob { pub static GC_POOL: OnceCell> = OnceCell::new(); // TODO spawn 20 worker threads -// TODO add tasks when tenant activates diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 1cfb11065f..fc8824171f 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -145,7 +145,7 @@ impl Pool where J::ErrorType: Debug { } } - fn queue_job(&self, job: J) { + pub fn queue_job(&self, job: J) { let mut job_table = self.job_table.lock().unwrap(); let scheduled_for = Instant::now(); job_table.status.insert(job.clone(), JobStatus::Ready { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 20a723b5b5..238be6ab00 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -8,6 +8,8 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate}; use crate::storage_sync::index::RemoteIndex; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; +use crate::tenant_jobs::compaction::{COMPACTION_POOL, CompactionJob}; +use crate::tenant_jobs::gc::{GC_POOL, GcJob}; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; use crate::timelines; @@ -238,33 +240,18 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { // If the tenant is already active, nothing to do. TenantState::Active => {} - // If it's Idle, launch the compactor and GC threads + // If it's Idle, launch the compactor and GC jobs TenantState::Idle => { - thread_mgr::spawn( - ThreadKind::Compactor, - Some(tenant_id), - None, - "Compactor thread", - false, - move || crate::tenant_threads::compact_loop(tenant_id), - )?; - - let gc_spawn_result = thread_mgr::spawn( - ThreadKind::GarbageCollector, - Some(tenant_id), - None, - "GC thread", - false, - move || crate::tenant_threads::gc_loop(tenant_id), - ) - .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); - - if let Err(e) = &gc_spawn_result { - error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); - return gc_spawn_result; - } + // Important to activate before scheduling jobs tenant.state = TenantState::Active; + + GC_POOL.get().unwrap().queue_job(GcJob { + tenant: tenant_id, + }); + + COMPACTION_POOL.get().unwrap().queue_job(CompactionJob { + tenant: tenant_id, + }); } TenantState::Stopping => { diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs deleted file mode 100644 index b904d9040d..0000000000 --- a/pageserver/src/tenant_threads.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! This module contains functions to serve per-tenant background processes, -//! such as compaction and GC -use crate::repository::Repository; -use crate::tenant_mgr; -use crate::tenant_mgr::TenantState; -use anyhow::Result; -use std::time::Duration; -use tracing::*; -use utils::zid::ZTenantId; - -/// -/// Compaction thread's main loop -/// -pub fn compact_loop(tenantid: ZTenantId) -> Result<()> { - if let Err(err) = compact_loop_ext(tenantid) { - error!("compact loop terminated with error: {:?}", err); - Err(err) - } else { - Ok(()) - } -} - -fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { - loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let compaction_period = repo.get_compaction_period(); - - std::thread::sleep(compaction_period); - trace!("compaction thread for tenant {} waking up", tenantid); - - // Compact timelines - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - repo.compaction_iteration()?; - } - - trace!( - "compaction thread stopped for tenant {} state is {:?}", - tenantid, - tenant_mgr::get_tenant_state(tenantid) - ); - Ok(()) -} - -/// -/// GC thread's main loop -/// -pub fn gc_loop(tenantid: ZTenantId) -> Result<()> { - loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - - trace!("gc thread for tenant {} waking up", tenantid); - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let gc_horizon = repo.get_gc_horizon(); - // Garbage collect old files that are not needed for PITR anymore - if gc_horizon > 0 { - repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; - } - - // TODO Write it in more adequate way using - // condvar.wait_timeout() or something - let mut sleep_time = repo.get_gc_period().as_secs(); - while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active) - { - sleep_time -= 1; - std::thread::sleep(Duration::from_secs(1)); - } - } - trace!( - "GC thread stopped for tenant {} state is {:?}", - tenantid, - tenant_mgr::get_tenant_state(tenantid) - ); - Ok(()) -}