Add shutdown command

This commit is contained in:
Bojan Serafimov
2022-05-22 12:34:53 -04:00
parent b9814222f9
commit 13b374a2df

View File

@@ -1,74 +1,49 @@
use std::{collections::VecDeque, sync::{Condvar, Mutex}, time::Duration};
use crate::thread_mgr::{is_shutdown_requested, shutdown_watcher};
use std::{collections::VecDeque, sync::{Condvar, Mutex}};
pub trait Job: std::fmt::Debug + Send + 'static {
fn run(&self);
}
#[derive(Debug)]
enum WorkerCommand<J: Job> {
Shutdown,
DoJob(J),
}
#[derive(Debug)]
struct Pool<J: Job> {
job_queue: Mutex<VecDeque<J>>,
job_queue: Mutex<VecDeque<WorkerCommand<J>>>,
condvar: Condvar, // Notified when queue becomes nonempty
}
impl<J: Job> Pool<J> {
fn new() -> Self {
Pool {
job_queue: Mutex::new(VecDeque::<J>::new()),
job_queue: Mutex::new(VecDeque::<WorkerCommand<J>>::new()),
condvar: Condvar::new(),
}
}
fn worker_main_2(&self) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
runtime.block_on(async {
loop {
tokio::select! {
_ = shutdown_watcher() => break,
// TODO i need tokio::sync::Mutex
q = self.job_queue.lock() {
if let Some(job) = q.pop_front() {
drop(q);
job.run();
q = self.job_queue.lock().unwrap();
} else {
// TODO can't wait here, i might want to shut down
// q = self.condvar.wait(q).unwrap();
std::thread::sleep(Duration::from_millis(5))
}
},
};
}
});
Ok(())
}
fn worker_main(&self) -> anyhow::Result<()> {
let mut q = self.job_queue.lock().unwrap();
while !is_shutdown_requested() {
if let Some(job) = q.pop_front() {
loop {
if let Some(command) = q.pop_front() {
drop(q);
job.run();
match command {
WorkerCommand::DoJob(job) => job.run(),
WorkerCommand::Shutdown => return Ok(()),
}
q = self.job_queue.lock().unwrap();
} else {
// TODO can't wait here, i might want to shut down
// q = self.condvar.wait(q).unwrap();
std::thread::sleep(Duration::from_millis(5))
q = self.condvar.wait(q).unwrap();
}
}
Ok(())
}
fn queue_job(&self, job: J) {
// Add the job to the back of the queue
let mut q = self.job_queue.lock().unwrap();
q.push_back(job);
q.push_back(WorkerCommand::DoJob(job));
// If the queue was empty, wake up the next worker thread to pick up
// the job.
@@ -76,6 +51,19 @@ impl<J: Job> Pool<J> {
self.condvar.notify_one();
}
}
fn shutdown_one(&self) {
// Add shutdown command to the front of the queue
let mut q = self.job_queue.lock().unwrap();
q.push_front(WorkerCommand::Shutdown);
// If the queue was empty, wake up the next worker thread.
if q.len() == 1 {
self.condvar.notify_one();
}
// TODO wait?
}
}
#[cfg(test)]
@@ -123,8 +111,6 @@ mod tests {
TEST_POOL.get().unwrap().queue_job(j.clone());
tokio::time::sleep(Duration::from_millis(100)).await;
thread_mgr::shutdown_threads(None, None, None);
TEST_POOL.get().unwrap().queue_job(j.clone());
TEST_POOL.get().unwrap().shutdown_one();
}
}