Log pageserver threads better and shut down on errors in them

This commit is contained in:
Kirill Bulatov
2022-03-23 19:33:06 +02:00
committed by Kirill Bulatov
parent 6244fd9e7e
commit 28bc8e3f5c
4 changed files with 68 additions and 43 deletions

View File

@@ -26,7 +26,6 @@ use pageserver::{
timelines, virtual_file, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals::{self, Signal};
@@ -322,38 +321,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
shutdown_pageserver();
pageserver::shutdown_pageserver();
unreachable!()
}
})
}
fn shutdown_pageserver() {
// Shut down the libpq endpoint thread. This prevents new connections from
// being accepted.
thread_mgr::shutdown_threads(Some(ThreadKind::LibpqEndpointListener), None, None);
// Shut down any page service threads.
postgres_backend::set_pgbackend_shutdown_requested();
thread_mgr::shutdown_threads(Some(ThreadKind::PageRequestHandler), None, None);
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC threads.
tenant_mgr::shutdown_all_tenants();
// Stop syncing with remote storage.
//
// FIXME: Does this wait for the sync thread to finish syncing what's queued up?
// Should it?
thread_mgr::shutdown_threads(Some(ThreadKind::StorageSync), None, None);
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
thread_mgr::shutdown_threads(Some(ThreadKind::HttpEndpointListener), None, None);
// There should be nothing left, but let's be sure
thread_mgr::shutdown_threads(None, None, None);
info!("Shut down successfully completed");
std::process::exit(0);
}

View File

@@ -976,7 +976,7 @@ impl Timeline for LayeredTimeline {
/// Public entry point for checkpoint(). All the logic is in the private
/// checkpoint_internal function, this public facade just wraps it for
/// metrics collection.
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> {
fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => self
.flush_checkpoint_time_histo

View File

@@ -19,8 +19,14 @@ pub mod walrecord;
pub mod walredo;
use lazy_static::lazy_static;
use tracing::info;
use zenith_metrics::{register_int_gauge_vec, IntGaugeVec};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::{
postgres_backend,
zid::{ZTenantId, ZTimelineId},
};
use crate::thread_mgr::ThreadKind;
lazy_static! {
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(
@@ -43,3 +49,33 @@ pub enum CheckpointConfig {
// Flush all in-memory data and reconstruct all page images
Forced,
}
pub fn shutdown_pageserver() {
// Shut down the libpq endpoint thread. This prevents new connections from
// being accepted.
thread_mgr::shutdown_threads(Some(ThreadKind::LibpqEndpointListener), None, None);
// Shut down any page service threads.
postgres_backend::set_pgbackend_shutdown_requested();
thread_mgr::shutdown_threads(Some(ThreadKind::PageRequestHandler), None, None);
// Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC threads.
tenant_mgr::shutdown_all_tenants();
// Stop syncing with remote storage.
//
// FIXME: Does this wait for the sync thread to finish syncing what's queued up?
// Should it?
thread_mgr::shutdown_threads(Some(ThreadKind::StorageSync), None, None);
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
thread_mgr::shutdown_threads(Some(ThreadKind::HttpEndpointListener), None, None);
// There should be nothing left, but let's be sure
thread_mgr::shutdown_threads(None, None, None);
info!("Shut down successfully completed");
std::process::exit(0);
}

View File

@@ -43,12 +43,14 @@ use std::thread::JoinHandle;
use tokio::sync::watch;
use tracing::{info, warn};
use tracing::{error, info, warn};
use lazy_static::lazy_static;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::shutdown_pageserver;
lazy_static! {
/// Each thread that we track is associated with a "thread ID". It's just
/// an increasing number that we assign, not related to any system thread
@@ -125,7 +127,7 @@ struct PageServerThread {
}
/// Launch a new thread
pub fn spawn<F, E>(
pub fn spawn<F>(
kind: ThreadKind,
tenant_id: Option<ZTenantId>,
timeline_id: Option<ZTimelineId>,
@@ -133,7 +135,7 @@ pub fn spawn<F, E>(
f: F,
) -> std::io::Result<()>
where
F: FnOnce() -> Result<(), E> + Send + 'static,
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
let (shutdown_tx, shutdown_rx) = watch::channel(());
let thread_id = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
@@ -160,12 +162,14 @@ where
.insert(thread_id, Arc::clone(&thread_rc));
let thread_rc2 = Arc::clone(&thread_rc);
let thread_name = name.to_string();
let join_handle = match thread::Builder::new()
.name(name.to_string())
.spawn(move || thread_wrapper(thread_id, thread_rc2, shutdown_rx, f))
.spawn(move || thread_wrapper(thread_name, thread_id, thread_rc2, shutdown_rx, f))
{
Ok(handle) => handle,
Err(err) => {
error!("Failed to spawn thread '{}': {}", name, err);
// Could not spawn the thread. Remove the entry
THREADS.lock().unwrap().remove(&thread_id);
return Err(err);
@@ -180,13 +184,14 @@ where
/// This wrapper function runs in a newly-spawned thread. It initializes the
/// thread-local variables and calls the payload function
fn thread_wrapper<F, E>(
fn thread_wrapper<F>(
thread_name: String,
thread_id: u64,
thread: Arc<PageServerThread>,
shutdown_rx: watch::Receiver<()>,
f: F,
) where
F: FnOnce() -> Result<(), E> + Send + 'static,
F: FnOnce() -> anyhow::Result<()> + Send + 'static,
{
SHUTDOWN_RX.with(|rx| {
*rx.borrow_mut() = Some(shutdown_rx);
@@ -195,6 +200,8 @@ fn thread_wrapper<F, E>(
*ct.borrow_mut() = Some(thread);
});
info!("Starting thread '{}'", thread_name);
// We use AssertUnwindSafe here so that the payload function
// doesn't need to be UnwindSafe. We don't do anything after the
// unwinding that would expose us to unwind-unsafe behavior.
@@ -203,9 +210,22 @@ fn thread_wrapper<F, E>(
// Remove our entry from the global hashmap.
THREADS.lock().unwrap().remove(&thread_id);
// If the thread payload panic'd, exit with the panic.
if let Err(err) = result {
panic::resume_unwind(err);
match result {
Ok(Ok(())) => info!("Thread '{}' exited normally", thread_name),
Ok(Err(err)) => {
error!(
"Shutting down: thread '{}' exited with error: {:?}",
thread_name, err
);
shutdown_pageserver();
}
Err(err) => {
error!(
"Shutting down: thread '{}' panicked: {:?}",
thread_name, err
);
shutdown_pageserver();
}
}
}