Add fair scheduler process

This commit is contained in:
Bojan Serafimov
2022-05-23 15:11:38 -04:00
parent 77c35906f5
commit d84c4907a9
2 changed files with 117 additions and 71 deletions

View File

@@ -1,31 +1,14 @@
use std::collections::VecDeque;
use std::time::Duration;
use std::collections::{HashMap, VecDeque};
use std::ops::Add;
use tokio::time::{Duration, Instant};
use crate::thread_mgr::shutdown_watcher;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender, channel};
use tokio::time::sleep;
use tokio::time::{sleep, sleep_until};
use super::worker::{Job, Worker, Report};
#[derive(Debug)]
pub struct Sched<J: Job> {
/// Idle workers
workers: Vec<Worker<J>>,
/// Queued due jobs
jobs: VecDeque<J>,
/// Channel for registering due jobs
pub send_work: Sender<J>, // TODO should job specify report destination?
recv_work: Receiver<J>,
/// Channel for enlisting idle workers
recv_worker: Receiver<Worker<J>>,
/// Channel where workers report results
recv_report: Receiver<Report<J>>,
}
pub struct Spawner<J: Job> {
send_worker: Sender<Worker<J>>,
send_report: Sender<Report<J>>,
@@ -50,26 +33,104 @@ impl<J: Job> Spawner<J> {
}
}
impl<J: Job> Sched<J> {
pub fn new() -> (Sched<J>, Spawner<J>) {
#[derive(Debug)]
pub enum Status {
Scheduled,
Running,
Stuck,
}
pub struct Scheduler<J: Job> {
send_work: Sender<J>,
recv_report: Receiver<Report<J>>,
pub period: Duration,
pub chores: Mutex<HashMap<J, Status>>,
}
impl<J: Job> Scheduler<J> {
pub async fn handle_report(&mut self, report: Report<J>) {
let job = report.for_job;
match report.result {
Ok(()) => {
// Reschedule job to run again
if let Some(status) = self.chores.lock().await.get_mut(&job) {
*status = Status::Scheduled;
}
},
Err(e) => {
// Remember error that got job stuck
println!("task panicked");
if let Some(status) = self.chores.lock().await.get_mut(&job) {
*status = Status::Stuck;
}
}
}
}
pub fn run(mut self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async {
let mut next_iteration = Instant::now();
loop {
tokio::select! {
_ = shutdown_watcher() => break,
_ = sleep_until(next_iteration) => {
next_iteration = Instant::now().add(self.period);
for (job, status) in self.chores.lock().await.iter_mut() {
if matches!(status, Status::Scheduled) {
self.send_work.send(job.clone()).await.unwrap();
*status = Status::Running;
}
}
}
report = self.recv_report.recv() => {
let report = report.expect("report channel closed");
self.handle_report(report).await;
}
}
}
});
Ok(())
}
}
#[derive(Debug)]
pub struct Board<J: Job> {
workers: Vec<Worker<J>>,
jobs: VecDeque<J>,
recv_work: Receiver<J>,
recv_worker: Receiver<Worker<J>>,
}
impl<J: Job> Board<J> {
pub fn new() -> (Board<J>, Spawner<J>, Scheduler<J>) {
let worker = channel::<Worker<J>>(100);
let work = channel::<J>(100);
let report = channel::<Report<J>>(100);
let sched = Sched {
let board = Board {
workers: vec![],
jobs: VecDeque::new(),
recv_worker: worker.1,
send_work: work.0,
recv_work: work.1,
recv_report: report.1,
};
let spawner = Spawner {
send_worker: worker.0,
send_report: report.0,
};
let scheduler = Scheduler {
send_work: work.0,
recv_report: report.1,
period: Duration::from_millis(10),
chores: Mutex::new(HashMap::new()),
};
(sched, spawner)
(board, spawner, scheduler)
}
@@ -92,24 +153,6 @@ impl<J: Job> Sched<J> {
}
}
pub async fn handle_report(&mut self, report: Report<J>) {
// Reschedule job to run again
let send_work = self.send_work.clone();
let job = report.for_job;
match report.result {
Ok(()) => {
tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
send_work.send(job).await.unwrap();
});
},
Err(e) => {
// TODO mark chore as blocked
println!("task panicked");
}
}
}
pub fn run(mut self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -117,9 +160,8 @@ impl<J: Job> Sched<J> {
runtime.block_on(async {
loop {
let shutdown_watcher = shutdown_watcher();
tokio::select! {
_ = shutdown_watcher => break,
_ = shutdown_watcher() => break,
worker = self.recv_worker.recv() => {
let worker = worker.expect("worker channel closed");
self.handle_worker(worker).await;
@@ -128,10 +170,6 @@ impl<J: Job> Sched<J> {
let job = job.expect("job channel closed");
self.handle_job(job).await;
},
report = self.recv_report.recv() => {
let report = report.expect("report channel closed");
self.handle_report(report).await;
}
}
}
});
@@ -142,10 +180,10 @@ impl<J: Job> Sched<J> {
#[cfg(test)]
mod tests {
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
use crate::thread_mgr::{self, ThreadKind};
use super::*;
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct PrintJob {
to_print: String
}
@@ -158,9 +196,25 @@ mod tests {
#[tokio::test]
async fn sched_1() {
let (sched, spawner) = Sched::<PrintJob>::new();
let (board, spawner, scheduler) = Board::<PrintJob>::new();
let send_work = sched.send_work.clone();
// Schedule recurring job
let j = PrintJob {
to_print: "hello from job".to_string(),
};
scheduler.chores.lock().await.insert(j, Status::Scheduled);
// Spawn board
thread_mgr::spawn(
ThreadKind::GcScheduler,
None,
None,
"gc_scheduler",
true,
move || {
board.run()
},
).unwrap();
// Spawn scheduler
thread_mgr::spawn(
@@ -170,20 +224,17 @@ mod tests {
"gc_scheduler",
true,
move || {
sched.run()
scheduler.run()
},
).unwrap();
// Spawn worker
spawner.spawn_worker();
// Send a job
let j = PrintJob {
to_print: "hello from job".to_string(),
};
send_work.send(j.clone()).await.unwrap();
// Wait for job to run a few times
sleep(Duration::from_millis(100)).await;
// Cleanup
thread_mgr::shutdown_threads(None, None, None);
}
}

View File

@@ -4,19 +4,14 @@
use crate::thread_mgr::shutdown_watcher;
use tokio::sync::mpsc::{Sender, channel};
use std::any::Any;
use std::hash::Hash;
use std::panic::AssertUnwindSafe;
use std::panic::catch_unwind;
pub trait Job: std::fmt::Debug + Send + 'static + Clone {
pub trait Job: std::fmt::Debug + Send + 'static + Clone + PartialEq + Eq + Hash {
fn run(&self);
}
// TODO make scheduler an async fn, leave rescheduling to chore_mgr
pub struct Work<J: Job> {
pub job: J,
pub when_done: Sender<Report<J>>,
}
#[derive(Debug)]
pub struct Worker<J: Job>(pub Sender<J>);
@@ -66,7 +61,7 @@ mod tests {
use crate::thread_mgr::{self, ThreadKind};
use super::*;
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct PrintJob {
to_print: String
}