Use queue

This commit is contained in:
Bojan Serafimov
2022-05-24 10:50:56 -04:00
parent 0d5fee3ab7
commit 31dc9e6abd
3 changed files with 78 additions and 50 deletions

View File

@@ -7,7 +7,7 @@ use crate::tenant_mgr::{self, TenantState};
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompactionJob {
pub tenant: ZTenantId,
}

View File

@@ -4,7 +4,7 @@ use crate::{repository::Repository, tenant_mgr::{self, TenantState}};
use super::worker::{Job, Pool};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct GcJob {
pub tenant: ZTenantId,
}

View File

@@ -1,6 +1,6 @@
use std::{any::Any, fmt::Debug, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
use std::{any::Any, collections::{BinaryHeap, HashMap}, fmt::Debug, hash::Hash, ops::Add, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
pub trait Job: std::fmt::Debug + Send + Clone + 'static {
pub trait Job: std::fmt::Debug + Send + Clone + PartialOrd + Ord + Hash + 'static {
type ErrorType;
fn run(&self) -> Result<(), Self::ErrorType>;
}
@@ -18,42 +18,51 @@ enum JobStatus<J: Job> where J::ErrorType: Debug {
Stuck(JobError<J>),
}
#[derive(Debug)]
struct Deadline<J: Job> where J::ErrorType: Debug {
start_by: Instant,
job: J,
}
impl<J: Job> PartialOrd for Deadline<J> where J::ErrorType: Debug {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
other.start_by.partial_cmp(&self.start_by)
}
}
impl<J: Job> Ord for Deadline<J> where J::ErrorType: Debug {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.start_by.cmp(&self.start_by)
}
}
impl<J: Job> PartialEq for Deadline<J> where J::ErrorType: Debug {
fn eq(&self, other: &Self) -> bool {
self.start_by == other.start_by
}
}
impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug { }
#[derive(Debug)]
struct JobStatusTable<J: Job> where J::ErrorType: Debug {
// TODO this vec is no good. Too much index arithmetic.
jobs: Vec<(J, JobStatus<J>)>,
next: usize,
begin: Instant,
period: Duration,
status: HashMap<J, JobStatus<J>>,
queue: BinaryHeap<Deadline<J>>,
}
impl<J: Job> JobStatusTable<J> where J::ErrorType: Debug {
fn next(&mut self) -> Option<(usize, J)> {
while self.next < self.jobs.len() {
let curr = self.next;
self.next += 1;
match self.jobs[curr].1 {
JobStatus::Ready => {
self.jobs[curr].1 = JobStatus::Running(Instant::now());
return Some((curr, self.jobs[curr].0.clone()))
}
JobStatus::Running(_) => println!("Job already running, skipping this round"),
JobStatus::Stuck(_) => println!("Job stuck, skipping"),
fn pop_due(&mut self) -> Option<Deadline<J>> {
if let Some(deadline) = self.queue.peek() {
if Instant::now() > deadline.start_by {
return self.queue.pop();
}
}
None
}
fn check_end_of_cycle(&mut self) -> Option<Duration> {
let until_next = self.period.saturating_sub(Instant::now().duration_since(self.begin));
if until_next.is_zero() {
self.next = 0;
self.begin = Instant::now();
None
} else {
Some(until_next)
}
fn set_status(&mut self, job: &J, status: JobStatus<J>) {
let s = self.status.get_mut(job).expect("status not found");
*s = status;
}
}
@@ -67,10 +76,8 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
fn new() -> Self {
Pool {
job_table: Mutex::new(JobStatusTable::<J> {
jobs: vec![],
next: 0,
begin: Instant::now(),
period: Duration::from_millis(10),
status: HashMap::<J, JobStatus<J>>::new(),
queue: BinaryHeap::<Deadline<J>>::new(),
}),
condvar: Condvar::new(),
}
@@ -79,7 +86,9 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
fn worker_main(&self) -> anyhow::Result<()> {
let mut job_table = self.job_table.lock().unwrap();
loop {
if let Some((id, job)) = job_table.next() {
if let Some(Deadline {job, ..}) = job_table.pop_due() {
job_table.set_status(&job, JobStatus::Running(Instant::now()));
// Run job without holding lock
drop(job_table);
let result = panic::catch_unwind(AssertUnwindSafe(|| {
@@ -90,20 +99,30 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
// Update job status
match result {
Ok(Ok(())) => {
job_table.jobs[id].1 = JobStatus::Ready;
job_table.set_status(&job, JobStatus::Ready);
job_table.queue.push(Deadline {
job: job.clone(),
start_by: Instant::now().add(Duration::from_millis(10)),
})
},
Ok(Err(e)) => {
job_table.jobs[id].1 = JobStatus::Stuck(JobError::Error(e));
job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e)));
println!("Job errored, thread is ok.");
},
Err(e) => {
job_table.jobs[id].1 = JobStatus::Stuck(JobError::Panic(e));
job_table.set_status(&job, JobStatus::Stuck(JobError::Panic(e)));
println!("Job panicked, thread is ok.");
},
}
} else {
if let Some(wait_time) = job_table.check_end_of_cycle() {
job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0;
match job_table.queue.peek() {
Some(deadline) => {
let wait_time = deadline.start_by.duration_since(Instant::now());
job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0;
}
None => {
job_table = self.condvar.wait(job_table).unwrap();
}
}
}
}
@@ -112,12 +131,13 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
fn queue_job(&self, job: J) {
// Add the job to the back of the queue
let mut job_table = self.job_table.lock().unwrap();
job_table.jobs.push((job, JobStatus::Ready));
job_table.status.insert(job.clone(), JobStatus::Ready);
job_table.queue.push(Deadline {
job: job.clone(),
start_by: Instant::now(),
});
// Notify workers if they're waiting for work.
if job_table.next == job_table.jobs.len() - 1 {
self.condvar.notify_one();
}
self.condvar.notify_all();
}
}
@@ -130,7 +150,7 @@ mod tests {
use crate::thread_mgr::{self, ThreadKind};
use super::*;
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
struct PrintJob {
to_print: String
}
@@ -164,11 +184,19 @@ mod tests {
},
).unwrap();
thread_mgr::spawn(
ThreadKind::GarbageCollector, // change this
None,
None,
"test_worker_2",
true,
move || {
TEST_POOL.get().unwrap().worker_main()
},
).unwrap();
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job 1".to_string(),
});
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job 2".to_string(),
to_print: "hello from job".to_string(),
});
tokio::time::sleep(Duration::from_millis(100)).await;