From 0d5fee3ab7651828051bf2fa47822c70e1c21e24 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Mon, 23 May 2022 23:42:25 -0400 Subject: [PATCH] Add gc pool --- pageserver/src/tenant_jobs/compaction.rs | 35 +++++++++++++++ pageserver/src/tenant_jobs/gc.rs | 34 +++++++++++++++ pageserver/src/tenant_jobs/mod.rs | 2 + pageserver/src/tenant_jobs/worker.rs | 54 +++++++++++++++--------- 4 files changed, 104 insertions(+), 21 deletions(-) create mode 100644 pageserver/src/tenant_jobs/compaction.rs create mode 100644 pageserver/src/tenant_jobs/gc.rs diff --git a/pageserver/src/tenant_jobs/compaction.rs b/pageserver/src/tenant_jobs/compaction.rs new file mode 100644 index 0000000000..5df7540f87 --- /dev/null +++ b/pageserver/src/tenant_jobs/compaction.rs @@ -0,0 +1,35 @@ +use once_cell::sync::OnceCell; +use utils::zid::ZTenantId; +use crate::repository::Repository; + +use crate::tenant_mgr::{self, TenantState}; + +use super::worker::{Job, Pool}; + + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CompactionJob { + pub tenant: ZTenantId, +} + +impl Job for CompactionJob { + type ErrorType = anyhow::Error; + + fn run(&self) -> Result<(), Self::ErrorType> { + // TODO GC has the same code too + if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { + // TODO Maybe record this as "didn't run"? + return Ok(()); + } + + let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?; + repo.compaction_iteration()?; + + Ok(()) + } +} + +pub static COMPACTION_SCHEDULER: OnceCell> = OnceCell::new(); + +// TODO init pool with compaction interval +// TODO spawn 20 worker threads diff --git a/pageserver/src/tenant_jobs/gc.rs b/pageserver/src/tenant_jobs/gc.rs new file mode 100644 index 0000000000..0ecfdd90f9 --- /dev/null +++ b/pageserver/src/tenant_jobs/gc.rs @@ -0,0 +1,34 @@ +use once_cell::sync::OnceCell; +use utils::zid::ZTenantId; +use crate::{repository::Repository, tenant_mgr::{self, TenantState}}; + +use super::worker::{Job, Pool}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct GcJob { + pub tenant: ZTenantId, +} + +impl Job for GcJob { + type ErrorType = anyhow::Error; + + fn run(&self) -> Result<(), Self::ErrorType> { + if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { + // TODO Maybe record this as "didn't run"? + return Ok(()); + } + + let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?; + let gc_horizon = repo.get_gc_horizon(); + if gc_horizon > 0 { + repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; + } + + Ok(()) + } +} + +pub static GC_POOL: OnceCell> = OnceCell::new(); + +// TODO init gc pool with gc interval +// TODO spawn 20 worker threads diff --git a/pageserver/src/tenant_jobs/mod.rs b/pageserver/src/tenant_jobs/mod.rs index 2c8b83993a..f71bdec4d5 100644 --- a/pageserver/src/tenant_jobs/mod.rs +++ b/pageserver/src/tenant_jobs/mod.rs @@ -1 +1,3 @@ pub mod worker; +pub mod compaction; +pub mod gc; diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index 4b8fd1bcd3..94af61663e 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -1,28 +1,33 @@ -use std::{panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; - -// TODO maybe make jobs tenant-specific? Makes monitorin easier. +use std::{any::Any, fmt::Debug, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}}; pub trait Job: std::fmt::Debug + Send + Clone + 'static { - fn run(&self); -} - -#[derive(Debug, Clone)] -enum JobStatus { - Ready, - Running(Instant), // TODO add worker id - Stuck, // TODO remember error + type ErrorType; + fn run(&self) -> Result<(), Self::ErrorType>; } #[derive(Debug)] -struct JobStatusTable { +enum JobError { + Panic(Box), + Error(J::ErrorType), +} + +#[derive(Debug)] +enum JobStatus where J::ErrorType: Debug { + Ready, + Running(Instant), // TODO add worker id + Stuck(JobError), +} + +#[derive(Debug)] +struct JobStatusTable where J::ErrorType: Debug { // TODO this vec is no good. Too much index arithmetic. - jobs: Vec<(J, JobStatus)>, + jobs: Vec<(J, JobStatus)>, next: usize, begin: Instant, period: Duration, } -impl JobStatusTable { +impl JobStatusTable where J::ErrorType: Debug { fn next(&mut self) -> Option<(usize, J)> { while self.next < self.jobs.len() { let curr = self.next; @@ -34,7 +39,7 @@ impl JobStatusTable { return Some((curr, self.jobs[curr].0.clone())) } JobStatus::Running(_) => println!("Job already running, skipping this round"), - JobStatus::Stuck => println!("Job stuck, skipping"), + JobStatus::Stuck(_) => println!("Job stuck, skipping"), } } None @@ -53,12 +58,12 @@ impl JobStatusTable { } #[derive(Debug)] -struct Pool { +pub struct Pool where J::ErrorType: Debug { job_table: Mutex>, condvar: Condvar, // Notified when idle worker should wake up } -impl Pool { +impl Pool where J::ErrorType: Debug { fn new() -> Self { Pool { job_table: Mutex::new(JobStatusTable:: { @@ -78,17 +83,21 @@ impl Pool { // Run job without holding lock drop(job_table); let result = panic::catch_unwind(AssertUnwindSafe(|| { - job.run(); + job.run() })); job_table = self.job_table.lock().unwrap(); // Update job status match result { - Ok(()) => { + Ok(Ok(())) => { job_table.jobs[id].1 = JobStatus::Ready; }, + Ok(Err(e)) => { + job_table.jobs[id].1 = JobStatus::Stuck(JobError::Error(e)); + println!("Job errored, thread is ok."); + }, Err(e) => { - job_table.jobs[id].1 = JobStatus::Stuck; + job_table.jobs[id].1 = JobStatus::Stuck(JobError::Panic(e)); println!("Job panicked, thread is ok."); }, } @@ -127,11 +136,14 @@ mod tests { } impl Job for PrintJob { - fn run(&self) { + type ErrorType = String; + + fn run(&self) -> Result<(), String> { if self.to_print == "pls panic" { panic!("AAA"); } println!("{}", self.to_print); + Ok(()) } }