From a8f0124af75d81dfbcac578d289610e61cc350f0 Mon Sep 17 00:00:00 2001 From: Bojan Serafimov Date: Wed, 25 May 2022 11:28:50 -0400 Subject: [PATCH] Make condvar shutdown-aware --- pageserver/src/jobs.rs | 1 - pageserver/src/tenant_jobs/worker.rs | 15 +++++++++------ pageserver/src/thread_mgr.rs | 17 ++++++++++++++++- 3 files changed, 25 insertions(+), 8 deletions(-) delete mode 100644 pageserver/src/jobs.rs diff --git a/pageserver/src/jobs.rs b/pageserver/src/jobs.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/pageserver/src/jobs.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/pageserver/src/tenant_jobs/worker.rs b/pageserver/src/tenant_jobs/worker.rs index de308799dc..d8ad72b377 100644 --- a/pageserver/src/tenant_jobs/worker.rs +++ b/pageserver/src/tenant_jobs/worker.rs @@ -6,10 +6,12 @@ use std::{ fmt::Debug, hash::Hash, panic::{self, AssertUnwindSafe}, - sync::{Condvar, Mutex}, + sync::{Arc, Condvar, Mutex}, time::Instant, }; +use crate::thread_mgr::{get_shutdown_aware_condvar, is_shutdown_requested}; + lazy_static! { static ref POOL_UTILIZATION_GAUGE: GaugeVec = register_gauge_vec!( "pageserver_pool_utilization", @@ -119,13 +121,13 @@ where } } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Pool where J::ErrorType: Debug, { job_table: Mutex>, - condvar: Condvar, // Notified when idle worker should wake up + condvar: Arc, // Notified when idle worker should wake up } impl Pool @@ -138,14 +140,13 @@ where status: HashMap::>::new(), queue: BinaryHeap::>::new(), }), - condvar: Condvar::new(), + condvar: get_shutdown_aware_condvar(), } } - // TODO listen for shutdown request? pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> { let mut job_table = self.job_table.lock().unwrap(); - loop { + while !is_shutdown_requested() { if let Some(Deadline { job, .. }) = job_table.pop_due() { job_table.set_status( &job, @@ -204,6 +205,8 @@ where } } } + + Ok(()) } pub fn queue_job(&self, job: J) { diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index 94147fc028..a30eae2be4 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -37,7 +37,7 @@ use std::collections::HashMap; use std::panic; use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::thread::JoinHandle; @@ -59,6 +59,17 @@ lazy_static! { /// Global registry of threads static ref THREADS: Mutex>> = Mutex::new(HashMap::new()); + + // TODO make these per thread? + /// Condvars to notify after shutdown request + static ref SHUTDOWN_CONDVARS: Mutex>> = Mutex::new(Vec::new()); +} + +/// Return a condvar which will receive a notify_all() call when shutdown is requested +pub fn get_shutdown_aware_condvar() -> Arc { + let mut condvars = SHUTDOWN_CONDVARS.lock().unwrap(); + condvars.push(Arc::new(Condvar::new())); + condvars.last().unwrap().clone() } // There is a Tokio watch channel for each thread, which can be used to signal the @@ -297,6 +308,10 @@ pub fn shutdown_threads( } drop(threads); + for condvar in SHUTDOWN_CONDVARS.lock().unwrap().iter() { + condvar.notify_all(); + } + for thread in victim_threads { info!("waiting for {} to shut down", thread.name); if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {