Exit only on important thread failures

This commit is contained in:
Kirill Bulatov
2022-03-24 14:05:15 +02:00
committed by Kirill Bulatov
parent 28bc8e3f5c
commit b39d1b1717
6 changed files with 57 additions and 34 deletions

View File

@@ -291,6 +291,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
None,
None,
"http_endpoint_thread",
false,
move || {
let router = http::make_router(conf, auth_cloned, remote_index);
endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher())
@@ -304,6 +305,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
None,
None,
"libpq endpoint thread",
false,
move || page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type),
)?;

View File

@@ -228,6 +228,7 @@ pub fn thread_main(
None,
None,
"serving Page Service thread",
false,
move || page_service_conn_main(conf, local_auth, socket, auth_type),
) {
// Thread creation failed. Log the error and continue.

View File

@@ -404,6 +404,7 @@ pub(super) fn spawn_storage_sync_thread<
None,
None,
"Remote storage sync thread",
false,
move || {
storage_sync_loop(
runtime,
@@ -413,7 +414,8 @@ pub(super) fn spawn_storage_sync_thread<
storage,
max_concurrent_sync,
max_sync_errors,
)
);
Ok(())
},
)
.context("Failed to spawn remote storage sync thread")?;
@@ -440,7 +442,7 @@ fn storage_sync_loop<
storage: S,
max_concurrent_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) -> anyhow::Result<()> {
) {
let remote_assets = Arc::new((storage, Arc::clone(&index)));
loop {
let index = Arc::clone(&index);
@@ -470,8 +472,6 @@ fn storage_sync_loop<
}
}
}
Ok(())
}
async fn loop_step<

View File

@@ -206,13 +206,13 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
/// Change the state of a tenant to Active and launch its checkpointer and GC
/// threads. If the tenant was already in Active state or Stopping, does nothing.
///
pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Result<()> {
pub fn activate_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Result<()> {
let mut m = access_tenants();
let tenant = m
.get_mut(&tenantid)
.with_context(|| format!("Tenant not found for id {}", tenantid))?;
.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {}", tenant_id))?;
info!("activating tenant {}", tenantid);
info!("activating tenant {}", tenant_id);
match tenant.state {
// If the tenant is already active, nothing to do.
@@ -222,22 +222,31 @@ pub fn activate_tenant(conf: &'static PageServerConf, tenantid: ZTenantId) -> Re
TenantState::Idle => {
thread_mgr::spawn(
ThreadKind::Checkpointer,
Some(tenantid),
Some(tenant_id),
None,
"Checkpointer thread",
move || crate::tenant_threads::checkpoint_loop(tenantid, conf),
true,
move || crate::tenant_threads::checkpoint_loop(tenant_id, conf),
)?;
// FIXME: if we fail to launch the GC thread, but already launched the
// checkpointer, we're in a strange state.
thread_mgr::spawn(
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenantid),
Some(tenant_id),
None,
"GC thread",
move || crate::tenant_threads::gc_loop(tenantid, conf),
)?;
true,
move || crate::tenant_threads::gc_loop(tenant_id, conf),
)
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
if let Err(e) = &gc_spawn_result {
error!(
"Failed to start GC thread for tenant {}, stopping its checkpointer thread: {:?}",
tenant_id, e
);
thread_mgr::shutdown_threads(Some(ThreadKind::Checkpointer), Some(tenant_id), None);
return gc_spawn_result;
}
tenant.state = TenantState::Active;
}

View File

@@ -43,7 +43,7 @@ use std::thread::JoinHandle;
use tokio::sync::watch;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use lazy_static::lazy_static;
@@ -132,6 +132,7 @@ pub fn spawn<F>(
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
name: &str,
fail_on_error: bool,
f: F,
) -> std::io::Result<()>
where
@@ -165,8 +166,16 @@ where
let thread_name = name.to_string();
let join_handle = match thread::Builder::new()
.name(name.to_string())
.spawn(move || thread_wrapper(thread_name, thread_id, thread_rc2, shutdown_rx, f))
{
.spawn(move || {
thread_wrapper(
thread_name,
thread_id,
thread_rc2,
shutdown_rx,
fail_on_error,
f,
)
}) {
Ok(handle) => handle,
Err(err) => {
error!("Failed to spawn thread '{}': {}", name, err);
@@ -189,6 +198,7 @@ fn thread_wrapper<F>(
thread_id: u64,
thread: Arc<PageServerThread>,
shutdown_rx: watch::Receiver<()>,
fail_on_error: bool,
f: F,
) where
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
@@ -200,7 +210,7 @@ fn thread_wrapper<F>(
*ct.borrow_mut() = Some(thread);
});
info!("Starting thread '{}'", thread_name);
debug!("Starting thread '{}'", thread_name);
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
@@ -211,13 +221,17 @@ fn thread_wrapper<F>(
THREADS.lock().unwrap().remove(&thread_id);
match result {
Ok(Ok(())) => info!("Thread '{}' exited normally", thread_name),
Ok(Ok(())) => debug!("Thread '{}' exited normally", thread_name),
Ok(Err(err)) => {
error!(
"Shutting down: thread '{}' exited with error: {:?}",
thread_name, err
);
shutdown_pageserver();
if fail_on_error {
error!(
"Shutting down: thread '{}' exited with error: {:?}",
thread_name, err
);
shutdown_pageserver();
} else {
error!("Thread '{}' exited with error: {:?}", thread_name, err);
}
}
Err(err) => {
error!(

View File

@@ -78,9 +78,11 @@ pub fn launch_wal_receiver(
Some(tenantid),
Some(timelineid),
"WAL receiver thread",
false,
move || {
IS_WAL_RECEIVER.with(|c| c.set(true));
thread_main(conf, tenantid, timelineid)
thread_main(conf, tenantid, timelineid);
Ok(())
},
)?;
@@ -110,11 +112,7 @@ fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> Str
//
// This is the entry point for the WAL receiver thread.
//
fn thread_main(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> Result<()> {
fn thread_main(conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id: ZTimelineId) {
let _enter = info_span!("WAL receiver", timeline = %timeline_id, tenant = %tenant_id).entered();
info!("WAL receiver thread started");
@@ -138,7 +136,6 @@ fn thread_main(
// Drop it from the list of active WAL_RECEIVERS
// so that the next callmemaybe request launches a new thread
drop_wal_receiver(tenant_id, timeline_id);
Ok(())
}
fn walreceiver_main(