diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 5a1b5e5e2c..14249963de 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -26,7 +26,6 @@ use pageserver::{ timelines, virtual_file, LOG_FILE_NAME, }; use zenith_utils::http::endpoint; -use zenith_utils::postgres_backend; use zenith_utils::shutdown::exit_now; use zenith_utils::signals::{self, Signal}; @@ -322,38 +321,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() "Got {}. Terminating gracefully in fast shutdown mode", signal.name() ); - shutdown_pageserver(); + pageserver::shutdown_pageserver(); unreachable!() } }) } - -fn shutdown_pageserver() { - // Shut down the libpq endpoint thread. This prevents new connections from - // being accepted. - thread_mgr::shutdown_threads(Some(ThreadKind::LibpqEndpointListener), None, None); - - // Shut down any page service threads. - postgres_backend::set_pgbackend_shutdown_requested(); - thread_mgr::shutdown_threads(Some(ThreadKind::PageRequestHandler), None, None); - - // Shut down all the tenants. This flushes everything to disk and kills - // the checkpoint and GC threads. - tenant_mgr::shutdown_all_tenants(); - - // Stop syncing with remote storage. - // - // FIXME: Does this wait for the sync thread to finish syncing what's queued up? - // Should it? - thread_mgr::shutdown_threads(Some(ThreadKind::StorageSync), None, None); - - // Shut down the HTTP endpoint last, so that you can still check the server's - // status while it's shutting down. - thread_mgr::shutdown_threads(Some(ThreadKind::HttpEndpointListener), None, None); - - // There should be nothing left, but let's be sure - thread_mgr::shutdown_threads(None, None, None); - - info!("Shut down successfully completed"); - std::process::exit(0); -} diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 7ec11add9c..ac0afcb275 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -976,7 +976,7 @@ impl Timeline for LayeredTimeline { /// Public entry point for checkpoint(). All the logic is in the private /// checkpoint_internal function, this public facade just wraps it for /// metrics collection. - fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> { + fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> { match cconf { CheckpointConfig::Flush => self .flush_checkpoint_time_histo diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 3d66192c80..060fa54b23 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -19,8 +19,14 @@ pub mod walrecord; pub mod walredo; use lazy_static::lazy_static; +use tracing::info; use zenith_metrics::{register_int_gauge_vec, IntGaugeVec}; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::{ + postgres_backend, + zid::{ZTenantId, ZTimelineId}, +}; + +use crate::thread_mgr::ThreadKind; lazy_static! { static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!( @@ -43,3 +49,33 @@ pub enum CheckpointConfig { // Flush all in-memory data and reconstruct all page images Forced, } + +pub fn shutdown_pageserver() { + // Shut down the libpq endpoint thread. This prevents new connections from + // being accepted. + thread_mgr::shutdown_threads(Some(ThreadKind::LibpqEndpointListener), None, None); + + // Shut down any page service threads. + postgres_backend::set_pgbackend_shutdown_requested(); + thread_mgr::shutdown_threads(Some(ThreadKind::PageRequestHandler), None, None); + + // Shut down all the tenants. This flushes everything to disk and kills + // the checkpoint and GC threads. + tenant_mgr::shutdown_all_tenants(); + + // Stop syncing with remote storage. + // + // FIXME: Does this wait for the sync thread to finish syncing what's queued up? + // Should it? + thread_mgr::shutdown_threads(Some(ThreadKind::StorageSync), None, None); + + // Shut down the HTTP endpoint last, so that you can still check the server's + // status while it's shutting down. + thread_mgr::shutdown_threads(Some(ThreadKind::HttpEndpointListener), None, None); + + // There should be nothing left, but let's be sure + thread_mgr::shutdown_threads(None, None, None); + + info!("Shut down successfully completed"); + std::process::exit(0); +} diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index d24d6bf016..c4202e80be 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -43,12 +43,14 @@ use std::thread::JoinHandle; use tokio::sync::watch; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use lazy_static::lazy_static; use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use crate::shutdown_pageserver; + lazy_static! { /// Each thread that we track is associated with a "thread ID". It's just /// an increasing number that we assign, not related to any system thread @@ -125,7 +127,7 @@ struct PageServerThread { } /// Launch a new thread -pub fn spawn( +pub fn spawn( kind: ThreadKind, tenant_id: Option, timeline_id: Option, @@ -133,7 +135,7 @@ pub fn spawn( f: F, ) -> std::io::Result<()> where - F: FnOnce() -> Result<(), E> + Send + 'static, + F: FnOnce() -> anyhow::Result<()> + Send + 'static, { let (shutdown_tx, shutdown_rx) = watch::channel(()); let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed); @@ -160,12 +162,14 @@ where .insert(thread_id, Arc::clone(&thread_rc)); let thread_rc2 = Arc::clone(&thread_rc); + let thread_name = name.to_string(); let join_handle = match thread::Builder::new() .name(name.to_string()) - .spawn(move || thread_wrapper(thread_id, thread_rc2, shutdown_rx, f)) + .spawn(move || thread_wrapper(thread_name, thread_id, thread_rc2, shutdown_rx, f)) { Ok(handle) => handle, Err(err) => { + error!("Failed to spawn thread '{}': {}", name, err); // Could not spawn the thread. Remove the entry THREADS.lock().unwrap().remove(&thread_id); return Err(err); @@ -180,13 +184,14 @@ where /// This wrapper function runs in a newly-spawned thread. It initializes the /// thread-local variables and calls the payload function -fn thread_wrapper( +fn thread_wrapper( + thread_name: String, thread_id: u64, thread: Arc, shutdown_rx: watch::Receiver<()>, f: F, ) where - F: FnOnce() -> Result<(), E> + Send + 'static, + F: FnOnce() -> anyhow::Result<()> + Send + 'static, { SHUTDOWN_RX.with(|rx| { *rx.borrow_mut() = Some(shutdown_rx); @@ -195,6 +200,8 @@ fn thread_wrapper( *ct.borrow_mut() = Some(thread); }); + info!("Starting thread '{}'", thread_name); + // We use AssertUnwindSafe here so that the payload function // doesn't need to be UnwindSafe. We don't do anything after the // unwinding that would expose us to unwind-unsafe behavior. @@ -203,9 +210,22 @@ fn thread_wrapper( // Remove our entry from the global hashmap. THREADS.lock().unwrap().remove(&thread_id); - // If the thread payload panic'd, exit with the panic. - if let Err(err) = result { - panic::resume_unwind(err); + match result { + Ok(Ok(())) => info!("Thread '{}' exited normally", thread_name), + Ok(Err(err)) => { + error!( + "Shutting down: thread '{}' exited with error: {:?}", + thread_name, err + ); + shutdown_pageserver(); + } + Err(err) => { + error!( + "Shutting down: thread '{}' panicked: {:?}", + thread_name, err + ); + shutdown_pageserver(); + } } }