From e308265e424e747a8a5fc9e61040cd81281d6c0c Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Tue, 12 Jul 2022 23:07:26 +0300 Subject: [PATCH] register tenants task thread pool threads in thread_mgr needed to avoid this warning: is_shutdown_requested() called in an unexpected thread --- pageserver/src/tenant_tasks.rs | 4 ++ pageserver/src/thread_mgr.rs | 70 +++++++++++++++++++++++++++++++--- 2 files changed, 69 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index b0bb4953ca..e51744d3cc 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -120,6 +120,10 @@ pub fn init_tenant_task_pool() -> anyhow::Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("tenant-task-worker") .enable_all() + .on_thread_start(|| { + thread_mgr::register(ThreadKind::TenantTaskWorker, "tenant-task-worker") + }) + .on_thread_stop(thread_mgr::deregister) .build()?; let (gc_send, mut gc_recv) = mpsc::channel::(100); diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index ab0d894c70..6dd2e4b00b 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -97,6 +97,9 @@ pub enum ThreadKind { // Thread that schedules new compaction and gc jobs TenantTaskManager, + // Worker thread for tenant tasks thread pool + TenantTaskWorker, + // Thread that flushes frozen in-memory layers to disk LayerFlushThread, @@ -105,18 +108,20 @@ pub enum ThreadKind { StorageSync, } +#[derive(Default)] struct MutableThreadState { /// Tenant and timeline that this thread is associated with. tenant_id: Option, timeline_id: Option, /// Handle for waiting for the thread to exit. It can be None, if the - /// the thread has already exited. + /// the thread has already exited. OR if this thread is managed externally + /// and was not spawned through thread_mgr.rs::spawn function. join_handle: Option>, } struct PageServerThread { - _thread_id: u64, + thread_id: u64, kind: ThreadKind, @@ -147,7 +152,7 @@ where let (shutdown_tx, shutdown_rx) = watch::channel(()); let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed); let thread = Arc::new(PageServerThread { - _thread_id: thread_id, + thread_id, kind, name: name.to_string(), shutdown_requested: AtomicBool::new(false), @@ -315,8 +320,10 @@ pub fn shutdown_threads( drop(thread_mut); let _ = join_handle.join(); } else { - // The thread had not even fully started yet. Or it was shut down - // concurrently and already exited + // Possibly one of: + // * The thread had not even fully started yet. + // * It was shut down concurrently and already exited + // * Is managed through `register`/`deregister` fns without providing a join handle } } } @@ -348,3 +355,56 @@ pub fn is_shutdown_requested() -> bool { } }) } + +/// Needed to register threads that were not spawned through spawn function. +/// For example tokio blocking threads. This function is expected to be used +/// in tandem with `deregister`. +/// NOTE: threads registered through this function cannot be joined +pub fn register(kind: ThreadKind, name: &str) { + CURRENT_THREAD.with(|ct| { + let mut borrowed = ct.borrow_mut(); + if borrowed.is_some() { + panic!("thread already registered") + }; + let (shutdown_tx, shutdown_rx) = watch::channel(()); + let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed); + + let thread = Arc::new(PageServerThread { + thread_id, + kind, + name: name.to_owned(), + shutdown_requested: AtomicBool::new(false), + shutdown_tx, + mutable: Mutex::new(MutableThreadState { + tenant_id: None, + timeline_id: None, + join_handle: None, + }), + }); + + *borrowed = Some(Arc::clone(&thread)); + + SHUTDOWN_RX.with(|rx| { + *rx.borrow_mut() = Some(shutdown_rx); + }); + + THREADS.lock().unwrap().insert(thread_id, thread); + }); +} + +// Expected to be used in tandem with `register`. See the doc for `register` for more details +pub fn deregister() { + CURRENT_THREAD.with(|ct| { + let mut borrowed = ct.borrow_mut(); + let thread = match borrowed.take() { + Some(thread) => thread, + None => panic!("calling deregister on unregistered thread"), + }; + + SHUTDOWN_RX.with(|rx| { + *rx.borrow_mut() = None; + }); + + THREADS.lock().unwrap().remove(&thread.thread_id) + }); +}