mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
Make condvar shutdown-aware
This commit is contained in:
@@ -1 +0,0 @@
|
||||
|
||||
@@ -6,10 +6,12 @@ use std::{
|
||||
fmt::Debug,
|
||||
hash::Hash,
|
||||
panic::{self, AssertUnwindSafe},
|
||||
sync::{Condvar, Mutex},
|
||||
sync::{Arc, Condvar, Mutex},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use crate::thread_mgr::{get_shutdown_aware_condvar, is_shutdown_requested};
|
||||
|
||||
lazy_static! {
|
||||
static ref POOL_UTILIZATION_GAUGE: GaugeVec = register_gauge_vec!(
|
||||
"pageserver_pool_utilization",
|
||||
@@ -119,13 +121,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug)]
|
||||
pub struct Pool<J: Job>
|
||||
where
|
||||
J::ErrorType: Debug,
|
||||
{
|
||||
job_table: Mutex<JobStatusTable<J>>,
|
||||
condvar: Condvar, // Notified when idle worker should wake up
|
||||
condvar: Arc<Condvar>, // Notified when idle worker should wake up
|
||||
}
|
||||
|
||||
impl<J: Job> Pool<J>
|
||||
@@ -138,14 +140,13 @@ where
|
||||
status: HashMap::<J, JobStatus<J>>::new(),
|
||||
queue: BinaryHeap::<Deadline<J>>::new(),
|
||||
}),
|
||||
condvar: Condvar::new(),
|
||||
condvar: get_shutdown_aware_condvar(),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO listen for shutdown request?
|
||||
pub fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
|
||||
let mut job_table = self.job_table.lock().unwrap();
|
||||
loop {
|
||||
while !is_shutdown_requested() {
|
||||
if let Some(Deadline { job, .. }) = job_table.pop_due() {
|
||||
job_table.set_status(
|
||||
&job,
|
||||
@@ -204,6 +205,8 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn queue_job(&self, job: J) {
|
||||
|
||||
@@ -37,7 +37,7 @@ use std::collections::HashMap;
|
||||
use std::panic;
|
||||
use std::panic::AssertUnwindSafe;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
@@ -59,6 +59,17 @@ lazy_static! {
|
||||
|
||||
/// Global registry of threads
|
||||
static ref THREADS: Mutex<HashMap<u64, Arc<PageServerThread>>> = Mutex::new(HashMap::new());
|
||||
|
||||
// TODO make these per thread?
|
||||
/// Condvars to notify after shutdown request
|
||||
static ref SHUTDOWN_CONDVARS: Mutex<Vec<Arc<Condvar>>> = Mutex::new(Vec::new());
|
||||
}
|
||||
|
||||
/// Return a condvar which will receive a notify_all() call when shutdown is requested
|
||||
pub fn get_shutdown_aware_condvar() -> Arc<Condvar> {
|
||||
let mut condvars = SHUTDOWN_CONDVARS.lock().unwrap();
|
||||
condvars.push(Arc::new(Condvar::new()));
|
||||
condvars.last().unwrap().clone()
|
||||
}
|
||||
|
||||
// There is a Tokio watch channel for each thread, which can be used to signal the
|
||||
@@ -297,6 +308,10 @@ pub fn shutdown_threads(
|
||||
}
|
||||
drop(threads);
|
||||
|
||||
for condvar in SHUTDOWN_CONDVARS.lock().unwrap().iter() {
|
||||
condvar.notify_all();
|
||||
}
|
||||
|
||||
for thread in victim_threads {
|
||||
info!("waiting for {} to shut down", thread.name);
|
||||
if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
|
||||
|
||||
Reference in New Issue
Block a user