Revert "WIP"

This reverts commit b70cf3f7b1.
This commit is contained in:
Bojan Serafimov
2022-05-23 14:07:38 -04:00
parent b70cf3f7b1
commit 77c35906f5
2 changed files with 67 additions and 47 deletions

View File

@@ -5,10 +5,7 @@ use crate::thread_mgr::shutdown_watcher;
use tokio::sync::mpsc::{Receiver, Sender, channel};
use tokio::time::sleep;
use super::worker::{Job, Worker, Report, Work};
// TODO spawn a tokio task for each tenant
// x) Why not use the simpler worker implementation then?
use super::worker::{Job, Worker, Report};
#[derive(Debug)]
pub struct Sched<J: Job> {
@@ -16,18 +13,22 @@ pub struct Sched<J: Job> {
workers: Vec<Worker<J>>,
/// Queued due jobs
work_queue: VecDeque<Work<J>>,
jobs: VecDeque<J>,
/// Channel for registering due jobs
pub send_work: Sender<Work<J>>,
recv_work: Receiver<Work<J>>,
pub send_work: Sender<J>, // TODO should job specify report destination?
recv_work: Receiver<J>,
/// Channel for enlisting idle workers
recv_worker: Receiver<Worker<J>>,
/// Channel where workers report results
recv_report: Receiver<Report<J>>,
}
pub struct Spawner<J: Job> {
send_worker: Sender<Worker<J>>,
send_report: Sender<Report<J>>,
}
impl<J: Job> Spawner<J> {
@@ -35,6 +36,7 @@ impl<J: Job> Spawner<J> {
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
let enlist = self.send_worker.clone();
let report = self.send_report.clone();
thread_mgr::spawn(
ThreadKind::GcWorker,
None,
@@ -42,7 +44,7 @@ impl<J: Job> Spawner<J> {
"gc_worker_1",
true,
move || {
run_worker(enlist)
run_worker(enlist, report)
},
).unwrap();
}
@@ -51,41 +53,63 @@ impl<J: Job> Spawner<J> {
impl<J: Job> Sched<J> {
pub fn new() -> (Sched<J>, Spawner<J>) {
let worker = channel::<Worker<J>>(100);
let work = channel::<Work<J>>(100);
let work = channel::<J>(100);
let report = channel::<Report<J>>(100);
let sched = Sched {
workers: vec![],
work_queue: VecDeque::new(),
jobs: VecDeque::new(),
recv_worker: worker.1,
send_work: work.0,
recv_work: work.1,
recv_report: report.1,
};
let spawner = Spawner {
send_worker: worker.0,
send_report: report.0,
};
(sched, spawner)
}
pub async fn handle_work(&mut self, work: Work<J>) {
pub async fn handle_job(&mut self, job: J) {
// Assign to a worker if any are availabe
while let Some(worker) = self.workers.pop() {
if let Ok(()) = worker.0.send(work.clone()).await {
while let Some(w) = self.workers.pop() {
if let Ok(()) = w.0.send(job.clone()).await {
return;
}
}
self.work_queue.push_back(work);
self.jobs.push_back(job);
}
pub async fn handle_worker(&mut self, worker: Worker<J>) {
// Assign jobs if any are queued
if let Some(j) = self.work_queue.pop_front() {
if let Some(j) = self.jobs.pop_front() {
worker.0.send(j).await.ok();
} else {
self.workers.push(worker);
}
}
pub async fn handle_report(&mut self, report: Report<J>) {
// Reschedule job to run again
let send_work = self.send_work.clone();
let job = report.for_job;
match report.result {
Ok(()) => {
tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
send_work.send(job).await.unwrap();
});
},
Err(e) => {
// TODO mark chore as blocked
println!("task panicked");
}
}
}
pub fn run(mut self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -100,10 +124,14 @@ impl<J: Job> Sched<J> {
let worker = worker.expect("worker channel closed");
self.handle_worker(worker).await;
},
work = self.recv_work.recv() => {
let work = work.expect("job channel closed");
self.handle_work(work).await;
job = self.recv_work.recv() => {
let job = job.expect("job channel closed");
self.handle_job(job).await;
},
report = self.recv_report.recv() => {
let report = report.expect("report channel closed");
self.handle_report(report).await;
}
}
}
});
@@ -114,7 +142,7 @@ impl<J: Job> Sched<J> {
#[cfg(test)]
mod tests {
use crate::thread_mgr::{self, ThreadKind};
use crate::{jobs::worker::run_worker, thread_mgr::{self, ThreadKind}};
use super::*;
#[derive(Debug, Clone, Eq, PartialEq)]
@@ -149,14 +177,10 @@ mod tests {
spawner.spawn_worker();
// Send a job
let when_done = channel::<Report<PrintJob>>(100);
let work = Work {
job: PrintJob {
to_print: "hello from job".to_string(),
},
when_done: when_done.0,
let j = PrintJob {
to_print: "hello from job".to_string(),
};
send_work.send(work.clone()).await.unwrap();
send_work.send(j.clone()).await.unwrap();
sleep(Duration::from_millis(100)).await;

View File

@@ -12,14 +12,13 @@ pub trait Job: std::fmt::Debug + Send + 'static + Clone {
}
// TODO make scheduler an async fn, leave rescheduling to chore_mgr
#[derive(Debug, Clone)]
pub struct Work<J: Job> {
pub job: J,
pub when_done: Sender<Report<J>>,
}
#[derive(Debug, Clone)]
pub struct Worker<J: Job>(pub Sender<Work<J>>);
#[derive(Debug)]
pub struct Worker<J: Job>(pub Sender<J>);
#[derive(Debug)]
pub struct Report<J: Job> {
@@ -27,26 +26,26 @@ pub struct Report<J: Job> {
pub result: Result<(), Box<dyn Any + Send>>
}
pub fn run_worker<J: Job>(enlist: Sender<Worker<J>>) -> anyhow::Result<()> {
pub fn run_worker<J: Job>(enlist: Sender<Worker<J>>, report: Sender<Report<J>>) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async {
loop {
let (send_work, mut get_work) = channel::<Work<J>>(100);
let (send_work, mut get_work) = channel::<J>(100);
enlist.send(Worker(send_work)).await.unwrap();
let shutdown_watcher = shutdown_watcher();
tokio::select! {
_ = shutdown_watcher => break,
w = get_work.recv() => {
if let Some(work) = w {
j = get_work.recv() => {
if let Some(job) = j {
let result = catch_unwind(AssertUnwindSafe(|| {
work.job.run();
job.run();
}));
work.when_done.send(Report {
for_job: work.job,
report.send(Report {
for_job: job,
result: result,
}).await.unwrap();
} else {
@@ -81,6 +80,7 @@ mod tests {
#[tokio::test]
async fn worker_1() {
let mut worker = channel::<Worker<PrintJob>>(100);
let mut result = channel::<Report<PrintJob>>(100);
thread_mgr::spawn(
ThreadKind::GcWorker,
@@ -89,23 +89,19 @@ mod tests {
"gc_worker_1",
true,
move || {
run_worker(worker.0)
run_worker(worker.0, result.0)
},
).unwrap();
let mut when_done = channel::<Report<PrintJob>>(100);
let work = Work {
job: PrintJob {
to_print: "hello from job".to_string(),
},
when_done: when_done.0,
let j = PrintJob {
to_print: "hello from job".to_string(),
};
let worker = worker.1.recv().await.unwrap();
worker.0.send(work.clone()).await.unwrap();
let w = worker.1.recv().await.unwrap();
w.0.send(j.clone()).await.unwrap();
println!("waiting for result");
let report = when_done.1.recv().await.unwrap();
assert_eq!(work.job, report.for_job);
let report = result.1.recv().await.unwrap();
assert_eq!(j, report.for_job);
println!("got result");
thread_mgr::shutdown_threads(None, None, None);