From 98d8d65c832bee2d0a3285fa70c845eec4b1b8ad Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Tue, 24 May 2022 14:03:07 -0400 Subject: [PATCH] Cleanup --- pageserver/src/tenant_jobs/compaction.rs | 11 ++++---- pageserver/src/tenant_jobs/gc.rs | 11 ++++---- pageserver/src/tenant_jobs/worker.rs | 36 ++++++++++-------------- 3 files changed, 27 insertions(+), 31 deletions(-) diff --git a/pageserver/src/tenant_jobs/compaction.rs b/pageserver/src/tenant_jobs/compaction.rs index 6369e95035..915305f2bc 100644 --- a/pageserver/src/tenant_jobs/compaction.rs +++ b/pageserver/src/tenant_jobs/compaction.rs @@ -1,3 +1,5 @@ +use std::{ops::Add, time::Instant}; + use once_cell::sync::OnceCell; use utils::zid::ZTenantId; use crate::repository::Repository; @@ -15,17 +17,16 @@ pub struct CompactionJob { impl Job for CompactionJob { type ErrorType = anyhow::Error; - fn run(&self) -> Result<(), Self::ErrorType> { - // TODO GC has the same code too + fn run(&self) -> Result, Self::ErrorType> { + // Don't reschedule job if tenant isn't active if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { - // TODO Maybe record this as "didn't run"? - return Ok(()); + return Ok(None); } let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?; repo.compaction_iteration()?; - Ok(()) + Ok(Some(Instant::now().add(repo.get_compaction_period()))) } } diff --git a/pageserver/src/tenant_jobs/gc.rs b/pageserver/src/tenant_jobs/gc.rs index 41c6e9844a..c7ef63b53d 100644 --- a/pageserver/src/tenant_jobs/gc.rs +++ b/pageserver/src/tenant_jobs/gc.rs @@ -1,3 +1,5 @@ +use std::{ops::Add, time::Instant}; + use once_cell::sync::OnceCell; use utils::zid::ZTenantId; use crate::{repository::Repository, tenant_mgr::{self, TenantState}}; @@ -12,11 +14,10 @@ pub struct GcJob { impl Job for GcJob { type ErrorType = anyhow::Error; - fn run(&self) -> Result<(), Self::ErrorType> { + fn run(&self) -> Result, Self::ErrorType> { + // Don't reschedule job if tenant isn't active if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) { - // TODO Maybe record this as "didn't run"? - // TODO Maybe unschedule? - return Ok(()); + return Ok(None); } let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?; @@ -25,7 +26,7 @@ impl Job for GcJob { repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?; } - Ok(()) + Ok(Some(Instant::now().add(repo.get_gc_period()))) } } diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index e1c4c4f4d3..24e1050cb0 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -2,7 +2,7 @@ use std::{any::Any, collections::{BinaryHeap, HashMap}, fmt::Debug, hash::Hash, pub trait Job: std::fmt::Debug + Send + Clone + PartialOrd + Ord + Hash + 'static { type ErrorType; - fn run(&self) -> Result<(), Self::ErrorType>; + fn run(&self) -> Result, Self::ErrorType>; } #[derive(Debug)] @@ -18,12 +18,7 @@ enum JobStatus where J::ErrorType: Debug { Stuck(JobError), } -#[derive(Debug)] -struct JobEntry where J::ErrorType: Debug { - period: Duration, - status: JobStatus, -} - +// TODO make this generic event, put in different module #[derive(Debug)] struct Deadline where J::ErrorType: Debug { start_by: Instant, @@ -52,7 +47,7 @@ impl Eq for Deadline where J::ErrorType: Debug { } #[derive(Debug)] struct JobStatusTable where J::ErrorType: Debug { - status: HashMap>, + status: HashMap>, queue: BinaryHeap>, } @@ -68,7 +63,7 @@ impl JobStatusTable where J::ErrorType: Debug { fn set_status(&mut self, job: &J, status: JobStatus) { let s = self.status.get_mut(job).expect("status not found"); - (*s).status = status; + *s = status; } } @@ -82,7 +77,7 @@ impl Pool where J::ErrorType: Debug { fn new() -> Self { Pool { job_table: Mutex::new(JobStatusTable:: { - status: HashMap::>::new(), + status: HashMap::>::new(), queue: BinaryHeap::>::new(), }), condvar: Condvar::new(), @@ -104,14 +99,16 @@ impl Pool where J::ErrorType: Debug { // Update job status match result { - Ok(Ok(())) => { + Ok(Ok(Some(reschedule_for))) => { job_table.set_status(&job, JobStatus::Ready); - let period = job_table.status.get(&job).unwrap().period; job_table.queue.push(Deadline { job: job.clone(), - start_by: Instant::now().add(period), + start_by: reschedule_for, }) }, + Ok(Ok(None)) => { + // TODO remove from job table + }, Ok(Err(e)) => { job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e))); println!("Job errored, thread is ok."); @@ -135,13 +132,10 @@ impl Pool where J::ErrorType: Debug { } } - fn queue_job(&self, job: J, period: Duration) { + fn queue_job(&self, job: J) { // Add the job to the back of the queue let mut job_table = self.job_table.lock().unwrap(); - job_table.status.insert(job.clone(), JobEntry { - period, - status: JobStatus::Ready, - }); + job_table.status.insert(job.clone(), JobStatus::Ready); job_table.queue.push(Deadline { job: job.clone(), start_by: Instant::now(), @@ -168,12 +162,12 @@ mod tests { impl Job for PrintJob { type ErrorType = String; - fn run(&self) -> Result<(), String> { + fn run(&self) -> Result, String> { if self.to_print == "pls panic" { panic!("AAA"); } println!("{}", self.to_print); - Ok(()) + Ok(Some(Instant::now().add(Duration::from_millis(10)))) } } @@ -207,7 +201,7 @@ mod tests { TEST_POOL.get().unwrap().queue_job(PrintJob { to_print: "hello from job".to_string(), - }, Duration::from_millis(10)); + }); tokio::time::sleep(Duration::from_millis(100)).await; }