This commit is contained in:
Bojan Serafimov
2022-05-24 14:03:07 -04:00
parent 1e9b628c25
commit 98d8d65c83
3 changed files with 27 additions and 31 deletions

View File

@@ -1,3 +1,5 @@
use std::{ops::Add, time::Instant};
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::repository::Repository;
@@ -15,17 +17,16 @@ pub struct CompactionJob {
impl Job for CompactionJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<(), Self::ErrorType> {
// TODO GC has the same code too
fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
// Don't reschedule job if tenant isn't active
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) {
// TODO Maybe record this as "didn't run"?
return Ok(());
return Ok(None);
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
repo.compaction_iteration()?;
Ok(())
Ok(Some(Instant::now().add(repo.get_compaction_period())))
}
}

View File

@@ -1,3 +1,5 @@
use std::{ops::Add, time::Instant};
use once_cell::sync::OnceCell;
use utils::zid::ZTenantId;
use crate::{repository::Repository, tenant_mgr::{self, TenantState}};
@@ -12,11 +14,10 @@ pub struct GcJob {
impl Job for GcJob {
type ErrorType = anyhow::Error;
fn run(&self) -> Result<(), Self::ErrorType> {
fn run(&self) -> Result<Option<Instant>, Self::ErrorType> {
// Don't reschedule job if tenant isn't active
if !matches!(tenant_mgr::get_tenant_state(self.tenant), Some(TenantState::Active)) {
// TODO Maybe record this as "didn't run"?
// TODO Maybe unschedule?
return Ok(());
return Ok(None);
}
let repo = tenant_mgr::get_repository_for_tenant(self.tenant)?;
@@ -25,7 +26,7 @@ impl Job for GcJob {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
Ok(())
Ok(Some(Instant::now().add(repo.get_gc_period())))
}
}

View File

@@ -2,7 +2,7 @@ use std::{any::Any, collections::{BinaryHeap, HashMap}, fmt::Debug, hash::Hash,
pub trait Job: std::fmt::Debug + Send + Clone + PartialOrd + Ord + Hash + 'static {
type ErrorType;
fn run(&self) -> Result<(), Self::ErrorType>;
fn run(&self) -> Result<Option<Instant>, Self::ErrorType>;
}
#[derive(Debug)]
@@ -18,12 +18,7 @@ enum JobStatus<J: Job> where J::ErrorType: Debug {
Stuck(JobError<J>),
}
#[derive(Debug)]
struct JobEntry<J: Job> where J::ErrorType: Debug {
period: Duration,
status: JobStatus<J>,
}
// TODO make this generic event, put in different module
#[derive(Debug)]
struct Deadline<J: Job> where J::ErrorType: Debug {
start_by: Instant,
@@ -52,7 +47,7 @@ impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug { }
#[derive(Debug)]
struct JobStatusTable<J: Job> where J::ErrorType: Debug {
status: HashMap<J, JobEntry<J>>,
status: HashMap<J, JobStatus<J>>,
queue: BinaryHeap<Deadline<J>>,
}
@@ -68,7 +63,7 @@ impl<J: Job> JobStatusTable<J> where J::ErrorType: Debug {
fn set_status(&mut self, job: &J, status: JobStatus<J>) {
let s = self.status.get_mut(job).expect("status not found");
(*s).status = status;
*s = status;
}
}
@@ -82,7 +77,7 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
fn new() -> Self {
Pool {
job_table: Mutex::new(JobStatusTable::<J> {
status: HashMap::<J, JobEntry<J>>::new(),
status: HashMap::<J, JobStatus<J>>::new(),
queue: BinaryHeap::<Deadline<J>>::new(),
}),
condvar: Condvar::new(),
@@ -104,14 +99,16 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
// Update job status
match result {
Ok(Ok(())) => {
Ok(Ok(Some(reschedule_for))) => {
job_table.set_status(&job, JobStatus::Ready);
let period = job_table.status.get(&job).unwrap().period;
job_table.queue.push(Deadline {
job: job.clone(),
start_by: Instant::now().add(period),
start_by: reschedule_for,
})
},
Ok(Ok(None)) => {
// TODO remove from job table
},
Ok(Err(e)) => {
job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e)));
println!("Job errored, thread is ok.");
@@ -135,13 +132,10 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
}
}
fn queue_job(&self, job: J, period: Duration) {
fn queue_job(&self, job: J) {
// Add the job to the back of the queue
let mut job_table = self.job_table.lock().unwrap();
job_table.status.insert(job.clone(), JobEntry {
period,
status: JobStatus::Ready,
});
job_table.status.insert(job.clone(), JobStatus::Ready);
job_table.queue.push(Deadline {
job: job.clone(),
start_by: Instant::now(),
@@ -168,12 +162,12 @@ mod tests {
impl Job for PrintJob {
type ErrorType = String;
fn run(&self) -> Result<(), String> {
fn run(&self) -> Result<Option<Instant>, String> {
if self.to_print == "pls panic" {
panic!("AAA");
}
println!("{}", self.to_print);
Ok(())
Ok(Some(Instant::now().add(Duration::from_millis(10))))
}
}
@@ -207,7 +201,7 @@ mod tests {
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job".to_string(),
}, Duration::from_millis(10));
});
tokio::time::sleep(Duration::from_millis(100)).await;
}