From b39d1b17177eb6fe9509b87cb8908f8128ab78bc Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 24 Mar 2022 14:05:15 +0200 Subject: [PATCH] Exit only on important thread failures --- pageserver/src/bin/pageserver.rs | 2 ++ pageserver/src/page_service.rs | 1 + pageserver/src/remote_storage/storage_sync.rs | 8 ++--- pageserver/src/tenant_mgr.rs | 35 ++++++++++++------- pageserver/src/thread_mgr.rs | 34 ++++++++++++------ pageserver/src/walreceiver.rs | 11 +++--- 6 files changed, 57 insertions(+), 34 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 14249963de..e217806147 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -291,6 +291,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() None, None, "http_endpoint_thread", + false, move || { let router = http::make_router(conf, auth_cloned, remote_index); endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher()) @@ -304,6 +305,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() None, None, "libpq endpoint thread", + false, move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type), )?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6acdc8e93d..4744f0fe52 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -228,6 +228,7 @@ pub fn thread_main( None, None, "serving Page Service thread", + false, move || page_service_conn_main(conf, local_auth, socket, auth_type), ) { // Thread creation failed. Log the error and continue. diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 4ad28e6f8f..b01b152e0a 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -404,6 +404,7 @@ pub(super) fn spawn_storage_sync_thread< None, None, "Remote storage sync thread", + false, move || { storage_sync_loop( runtime, @@ -413,7 +414,8 @@ pub(super) fn spawn_storage_sync_thread< storage, max_concurrent_sync, max_sync_errors, - ) + ); + Ok(()) }, ) .context("Failed to spawn remote storage sync thread")?; @@ -440,7 +442,7 @@ fn storage_sync_loop< storage: S, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, -) -> anyhow::Result<()> { +) { let remote_assets = Arc::new((storage, Arc::clone(&index))); loop { let index = Arc::clone(&index); @@ -470,8 +472,6 @@ fn storage_sync_loop< } } } - - Ok(()) } async fn loop_step< diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 4d6dfd7488..0bc18231c9 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -206,13 +206,13 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option { /// Change the state of a tenant to Active and launch its checkpointer and GC /// threads. If the tenant was already in Active state or Stopping, does nothing. /// -pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> { +pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> { let mut m = access_tenants(); let tenant = m - .get_mut(&tenantid) - .with_context(|| format!("Tenant not found for id {}", tenantid))?; + .get_mut(&tenant_id) + .with_context(|| format!("Tenant not found for id {}", tenant_id))?; - info!("activating tenant {}", tenantid); + info!("activating tenant {}", tenant_id); match tenant.state { // If the tenant is already active, nothing to do. @@ -222,22 +222,31 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re TenantState::Idle => { thread_mgr::spawn( ThreadKind::Checkpointer, - Some(tenantid), + Some(tenant_id), None, "Checkpointer thread", - move || crate::tenant_threads::checkpoint_loop(tenantid, conf), + true, + move || crate::tenant_threads::checkpoint_loop(tenant_id, conf), )?; - // FIXME: if we fail to launch the GC thread, but already launched the - // checkpointer, we're in a strange state. - - thread_mgr::spawn( + let gc_spawn_result = thread_mgr::spawn( ThreadKind::GarbageCollector, - Some(tenantid), + Some(tenant_id), None, "GC thread", - move || crate::tenant_threads::gc_loop(tenantid, conf), - )?; + true, + move || crate::tenant_threads::gc_loop(tenant_id, conf), + ) + .with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id)); + + if let Err(e) = &gc_spawn_result { + error!( + "Failed to start GC thread for tenant {}, stopping its checkpointer thread: {:?}", + tenant_id, e + ); + thread_mgr::shutdown_threads(Some(ThreadKind::Checkpointer), Some(tenant_id), None); + return gc_spawn_result; + } tenant.state = TenantState::Active; } diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index c4202e80be..cafdc5e700 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -43,7 +43,7 @@ use std::thread::JoinHandle; use tokio::sync::watch; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use lazy_static::lazy_static; @@ -132,6 +132,7 @@ pub fn spawn( tenant_id: Option, timeline_id: Option, name: &str, + fail_on_error: bool, f: F, ) -> std::io::Result<()> where @@ -165,8 +166,16 @@ where let thread_name = name.to_string(); let join_handle = match thread::Builder::new() .name(name.to_string()) - .spawn(move || thread_wrapper(thread_name, thread_id, thread_rc2, shutdown_rx, f)) - { + .spawn(move || { + thread_wrapper( + thread_name, + thread_id, + thread_rc2, + shutdown_rx, + fail_on_error, + f, + ) + }) { Ok(handle) => handle, Err(err) => { error!("Failed to spawn thread '{}': {}", name, err); @@ -189,6 +198,7 @@ fn thread_wrapper( thread_id: u64, thread: Arc, shutdown_rx: watch::Receiver<()>, + fail_on_error: bool, f: F, ) where F: FnOnce() -> anyhow::Result<()> + Send + 'static, @@ -200,7 +210,7 @@ fn thread_wrapper( *ct.borrow_mut() = Some(thread); }); - info!("Starting thread '{}'", thread_name); + debug!("Starting thread '{}'", thread_name); // We use AssertUnwindSafe here so that the payload function // doesn't need to be UnwindSafe. We don't do anything after the @@ -211,13 +221,17 @@ fn thread_wrapper( THREADS.lock().unwrap().remove(&thread_id); match result { - Ok(Ok(())) => info!("Thread '{}' exited normally", thread_name), + Ok(Ok(())) => debug!("Thread '{}' exited normally", thread_name), Ok(Err(err)) => { - error!( - "Shutting down: thread '{}' exited with error: {:?}", - thread_name, err - ); - shutdown_pageserver(); + if fail_on_error { + error!( + "Shutting down: thread '{}' exited with error: {:?}", + thread_name, err + ); + shutdown_pageserver(); + } else { + error!("Thread '{}' exited with error: {:?}", thread_name, err); + } } Err(err) => { error!( diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 43fb7db4b0..2c10ad315b 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -78,9 +78,11 @@ pub fn launch_wal_receiver( Some(tenantid), Some(timelineid), "WAL receiver thread", + false, move || { IS_WAL_RECEIVER.with(|c| c.set(true)); - thread_main(conf, tenantid, timelineid) + thread_main(conf, tenantid, timelineid); + Ok(()) }, )?; @@ -110,11 +112,7 @@ fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> Str // // This is the entry point for the WAL receiver thread. // -fn thread_main( - conf: &'static PageServerConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, -) -> Result<()> { +fn thread_main(conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id: ZTimelineId) { let _enter = info_span!("WAL receiver", timeline = %timeline_id, tenant = %tenant_id).entered(); info!("WAL receiver thread started"); @@ -138,7 +136,6 @@ fn thread_main( // Drop it from list of active WAL_RECEIVERS // so that next callmemaybe request launched a new thread drop_wal_receiver(tenant_id, timeline_id); - Ok(()) } fn walreceiver_main(