This commit is contained in:
Bojan Serafimov
2022-05-23 00:42:14 -04:00
parent 13ed1e32b4
commit 304641afe5
2 changed files with 115 additions and 60 deletions

View File

@@ -7,20 +7,106 @@ use tokio::time::sleep;
use super::worker::{Job, Worker, Report};
#[derive(Debug)]
pub struct Sched<J: Job> {
pub worker: (Sender<Worker<J>>, Receiver<Worker<J>>),
pub work: (Sender<J>, Receiver<J>),
pub report: (Sender<Report<J>>, Receiver<Report<J>>),
/// Idle workers
workers: Vec<Worker<J>>,
/// Queued due jobs
jobs: VecDeque<J>,
/// Channel for registering due jobs
pub send_work: Sender<J>, // TODO should job specify report destination?
recv_work: Receiver<J>,
/// Channel for enlisting idle workers
recv_worker: Receiver<Worker<J>>,
/// Channel where workers report results
recv_report: Receiver<Report<J>>,
}
pub struct Spawner<J: Job> {
send_worker: Sender<Worker<J>>,
send_report: Sender<Report<J>>,
}
impl<J: Job> Spawner<J> {
pub fn spawn_worker(&self) {
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
let enlist = self.send_worker.clone();
let report = self.send_report.clone();
thread_mgr::spawn(
ThreadKind::GcWorker,
None,
None,
"gc_worker_1",
true,
move || {
run_worker(enlist, report)
},
).unwrap();
}
}
impl<J: Job> Sched<J> {
pub fn new() -> Self {
Sched {
worker: channel::<Worker<J>>(100),
work: channel::<J>(100),
report: channel::<Report<J>>(100),
pub fn new() -> (Sched<J>, Spawner<J>) {
let worker = channel::<Worker<J>>(100);
let work = channel::<J>(100);
let report = channel::<Report<J>>(100);
let sched = Sched {
workers: vec![],
jobs: VecDeque::new(),
recv_worker: worker.1,
send_work: work.0,
recv_work: work.1,
recv_report: report.1,
};
let spawner = Spawner {
send_worker: worker.0,
send_report: report.0,
};
(sched, spawner)
}
pub async fn handle_job(&mut self, job: J) {
// Assign to a worker if any are availabe
while let Some(w) = self.workers.pop() {
if let Ok(()) = w.0.send(job.clone()).await {
return;
}
}
self.jobs.push_back(job);
}
pub async fn handle_worker(&mut self, worker: Worker<J>) {
// Assign jobs if any are queued
if let Some(j) = self.jobs.pop_front() {
worker.0.send(j).await.ok();
} else {
self.workers.push(worker);
}
}
pub async fn handle_report(&mut self, report: Report<J>) {
// Reschedule job to run again
let send_work = self.send_work.clone();
let job = report.for_job;
match report.result {
Ok(()) => {
tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
send_work.send(job).await.unwrap();
});
},
Err(e) => {
// TODO mark chore as blocked
println!("task panicked");
}
}
}
@@ -30,44 +116,21 @@ impl<J: Job> Sched<J> {
.build()?;
runtime.block_on(async {
let mut workers = Vec::<Worker<J>>::new();
let mut jobs = VecDeque::<J>::new();
loop {
let shutdown_watcher = shutdown_watcher();
tokio::select! {
_ = shutdown_watcher => break,
worker = self.worker.1.recv() => {
// Assign to next job in queue, if nonempty
if let Some(j) = jobs.pop_front() {
worker.unwrap().0.send(j).await.unwrap();
} else {
workers.push(worker.unwrap());
}
worker = self.recv_worker.recv() => {
let worker = worker.expect("worker channel closed");
self.handle_worker(worker).await;
},
job = self.work.1.recv() => {
// Assign to first worker in pool, if nonempty
if let Some(w) = workers.pop() {
w.0.send(job.unwrap()).await.unwrap();
} else {
jobs.push_back(job.unwrap());
}
job = self.recv_work.recv() => {
let job = job.expect("job channel closed");
self.handle_job(job).await;
},
report = self.report.1.recv() => {
// Reschedule job to run again
let send_work = self.work.0.clone();
let report = report.unwrap();
let job = report.for_job;
match report.result {
Ok(()) => {
tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
send_work.send(job).await.unwrap();
});
},
Err(e) => {
println!("task panicked");
}
}
report = self.recv_report.recv() => {
let report = report.expect("report channel closed");
self.handle_report(report).await;
}
}
}
@@ -95,13 +158,9 @@ mod tests {
#[tokio::test]
async fn sched_1() {
let s = Sched::<PrintJob>::new();
let (sched, spawner) = Sched::<PrintJob>::new();
let send_work = s.work.0.clone();
// Used for workers
let enlist = s.worker.0.clone();
let report = s.report.0.clone();
let send_work = sched.send_work.clone();
// Spawn scheduler
thread_mgr::spawn(
@@ -111,21 +170,11 @@ mod tests {
"gc_scheduler",
true,
move || {
s.run()
sched.run()
},
).unwrap();
// Spawn worker 1
thread_mgr::spawn(
ThreadKind::GcWorker,
None,
None,
"gc_worker_1",
true,
move || {
run_worker(enlist, report)
},
).unwrap();
spawner.spawn_worker();
// Send a job
let j = PrintJob {

View File

@@ -7,10 +7,16 @@ use std::any::Any;
use std::panic::AssertUnwindSafe;
use std::panic::catch_unwind;
pub trait Job: std::fmt::Debug + Send + 'static {
pub trait Job: std::fmt::Debug + Send + 'static + Clone {
fn run(&self);
}
// TODO make scheduler an async fn, leave rescheduling to chore_mgr
pub struct Work<J: Job> {
pub job: J,
pub when_done: Sender<Report<J>>,
}
#[derive(Debug)]
pub struct Worker<J: Job>(pub Sender<J>);