From 13b374a2df0999ce2dcfa7626d51dcd883ff235b Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Sun, 22 May 2022 12:34:53 -0400 Subject: [PATCH] Add shutdown command --- pageserver/src/jobs.rs | 76 +++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 45 deletions(-) diff --git a/pageserver/src/jobs.rs b/pageserver/src/jobs.rs index f6cf6147b2..ff38ba0829 100644 --- a/pageserver/src/jobs.rs +++ b/pageserver/src/jobs.rs @@ -1,74 +1,49 @@ -use std::{collections::VecDeque, sync::{Condvar, Mutex}, time::Duration}; - -use crate::thread_mgr::{is_shutdown_requested, shutdown_watcher}; +use std::{collections::VecDeque, sync::{Condvar, Mutex}}; pub trait Job: std::fmt::Debug + Send + 'static { fn run(&self); } +#[derive(Debug)] +enum WorkerCommand { + Shutdown, + DoJob(J), +} + #[derive(Debug)] struct Pool { - job_queue: Mutex>, + job_queue: Mutex>>, condvar: Condvar, // Notified when queue becomes nonempty } impl Pool { fn new() -> Self { Pool { - job_queue: Mutex::new(VecDeque::::new()), + job_queue: Mutex::new(VecDeque::>::new()), condvar: Condvar::new(), } } - fn worker_main_2(&self) -> anyhow::Result<()> { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - runtime.block_on(async { - loop { - tokio::select! { - _ = shutdown_watcher() => break, - // TODO i need tokio::sync::Mutex - q = self.job_queue.lock() { - if let Some(job) = q.pop_front() { - drop(q); - job.run(); - q = self.job_queue.lock().unwrap(); - } else { - // TODO can't wait here, i might want to shut down - // q = self.condvar.wait(q).unwrap(); - std::thread::sleep(Duration::from_millis(5)) - } - }, - }; - } - }); - - Ok(()) - } - fn worker_main(&self) -> anyhow::Result<()> { let mut q = self.job_queue.lock().unwrap(); - while !is_shutdown_requested() { - if let Some(job) = q.pop_front() { + loop { + if let Some(command) = q.pop_front() { drop(q); - job.run(); + match command { + WorkerCommand::DoJob(job) => job.run(), + WorkerCommand::Shutdown => return Ok(()), + } q = self.job_queue.lock().unwrap(); } else { - // TODO can't wait here, i might want to shut down - // q = self.condvar.wait(q).unwrap(); - std::thread::sleep(Duration::from_millis(5)) + q = self.condvar.wait(q).unwrap(); } } - - Ok(()) } fn queue_job(&self, job: J) { // Add the job to the back of the queue let mut q = self.job_queue.lock().unwrap(); - q.push_back(job); + q.push_back(WorkerCommand::DoJob(job)); // If the queue was empty, wake up the next worker thread to pick up // the job. @@ -76,6 +51,19 @@ impl Pool { self.condvar.notify_one(); } } + + fn shutdown_one(&self) { + // Add shutdown command to the front of the queue + let mut q = self.job_queue.lock().unwrap(); + q.push_front(WorkerCommand::Shutdown); + + // If the queue was empty, wake up the next worker thread. + if q.len() == 1 { + self.condvar.notify_one(); + } + + // TODO wait? + } } #[cfg(test)] @@ -123,8 +111,6 @@ mod tests { TEST_POOL.get().unwrap().queue_job(j.clone()); tokio::time::sleep(Duration::from_millis(100)).await; - - thread_mgr::shutdown_threads(None, None, None); - TEST_POOL.get().unwrap().queue_job(j.clone()); + TEST_POOL.get().unwrap().shutdown_one(); } }