This commit is contained in:
Bojan Serafimov
2022-05-23 23:18:25 -04:00
parent fefbff8981
commit dc47f9ccf1
3 changed files with 61 additions and 58 deletions

View File

@@ -1,2 +1 @@
pub mod worker;
pub mod scheduler;

View File

@@ -1,14 +0,0 @@
use std::time::Duration;
use super::worker::Job;
enum JobStatus {
Active,
Stuck,
}
struct Scheduler<J: Job> {
interval: Duration,
jobs: Vec<(J, JobStatus)>
}

View File

@@ -1,4 +1,4 @@
use std::{collections::VecDeque, ops::Sub, panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
use std::{panic::{self, AssertUnwindSafe}, sync::{Condvar, Mutex}, time::{Duration, Instant}};
// TODO maybe make jobs tenant-specific? Makes monitorin easier.
@@ -9,22 +9,53 @@ pub trait Job: std::fmt::Debug + Send + Clone + 'static {
#[derive(Debug, Clone)]
enum JobStatus {
Ready,
Running(Instant),
Running(Instant), // TODO add worker id
Stuck, // TODO remember error
}
#[derive(Debug)]
struct JobStatusTable<J: Job> {
// TODO this vec is no good. Too much index arithmetic.
jobs: Vec<(J, JobStatus)>,
next: usize,
begin: Instant,
period: Duration,
}
impl<J: Job> JobStatusTable<J> {
fn next(&mut self) -> Option<(usize, J)> {
while self.next < self.jobs.len() {
let curr = self.next;
self.next += 1;
match self.jobs[curr].1 {
JobStatus::Ready => {
self.jobs[curr].1 = JobStatus::Running(Instant::now());
return Some((curr, self.jobs[curr].0.clone()))
}
JobStatus::Running(_) => println!("Job already running, skipping this round"),
JobStatus::Stuck => println!("Job stuck, skipping"),
}
}
None
}
fn check_end_of_cycle(&mut self) -> Option<Duration> {
let until_next = self.period.saturating_sub(Instant::now().duration_since(self.begin));
if until_next.is_zero() {
self.next = 0;
self.begin = Instant::now();
None
} else {
Some(until_next)
}
}
}
#[derive(Debug)]
struct Pool<J: Job> {
job_table: Mutex<JobStatusTable<J>>,
condvar: Condvar, // Notified when queue becomes nonempty
period: Duration,
condvar: Condvar, // Notified when idle worker should wake up
}
impl<J: Job> Pool<J> {
@@ -34,48 +65,36 @@ impl<J: Job> Pool<J> {
jobs: vec![],
next: 0,
begin: Instant::now(),
period: Duration::from_millis(10),
}),
condvar: Condvar::new(),
period: Duration::from_millis(10),
}
}
fn worker_main(&self) -> anyhow::Result<()> {
let mut job_table = self.job_table.lock().unwrap();
loop {
if job_table.next < job_table.jobs.len() {
let curr = job_table.next;
let (job, status) = job_table.jobs[curr].clone();
job_table.next += 1;
if let Some((id, job)) = job_table.next() {
// Run job without holding lock
drop(job_table);
let result = panic::catch_unwind(AssertUnwindSafe(|| {
job.run();
}));
job_table = self.job_table.lock().unwrap();
if matches!(status, JobStatus::Ready) {
// Run job without holding lock
job_table.jobs[curr].1 = JobStatus::Running(Instant::now());
drop(job_table);
// TODO don't run job if previous iteration is still running
let result = panic::catch_unwind(AssertUnwindSafe(|| {
job.run();
}));
job_table = self.job_table.lock().unwrap();
match result {
Ok(()) => {
job_table.jobs[curr].1 = JobStatus::Ready;
},
Err(e) => {
job_table.jobs[curr].1 = JobStatus::Stuck;
println!("Job panicked, thread is ok.");
},
}
// Update job status
match result {
Ok(()) => {
job_table.jobs[id].1 = JobStatus::Ready;
},
Err(e) => {
job_table.jobs[id].1 = JobStatus::Stuck;
println!("Job panicked, thread is ok.");
},
}
} else {
let since_last_cycle = Instant::now().duration_since(job_table.begin);
let until_next_cycle = self.period.saturating_sub(since_last_cycle);
if until_next_cycle.is_zero() {
job_table.next = 0;
job_table.begin = Instant::now();
} else {
job_table = self.condvar.wait_timeout(job_table, until_next_cycle).unwrap().0;
if let Some(wait_time) = job_table.check_end_of_cycle() {
job_table = self.condvar.wait_timeout(job_table, wait_time).unwrap().0;
}
}
}
@@ -86,8 +105,7 @@ impl<J: Job> Pool<J> {
let mut job_table = self.job_table.lock().unwrap();
job_table.jobs.push((job, JobStatus::Ready));
// If the queue was empty, wake up the next worker thread to pick up
// the job.
// Notify workers if they're waiting for work.
if job_table.next == job_table.jobs.len() - 1 {
self.condvar.notify_one();
}
@@ -134,12 +152,12 @@ mod tests {
},
).unwrap();
let j = PrintJob {
to_print: "hello from job".to_string(),
};
TEST_POOL.get().unwrap().queue_job(j.clone());
TEST_POOL.get().unwrap().queue_job(j.clone());
TEST_POOL.get().unwrap().queue_job(j.clone());
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job 1".to_string(),
});
TEST_POOL.get().unwrap().queue_job(PrintJob {
to_print: "hello from job 2".to_string(),
});
tokio::time::sleep(Duration::from_millis(100)).await;
}