diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index df43b8c0df..f6a088d4b5 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -370,6 +370,10 @@ impl PageServerHandler {
     ) -> anyhow::Result<()> {
         let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered();
 
+        // NOTE: pagerequests handler exits when connection is closed,
+        // so there is no need to reset the association
+        thread_mgr::associate_with(Some(tenantid), Some(timelineid));
+
         // Check that the timeline exists
         let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
             .context("Cannot load local timeline")?;
@@ -802,7 +806,6 @@ impl postgres_backend::Handler for PageServerHandler {
                 .map(|h| h.as_str().parse())
                 .unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
 
-            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
             // Use tenant's pitr setting
             let pitr = repo.get_pitr_interval();
             let result = repo.gc_iteration(Some(timelineid), gc_horizon, pitr, true)?;
diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs
index 8264bdd97c..6e4bc1a787 100644
--- a/pageserver/src/thread_mgr.rs
+++ b/pageserver/src/thread_mgr.rs
@@ -108,15 +108,21 @@ pub enum ThreadKind {
     StorageSync,
 }
 
+struct MutableThreadState {
+    /// Tenant and timeline that this thread is associated with.
+    tenant_id: Option<ZTenantId>,
+    timeline_id: Option<ZTimelineId>,
+
+    /// Handle for waiting for the thread to exit. It can be None, if the
+    /// thread has already exited.
+    join_handle: Option<JoinHandle<()>>,
+}
+
 struct PageServerThread {
     _thread_id: u64,
 
     kind: ThreadKind,
 
-    /// Tenant and timeline that this thread is associated with.
-    tenant_id: Option<ZTenantId>,
-    timeline_id: Option<ZTimelineId>,
-
     name: String,
 
     // To request thread shutdown, set the flag, and send a dummy message to the
@@ -124,9 +130,7 @@ struct PageServerThread {
     shutdown_requested: AtomicBool,
     shutdown_tx: watch::Sender<()>,
 
-    /// Handle for waiting for the thread to exit. It can be None, if the
-    /// the thread has already exited.
-    join_handle: Mutex<Option<JoinHandle<()>>>,
+    mutable: Mutex<MutableThreadState>,
 }
 
 /// Launch a new thread
@@ -145,29 +149,27 @@ where
 {
     let (shutdown_tx, shutdown_rx) = watch::channel(());
     let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
-    let thread = PageServerThread {
+    let thread = Arc::new(PageServerThread {
         _thread_id: thread_id,
         kind,
-        tenant_id,
-        timeline_id,
         name: name.to_string(),
-
         shutdown_requested: AtomicBool::new(false),
         shutdown_tx,
-
-        join_handle: Mutex::new(None),
-    };
-
-    let thread_rc = Arc::new(thread);
-
-    let mut jh_guard = thread_rc.join_handle.lock().unwrap();
+        mutable: Mutex::new(MutableThreadState {
+            tenant_id,
+            timeline_id,
+            join_handle: None,
+        }),
+    });
 
     THREADS
         .lock()
         .unwrap()
-        .insert(thread_id, Arc::clone(&thread_rc));
+        .insert(thread_id, Arc::clone(&thread));
 
-    let thread_rc2 = Arc::clone(&thread_rc);
+    let mut thread_mut = thread.mutable.lock().unwrap();
+
+    let thread_cloned = Arc::clone(&thread);
     let thread_name = name.to_string();
     let join_handle = match thread::Builder::new()
         .name(name.to_string())
@@ -175,7 +177,7 @@ where
             thread_wrapper(
                 thread_name,
                 thread_id,
-                thread_rc2,
+                thread_cloned,
                 shutdown_rx,
                 shutdown_process_on_error,
                 f,
@@ -189,8 +191,8 @@ where
             return Err(err);
         }
     };
-    *jh_guard = Some(join_handle);
-    drop(jh_guard);
+    thread_mut.join_handle = Some(join_handle);
+    drop(thread_mut);
 
     // The thread is now running. Nothing more to do here
     Ok(thread_id)
@@ -229,19 +231,20 @@ fn thread_wrapper(
         .remove(&thread_id)
         .expect("no thread in registry");
 
+    let thread_mut = thread.mutable.lock().unwrap();
     match result {
         Ok(Ok(())) => debug!("Thread '{}' exited normally", thread_name),
         Ok(Err(err)) => {
             if shutdown_process_on_error {
                 error!(
                     "Shutting down: thread '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
-                    thread_name, thread.tenant_id, thread.timeline_id, err
+                    thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
                 );
                 shutdown_pageserver(1);
             } else {
                 error!(
                     "Thread '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}",
-                    thread_name, thread.tenant_id, thread.timeline_id, err
+                    thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
                 );
             }
         }
@@ -249,19 +252,29 @@ fn thread_wrapper(
             if shutdown_process_on_error {
                 error!(
                     "Shutting down: thread '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
-                    thread_name, thread.tenant_id, thread.timeline_id, err
+                    thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
                 );
                 shutdown_pageserver(1);
             } else {
                 error!(
                     "Thread '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}",
-                    thread_name, thread.tenant_id, thread.timeline_id, err
+                    thread_name, thread_mut.tenant_id, thread_mut.timeline_id, err
                 );
             }
         }
     }
 }
 
+// expected to be called from the thread of the given id.
+pub fn associate_with(tenant_id: Option<ZTenantId>, timeline_id: Option<ZTimelineId>) {
+    CURRENT_THREAD.with(|ct| {
+        let borrowed = ct.borrow();
+        let mut thread_mut = borrowed.as_ref().unwrap().mutable.lock().unwrap();
+        thread_mut.tenant_id = tenant_id;
+        thread_mut.timeline_id = timeline_id;
+    });
+}
+
 /// Is there a thread running that matches the criteria
 
 /// Signal and wait for threads to shut down.
@@ -285,9 +298,10 @@ pub fn shutdown_threads(
 
     let threads = THREADS.lock().unwrap();
     for thread in threads.values() {
+        let thread_mut = thread.mutable.lock().unwrap();
         if (kind.is_none() || Some(thread.kind) == kind)
-            && (tenant_id.is_none() || thread.tenant_id == tenant_id)
-            && (timeline_id.is_none() || thread.timeline_id == timeline_id)
+            && (tenant_id.is_none() || thread_mut.tenant_id == tenant_id)
+            && (timeline_id.is_none() || thread_mut.timeline_id == timeline_id)
         {
             thread.shutdown_requested.store(true, Ordering::Relaxed);
             // FIXME: handle error?
@@ -298,8 +312,10 @@
     drop(threads);
 
     for thread in victim_threads {
+        let mut thread_mut = thread.mutable.lock().unwrap();
         info!("waiting for {} to shut down", thread.name);
-        if let Some(join_handle) = thread.join_handle.lock().unwrap().take() {
+        if let Some(join_handle) = thread_mut.join_handle.take() {
+            drop(thread_mut);
             let _ = join_handle.join();
         } else {
             // The thread had not even fully started yet. Or it was shut down
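For readers who want the shape of the change in isolation, below is a minimal, self-contained sketch of the re-association pattern this patch introduces: identity fixed at spawn time stays in plain fields, while state that can change afterwards lives behind one per-thread Mutex, and a thread-local handle lets a thread update its own registry entry. All names here (ThreadState, MutableState, the u32 ID aliases, the toy registry) are illustrative stand-ins, not the pageserver's actual API, and the join_handle half of MutableThreadState is omitted for brevity.

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative stand-ins for the real tenant/timeline ID types.
type TenantId = u32;
type TimelineId = u32;

// Fields fixed at spawn time stay plain; state that can change afterwards
// sits behind a single Mutex, mirroring the MutableThreadState split.
struct ThreadState {
    name: String,
    mutable: Mutex<MutableState>,
}

struct MutableState {
    tenant_id: Option<TenantId>,
    timeline_id: Option<TimelineId>,
}

thread_local! {
    // Each managed thread holds a reference to its own registry entry,
    // playing the role of CURRENT_THREAD in thread_mgr.rs.
    static CURRENT: RefCell<Option<Arc<ThreadState>>> = RefCell::new(None);
}

// Re-associate the *calling* thread, as thread_mgr::associate_with() does
// for the pagestream handler once it knows which timeline it serves.
fn associate_with(tenant_id: Option<TenantId>, timeline_id: Option<TimelineId>) {
    CURRENT.with(|ct| {
        let borrowed = ct.borrow();
        let mut state = borrowed
            .as_ref()
            .expect("associate_with called on an unmanaged thread")
            .mutable
            .lock()
            .unwrap();
        state.tenant_id = tenant_id;
        state.timeline_id = timeline_id;
    });
}

fn main() {
    // A toy registry keyed by thread id, standing in for the THREADS map.
    let registry: Mutex<HashMap<u64, Arc<ThreadState>>> = Mutex::new(HashMap::new());

    let state = Arc::new(ThreadState {
        name: "pagestream-worker".to_string(),
        mutable: Mutex::new(MutableState {
            tenant_id: None,
            timeline_id: None,
        }),
    });
    registry.lock().unwrap().insert(1, Arc::clone(&state));

    let state_for_thread = Arc::clone(&state);
    let handle = thread::spawn(move || {
        // The spawn wrapper installs the thread-local before running the body.
        CURRENT.with(|ct| *ct.borrow_mut() = Some(state_for_thread));
        // The handler learns its tenant/timeline only from the first request,
        // so the association happens after spawn, not at spawn time.
        associate_with(Some(42), Some(7));
    });
    handle.join().unwrap();

    // Another thread (e.g. shutdown_threads) now observes the association.
    let observed = state.mutable.lock().unwrap();
    println!(
        "{}: tenant_id={:?} timeline_id={:?}",
        state.name, observed.tenant_id, observed.timeline_id
    );
}

The design point: tenant_id/timeline_id used to be immutable after spawn(); folding them into the same Mutex that already guarded join_handle lets a pagestream handler call associate_with once it learns which tenant and timeline it serves, and shutdown_threads then sees the updated association through that same per-thread lock.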