This commit is contained in:
Bojan Serafimov
2022-05-16 13:07:50 -04:00
parent 491a1870ce
commit 858ce6a8b5
7 changed files with 145 additions and 106 deletions

View File

@@ -0,0 +1,53 @@
use std::{marker::PhantomData, ops::Range, time::{Duration, Instant}};
use serde::{Deserialize, Serialize};
/// A unit of work that can be executed through [`Job::run`].
///
/// The supertraits require every job to be a small value type: it must be
/// `Copy`/`Clone`, comparable with `PartialEq`/`Eq`, printable with `Debug`,
/// transferable between threads (`Send`) and free of borrowed data (`'static`).
pub trait Job:
    Copy + Clone + PartialEq + Eq + Send + std::fmt::Debug + 'static
{
    /// Error returned by a failed [`Job::run`]; it must be viewable as a
    /// `dyn std::error::Error`.
    type ErrorType: AsRef<dyn std::error::Error + 'static>;

    /// Executes the job once, returning `Ok(())` on success.
    fn run(&self) -> Result<(), Self::ErrorType>;
}
/// Describes when a chore's job should be executed.
///
/// The common value-type traits are derived so that a schedule can be logged,
/// copied and compared, just like the `Job` it is paired with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Schedule {
    /// Repeat the job with the given `Duration` as its period.
    Every(Duration),
}
/// A job that repeats on a schedule.
pub struct Chore<J>
where
    J: Job,
{
    /// The job to execute.
    pub job: J,
    /// The schedule on which `job` repeats.
    pub schedule: Schedule,
}
/// Handle to a chore, returned by `Scheduler::add_chore` and accepted by
/// `Scheduler::remove_chore` and `Scheduler::get_status`.
///
/// The job type `J` is carried only at the type level, so a handle issued for
/// one kind of job cannot be passed where a handle for another kind is
/// expected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct ChoreHandle<J: Job> {
    /// Zero-sized marker tying the handle to its job type.
    _marker: PhantomData<J>,
    /// Numeric identifier of the chore.
    // NOTE(review): presumably assigned by the scheduler; no constructor is
    // visible in this file to confirm.
    chore_id: u64,
}
/// Status of a chore, as returned by `Scheduler::get_status`.
pub enum Status<J>
where
    J: Job,
{
    /// The chore is scheduled.
    // NOTE(review): the `Instant` is presumably the next planned run time;
    // confirm once a scheduler implementation exists.
    Scheduled(Instant),
    /// The chore is in an error state; carries the job's error value.
    Error(J::ErrorType),
}
/// Interface for managing chores whose jobs are of type `J`.
pub trait Scheduler<J>
where
    J: Job,
{
    /// Takes ownership of `chore` and returns a [`ChoreHandle`] for it.
    fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J>;

    /// Removes the chore identified by `ch`.
    fn remove_chore(&self, ch: ChoreHandle<J>);

    /// Returns the [`Status`] of the chore identified by `ch`.
    fn get_status(&self, ch: ChoreHandle<J>) -> Status<J>;
}
/// Scheduler for chores whose jobs are of type `J`.
///
/// It currently stores nothing except a marker for the job type.
// NOTE(review): the only field is private and no constructor is visible in
// this file, so code outside this module cannot build a value yet.
pub struct SimpleScheduler<J>
where
    J: Job,
{
    /// Zero-sized marker recording the job type.
    _marker: PhantomData<J>,
}
/// `Scheduler` implementation for `SimpleScheduler`.
///
/// Every method is still a stub: calling any of them panics via `todo!()`.
impl<J> Scheduler<J> for SimpleScheduler<J>
where
    J: Job,
{
    /// Not implemented yet; panics when called.
    fn add_chore(&self, chore: Chore<J>) -> ChoreHandle<J> {
        todo!()
    }

    /// Not implemented yet; panics when called.
    fn remove_chore(&self, ch: ChoreHandle<J>) {
        todo!()
    }

    /// Not implemented yet; panics when called.
    fn get_status(&self, ch: ChoreHandle<J>) -> Status<J> {
        todo!()
    }
}

View File

@@ -0,0 +1,33 @@
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use tracing::*;
use crate::repository::Repository;
use crate::tenant_mgr::{self, TenantState};
use super::chore::{Job, SimpleScheduler};
/// Job that runs one compaction iteration for a single tenant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompactionJob {
    /// Tenant whose repository is compacted.
    pub tenant: ZTenantId,
}
impl Job for CompactionJob {
    type ErrorType = anyhow::Error;

    /// Runs one compaction iteration for `self.tenant`.
    ///
    /// Does nothing and returns `Ok(())` when the tenant is not in the
    /// `Active` state. Otherwise any error from looking up the tenant's
    /// repository or from the compaction iteration is propagated.
    fn run(&self) -> Result<(), Self::ErrorType> {
        // TODO why not kill the chore when tenant is not active?
        // TODO GC has the same code too
        let tenant_is_active = matches!(
            tenant_mgr::get_tenant_state(self.tenant),
            Some(TenantState::Active(..))
        );

        if tenant_is_active {
            let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
            repo.compaction_iteration()?;
        }

        Ok(())
    }
}
/// Global cell holding the scheduler used for `CompactionJob` chores.
///
/// The cell starts out empty (`OnceCell::new()`).
// NOTE(review): nothing in this file fills the cell; confirm where it is
// initialised before any caller does `.get().unwrap()` on it.
pub static COMPACTION_SCHEDULER: OnceCell<SimpleScheduler<CompactionJob>> = OnceCell::new();

35
pageserver/src/jobs/gc.rs Normal file
View File

@@ -0,0 +1,35 @@
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use tracing::*;
use crate::repository::Repository;
use crate::tenant_mgr::{self, TenantState};
use super::chore::{Job, SimpleScheduler};
/// Job that runs one garbage-collection iteration for a single tenant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GcJob {
    /// Tenant whose repository is garbage collected.
    pub tenant: ZTenantId,
}
impl Job for GcJob {
    type ErrorType = anyhow::Error;

    /// Runs one GC iteration for `self.tenant`.
    ///
    /// Does nothing and returns `Ok(())` when the tenant is not in the
    /// `Active` state, or when the repository's GC horizon is zero. Errors
    /// from the repository lookup or from the GC iteration are propagated.
    fn run(&self) -> Result<(), Self::ErrorType> {
        // TODO why not kill the chore when tenant is not active?
        let tenant_is_active = matches!(
            tenant_mgr::get_tenant_state(self.tenant),
            Some(TenantState::Active(..))
        );

        if tenant_is_active {
            let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
            let gc_horizon = repo.get_gc_horizon();
            // A horizon of zero skips the GC iteration entirely.
            if gc_horizon > 0 {
                repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
            }
        }

        Ok(())
    }
}
/// Global cell holding the scheduler used for `GcJob` chores.
///
/// The cell starts out empty (`OnceCell::new()`).
// NOTE(review): nothing in this file fills the cell; confirm where it is
// initialised before any caller does `.get().unwrap()` on it.
pub static GC_SCHEDULER: OnceCell<SimpleScheduler<GcJob>> = OnceCell::new();

View File

@@ -1,2 +1,5 @@
pub mod scheduler;
pub mod worker;
pub mod chore;
pub mod gc;
pub mod compaction;

View File

@@ -13,7 +13,6 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -2,6 +2,9 @@
//! page server.
use crate::config::PageServerConf;
use crate::jobs::chore::{Chore, ChoreHandle, Schedule, Scheduler};
use crate::jobs::compaction::{COMPACTION_SCHEDULER, CompactionJob};
use crate::jobs::gc::{GC_SCHEDULER, GcJob};
use crate::layered_repository::LayeredRepository;
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{Repository, TimelineSyncStatusUpdate};
@@ -21,6 +24,7 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tracing::*;
use utils::zid::{ZTenantId, ZTimelineId};
@@ -71,7 +75,7 @@ pub enum TenantState {
//Ready,
// This tenant exists on local disk, and the layer map has been loaded into memory.
// The local disk might have some newer files that don't exist in cloud storage yet.
Active,
Active(ChoreHandle<GcJob>, ChoreHandle<CompactionJob>),
// Tenant is active, but there is no walreceiver connection.
Idle,
// This tenant exists on local disk, and the layer map has been loaded into memory.
@@ -83,7 +87,7 @@ pub enum TenantState {
impl fmt::Display for TenantState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TenantState::Active => f.write_str("Active"),
TenantState::Active(_, _) => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"),
}
@@ -236,35 +240,26 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
match tenant.state {
// If the tenant is already active, nothing to do.
TenantState::Active => {}
TenantState::Active(_, _) => {}
// If it's Idle, launch the compactor and GC threads
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
)?;
let repo = crate::tenant_mgr::get_repository_for_tenant(tenant_id)?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
let compaction_chore_handle = COMPACTION_SCHEDULER.get().unwrap().add_chore(Chore {
job: CompactionJob {
tenant: tenant_id,
},
schedule: Schedule::Every(repo.get_compaction_period()),
});
if let Err(e) = &gc_spawn_result {
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
let gc_chore_handle = GC_SCHEDULER.get().unwrap().add_chore(Chore {
job: GcJob {
tenant: tenant_id,
},
schedule: Schedule::Every(repo.get_gc_period()),
});
tenant.state = TenantState::Active(gc_chore_handle, compaction_chore_handle);
}
TenantState::Stopping => {

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
/// Runs `compact_loop_ext` for `tenantid`. If it fails, the error is logged
/// at `error` level and then returned unchanged to the caller.
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
    compact_loop_ext(tenantid).map_err(|err| {
        error!("compact loop terminated with error: {:?}", err);
        err
    })
}
/// Body of the compaction loop for `tenantid`.
///
/// While the tenant is `Active`, each round sleeps for the repository's
/// compaction period and then runs one compaction iteration. The loop ends
/// with `Ok(())` once the tenant is no longer `Active`; errors from the
/// repository lookup or from compaction end it early.
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
    while tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active) {
        // Both repository bindings stay alive until the end of the round,
        // matching how long each one is held.
        let period_source = tenant_mgr::get_repository_for_tenant(tenantid)?;
        let compaction_period = period_source.get_compaction_period();

        std::thread::sleep(compaction_period);
        trace!("compaction thread for tenant {} waking up", tenantid);

        // Compact timelines
        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
        repo.compaction_iteration()?;
    }

    trace!(
        "compaction thread stopped for tenant {} state is {:?}",
        tenantid,
        tenant_mgr::get_tenant_state(tenantid)
    );
    Ok(())
}
///
/// GC thread's main loop
///
/// While the tenant is `Active`, each round runs one GC iteration (skipped
/// when the GC horizon is zero) and then waits for the repository's GC period.
/// The wait is done in one-second slices so that a tenant leaving the
/// `Active` state is noticed promptly. Returns `Ok(())` once the tenant is no
/// longer `Active`; errors from the repository lookup or from GC end the loop
/// early.
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
    let is_active = || tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active);

    while is_active() {
        trace!("gc thread for tenant {} waking up", tenantid);
        let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
        let gc_horizon = repo.get_gc_horizon();
        // Garbage collect old files that are not needed for PITR anymore
        if gc_horizon > 0 {
            repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
        }

        // TODO Write it in more adequate way using
        // condvar.wait_timeout() or something
        let gc_period = repo.get_gc_period();
        let mut whole_secs_left = gc_period.as_secs();
        while whole_secs_left > 0 && is_active() {
            whole_secs_left -= 1;
            std::thread::sleep(Duration::from_secs(1));
        }

        // `as_secs()` only counts whole seconds. Sleep the fractional part as
        // well, otherwise a period below one second would not wait at all and
        // this loop would spin.
        let fraction_nanos = gc_period.subsec_nanos();
        if fraction_nanos > 0 && is_active() {
            std::thread::sleep(Duration::new(0, fraction_nanos));
        }
    }

    trace!(
        "GC thread stopped for tenant {} state is {:?}",
        tenantid,
        tenant_mgr::get_tenant_state(tenantid)
    );
    Ok(())
}