Start jobs when tenant activates

This commit is contained in:
Bojan Serafimov
2022-05-24 14:25:15 -04:00
parent b028f12b06
commit 07e6dd809d
6 changed files with 14 additions and 109 deletions

View File

@@ -13,7 +13,6 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -30,7 +30,6 @@ impl Job for CompactionJob {
}
}
pub static COMPACTION_SCHEDULER: OnceCell<Pool<CompactionJob>> = OnceCell::new();
pub static COMPACTION_POOL: OnceCell<Pool<CompactionJob>> = OnceCell::new();
// TODO spawn 20 worker threads
// TODO add tasks when tenant activates

View File

@@ -33,4 +33,3 @@ impl Job for GcJob {
pub static GC_POOL: OnceCell<Pool<GcJob>> = OnceCell::new();
// TODO spawn 20 worker threads
// TODO add tasks when tenant activates

View File

@@ -145,7 +145,7 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
}
}
fn queue_job(&self, job: J) {
pub fn queue_job(&self, job: J) {
let mut job_table = self.job_table.lock().unwrap();
let scheduled_for = Instant::now();
job_table.status.insert(job.clone(), JobStatus::Ready {

View File

@@ -8,6 +8,8 @@ use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::storage_sync::index::RemoteIndex;
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
use crate::tenant_jobs::compaction::{COMPACTION_POOL, CompactionJob};
use crate::tenant_jobs::gc::{GC_POOL, GcJob};
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;
use crate::timelines;
@@ -238,33 +240,18 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
// If the tenant is already active, nothing to do.
TenantState::Active => {}
// If it's Idle, launch the compactor and GC threads
// If it's Idle, launch the compactor and GC jobs
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
// Important to activate before scheduling jobs
tenant.state = TenantState::Active;
GC_POOL.get().unwrap().queue_job(GcJob {
tenant: tenant_id,
});
COMPACTION_POOL.get().unwrap().queue_job(CompactionJob {
tenant: tenant_id,
});
}
TenantState::Stopping => {

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}