mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
fix: exiting from process while running drops
calling std::process::exit on signals or when one task exits will not run any drops nor drop the only runtime.
This commit is contained in:
@@ -3,9 +3,7 @@
|
||||
//
|
||||
use anyhow::{bail, Context, Result};
|
||||
use clap::Parser;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use futures::FutureExt;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::task::JoinError;
|
||||
@@ -195,11 +193,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
start_safekeeper(conf).await
|
||||
}
|
||||
|
||||
/// Result of joining any of main tasks: upper error means task failed to
|
||||
/// complete, e.g. panicked, inner is error produced by task itself.
|
||||
type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;
|
||||
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
async fn start_safekeeper(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
// Prevent running multiple safekeepers on the same directory
|
||||
let lock_file_path = conf.workdir.join(PID_FILE_NAME);
|
||||
let lock_file =
|
||||
@@ -235,44 +229,47 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
// Load all timelines from disk to memory.
|
||||
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
|
||||
|
||||
// Keep handles to main tasks to die if any of them disappears. Probably
|
||||
// replace it with JoinSet once we update tokio.
|
||||
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
|
||||
FuturesUnordered::new();
|
||||
fn named_should_never_return(
|
||||
name: &'static str,
|
||||
unexpected: Result<Result<(), anyhow::Error>, JoinError>,
|
||||
) -> anyhow::Result<()> {
|
||||
let res = match unexpected {
|
||||
Ok(Ok(())) => Err(anyhow::anyhow!("unexpected Ok(()) return")),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(e) => Err(anyhow::Error::new(e)),
|
||||
};
|
||||
|
||||
// was not able to get this working with `enum Void {}`
|
||||
res.with_context(|| format!("task {name} unexpectedly joined"))
|
||||
}
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_service_handle = tokio::spawn(wal_service::task_main(conf_, pg_listener))
|
||||
// wrap with task name for error reporting
|
||||
.map(|res| ("WAL service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_service_handle));
|
||||
.map(|res| named_should_never_return("WAL service main", res));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let http_handle = tokio::spawn(http::task_main(conf_, http_listener))
|
||||
.map(|res| ("HTTP service main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(http_handle));
|
||||
.map(|res| named_should_never_return("HTTP service main", res));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let broker_task_handle =
|
||||
tokio::spawn(broker::task_main(conf_).instrument(info_span!("broker")))
|
||||
.map(|res| ("broker main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
.map(|res| named_should_never_return("broker main", res));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_remover_handle =
|
||||
tokio::spawn(remove_wal::task_main(conf_)).map(|res| ("WAL remover".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_remover_handle));
|
||||
let wal_remover_handle = tokio::spawn(remove_wal::task_main(conf_))
|
||||
.map(|res| named_should_never_return("WAL remover", res));
|
||||
|
||||
let conf_ = conf.clone();
|
||||
let wal_backup_handle = tokio::spawn(wal_backup::wal_backup_launcher_task_main(
|
||||
conf_,
|
||||
wal_backup_launcher_rx,
|
||||
))
|
||||
.map(|res| ("WAL backup launcher".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(wal_backup_handle));
|
||||
.map(|res| named_should_never_return("WAL backup launcher", res));
|
||||
|
||||
let metrics_shifter_handle = tokio::spawn(safekeeper::metrics::metrics_shifter())
|
||||
.map(|res| ("metrics shifter".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(metrics_shifter_handle));
|
||||
.map(|res| named_should_never_return("metrics shifter", res));
|
||||
|
||||
set_build_info_metric(GIT_VERSION);
|
||||
|
||||
@@ -283,19 +280,34 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
|
||||
let mut sigint_stream = signal(SignalKind::interrupt())?;
|
||||
let mut sigterm_stream = signal(SignalKind::terminate())?;
|
||||
|
||||
let tasks = async move {
|
||||
tokio::try_join!(
|
||||
wal_service_handle,
|
||||
http_handle,
|
||||
broker_task_handle,
|
||||
wal_remover_handle,
|
||||
wal_backup_handle,
|
||||
metrics_shifter_handle
|
||||
)
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
Some((task_name, res)) = tasks_handles.next()=> {
|
||||
error!("{} task failed: {:?}, exiting", task_name, res);
|
||||
std::process::exit(1);
|
||||
res = tasks => {
|
||||
// this will be the first reason to stop a safekeeper, but not necessarily the only one
|
||||
// which will get to happen before we exit
|
||||
match res {
|
||||
Ok(_) => unreachable!("because of named_should_never_return, we can never end up here, cannot use ! yet"),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
// On any shutdown signal, log receival and exit. Additionally, handling
|
||||
// SIGQUIT prevents coredump.
|
||||
_ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"),
|
||||
_ = sigint_stream.recv() => info!("received SIGINT, terminating"),
|
||||
_ = sigterm_stream.recv() => info!("received SIGTERM, terminating")
|
||||
}
|
||||
|
||||
};
|
||||
std::process::exit(0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Determine safekeeper id.
|
||||
|
||||
Reference in New Issue
Block a user