diff --git a/pageserver/src/jobs/chore.rs b/pageserver/src/jobs/chore.rs new file mode 100644 index 0000000000..567abcf060 --- /dev/null +++ b/pageserver/src/jobs/chore.rs @@ -0,0 +1,53 @@ +use std::{marker::PhantomData, ops::Range, time::{Duration, Instant}}; +use serde::{Deserialize, Serialize}; + + + +pub trait Job: std::fmt::Debug + Send + Copy + Clone + PartialEq + Eq + 'static { + type ErrorType: AsRef; + fn run(&self) -> Result<(), Self::ErrorType>; +} + +pub enum Schedule { + Every(Duration), +} + +/// A job that repeats on a schedule +pub struct Chore { + pub job: J, + pub schedule: Schedule, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)] +pub struct ChoreHandle { + _marker: PhantomData, + chore_id: u64, +} + +pub enum Status { + Scheduled(Instant), + Error(J::ErrorType), +} + +pub trait Scheduler { + fn add_chore(&self, chore: Chore) -> ChoreHandle; + fn remove_chore(&self, ch: ChoreHandle); + fn get_status(&self, ch: ChoreHandle) -> Status; +} + +pub struct SimpleScheduler { + _marker: PhantomData, +} +impl Scheduler for SimpleScheduler { + fn add_chore(&self, chore: Chore) -> ChoreHandle { + todo!() + } + + fn remove_chore(&self, ch: ChoreHandle) { + todo!() + } + + fn get_status(&self, ch: ChoreHandle) -> Status { + todo!() + } +} diff --git a/pageserver/src/jobs/compaction.rs b/pageserver/src/jobs/compaction.rs new file mode 100644 index 0000000000..e2424f06d9 --- /dev/null +++ b/pageserver/src/jobs/compaction.rs @@ -0,0 +1,33 @@ +use once_cell::sync::OnceCell; +use utils::zid::ZTenantId; +use tracing::*; +use crate::repository::Repository; + +use crate::tenant_mgr::{self, TenantState}; + +use super::chore::{Job, SimpleScheduler}; + + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CompactionJob { + pub tenant: ZTenantId, +} + +impl Job for CompactionJob { + type ErrorType = anyhow::Error; + + fn run(&self) -> Result<(), Self::ErrorType> { + // TODO why not kill the chore when tenant is not active? + // TODO GC has the same code too + if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) { + return Ok(()); + } + + let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?; + repo.compaction_iteration()?; + + Ok(()) + } +} + +pub static COMPACTION_SCHEDULER: OnceCell> = OnceCell::new(); diff --git a/pageserver/src/jobs/gc.rs b/pageserver/src/jobs/gc.rs new file mode 100644 index 0000000000..5d198f8643 --- /dev/null +++ b/pageserver/src/jobs/gc.rs @@ -0,0 +1,35 @@ +use once_cell::sync::OnceCell; +use utils::zid::ZTenantId; +use tracing::*; +use crate::repository::Repository; + +use crate::tenant_mgr::{self, TenantState}; + +use super::chore::{Job, SimpleScheduler}; + + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct GcJob { + pub tenant: ZTenantId, +} + +impl Job for GcJob { + type ErrorType = anyhow::Error; + + fn run(&self) -> Result<(), Self::ErrorType> { + // TODO why not kill the chore when tenant is not active? + if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active(_, _))) { + return Ok(()); + } + + let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?; + let gc_horizon = repo.get_gc_horizon(); + if gc_horizon > 0 { + repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; + } + + Ok(()) + } +} + +pub static GC_SCHEDULER: OnceCell> = OnceCell::new(); diff --git a/pageserver/src/jobs/mod.rs b/pageserver/src/jobs/mod.rs index be2b292734..5a2c16294a 100644 --- a/pageserver/src/jobs/mod.rs +++ b/pageserver/src/jobs/mod.rs @@ -1,2 +1,5 @@ pub mod scheduler; pub mod worker; +pub mod chore; +pub mod gc; +pub mod compaction; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8bc709cbc6..6788933434 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -13,7 +13,6 @@ pub mod repository; pub mod storage_sync; pub mod tenant_config; pub mod tenant_mgr; -pub mod tenant_threads; pub mod thread_mgr; pub mod timelines; pub mod virtual_file; diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 20a723b5b5..ccf9b58bc0 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -2,6 +2,9 @@ //! page server. use crate::config::PageServerConf; +use crate::jobs::chore::{Chore, ChoreHandle, Schedule, Scheduler}; +use crate::jobs::compaction::{COMPACTION_SCHEDULER, CompactionJob}; +use crate::jobs::gc::{GC_SCHEDULER, GcJob}; use crate::layered_repository::LayeredRepository; use crate::pgdatadir_mapping::DatadirTimeline; use crate::repository::{Repository, TimelineSyncStatusUpdate}; @@ -21,6 +24,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt; use std::sync::Arc; +use std::time::Duration; use tracing::*; use utils::zid::{ZTenantId, ZTimelineId}; @@ -71,7 +75,7 @@ pub enum TenantState { //Ready, // This tenant exists on local disk, and the layer map has been loaded into memory. // The local disk might have some newer files that don't exist in cloud storage yet. - Active, + Active(ChoreHandle, ChoreHandle), // Tenant is active, but there is no walreceiver connection. Idle, // This tenant exists on local disk, and the layer map has been loaded into memory. @@ -83,7 +87,7 @@ pub enum TenantState { impl fmt::Display for TenantState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TenantState::Active => f.write_str("Active"), + TenantState::Active(_, _) => f.write_str("Active"), TenantState::Idle => f.write_str("Idle"), TenantState::Stopping => f.write_str("Stopping"), } @@ -236,35 +240,26 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { match tenant.state { // If the tenant is already active, nothing to do. - TenantState::Active => {} + TenantState::Active(_, _) => {} // If it's Idle, launch the compactor and GC threads TenantState::Idle => { - thread_mgr::spawn( - ThreadKind::Compactor, - Some(tenant_id), - None, - "Compactor thread", - false, - move || crate::tenant_threads::compact_loop(tenant_id), - )?; + let repo = crate::tenant_mgr::get_repository_for_tenant(tenant_id)?; - let gc_spawn_result = thread_mgr::spawn( - ThreadKind::GarbageCollector, - Some(tenant_id), - None, - "GC thread", - false, - move || crate::tenant_threads::gc_loop(tenant_id), - ) - .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); + let compaction_chore_handle = COMPACTION_SCHEDULER.get().unwrap().add_chore(Chore { + job: CompactionJob { + tenant: tenant_id, + }, + schedule: Schedule::Every(repo.get_compaction_period()), + }); - if let Err(e) = &gc_spawn_result { - error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); - thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); - return gc_spawn_result; - } - tenant.state = TenantState::Active; + let gc_chore_handle = GC_SCHEDULER.get().unwrap().add_chore(Chore { + job: GcJob { + tenant: tenant_id, + }, + schedule: Schedule::Every(repo.get_gc_period()), + }); + tenant.state = TenantState::Active(gc_chore_handle, compaction_chore_handle); } TenantState::Stopping => { diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs deleted file mode 100644 index b904d9040d..0000000000 --- a/pageserver/src/tenant_threads.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! This module contains functions to serve per-tenant background processes, -//! such as compaction and GC -use crate::repository::Repository; -use crate::tenant_mgr; -use crate::tenant_mgr::TenantState; -use anyhow::Result; -use std::time::Duration; -use tracing::*; -use utils::zid::ZTenantId; - -/// -/// Compaction thread's main loop -/// -pub fn compact_loop(tenantid: ZTenantId) -> Result<()> { - if let Err(err) = compact_loop_ext(tenantid) { - error!("compact loop terminated with error: {:?}", err); - Err(err) - } else { - Ok(()) - } -} - -fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> { - loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let compaction_period = repo.get_compaction_period(); - - std::thread::sleep(compaction_period); - trace!("compaction thread for tenant {} waking up", tenantid); - - // Compact timelines - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - repo.compaction_iteration()?; - } - - trace!( - "compaction thread stopped for tenant {} state is {:?}", - tenantid, - tenant_mgr::get_tenant_state(tenantid) - ); - Ok(()) -} - -/// -/// GC thread's main loop -/// -pub fn gc_loop(tenantid: ZTenantId) -> Result<()> { - loop { - if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) { - break; - } - - trace!("gc thread for tenant {} waking up", tenantid); - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let gc_horizon = repo.get_gc_horizon(); - // Garbage collect old files that are not needed for PITR anymore - if gc_horizon > 0 { - repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; - } - - // TODO Write it in more adequate way using - // condvar.wait_timeout() or something - let mut sleep_time = repo.get_gc_period().as_secs(); - while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active) - { - sleep_time -= 1; - std::thread::sleep(Duration::from_secs(1)); - } - } - trace!( - "GC thread stopped for tenant {} state is {:?}", - tenantid, - tenant_mgr::get_tenant_state(tenantid) - ); - Ok(()) -}