From b0ea9175fbf07bc5382c885eea8490b9841219ae Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Sat, 29 Apr 2023 14:10:37 +0300 Subject: [PATCH] fix: exiting from process while running drops calling std::process::exit on signals or when one task exits will not run any drops nor drop the only runtime. --- safekeeper/src/bin/safekeeper.rs | 72 +++++++++++++++++++------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index b73eca49b5..3a9a4135f1 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -3,9 +3,7 @@ // use anyhow::{bail, Context, Result}; use clap::Parser; -use futures::future::BoxFuture; -use futures::stream::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use remote_storage::RemoteStorageConfig; use tokio::signal::unix::{signal, SignalKind}; use tokio::task::JoinError; @@ -195,11 +193,7 @@ async fn main() -> anyhow::Result<()> { start_safekeeper(conf).await } -/// Result of joining any of main tasks: upper error means task failed to -/// complete, e.g. panicked, inner is error produced by task itself. -type JoinTaskRes = Result, JoinError>; - -async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { +async fn start_safekeeper(conf: SafeKeeperConf) -> anyhow::Result<()> { // Prevent running multiple safekeepers on the same directory let lock_file_path = conf.workdir.join(PID_FILE_NAME); let lock_file = @@ -235,44 +229,47 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // Load all timelines from disk to memory. GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; - // Keep handles to main tasks to die if any of them disappears. Probably - // replace it with JoinSet once we update tokio. - let mut tasks_handles: FuturesUnordered> = - FuturesUnordered::new(); + fn named_should_never_return( + name: &'static str, + unexpected: Result, JoinError>, + ) -> anyhow::Result<()> { + let res = match unexpected { + Ok(Ok(())) => Err(anyhow::anyhow!("unexpected Ok(()) return")), + Ok(Err(e)) => Err(e), + Err(e) => Err(anyhow::Error::new(e)), + }; + + // was not able to get this working with `enum Void {}` + res.with_context(|| format!("task {name} unexpectedly joined")) + } let conf_ = conf.clone(); let wal_service_handle = tokio::spawn(wal_service::task_main(conf_, pg_listener)) // wrap with task name for error reporting - .map(|res| ("WAL service main".to_owned(), res)); - tasks_handles.push(Box::pin(wal_service_handle)); + .map(|res| named_should_never_return("WAL service main", res)); let conf_ = conf.clone(); let http_handle = tokio::spawn(http::task_main(conf_, http_listener)) - .map(|res| ("HTTP service main".to_owned(), res)); - tasks_handles.push(Box::pin(http_handle)); + .map(|res| named_should_never_return("HTTP service main", res)); let conf_ = conf.clone(); let broker_task_handle = tokio::spawn(broker::task_main(conf_).instrument(info_span!("broker"))) - .map(|res| ("broker main".to_owned(), res)); - tasks_handles.push(Box::pin(broker_task_handle)); + .map(|res| named_should_never_return("broker main", res)); let conf_ = conf.clone(); - let wal_remover_handle = - tokio::spawn(remove_wal::task_main(conf_)).map(|res| ("WAL remover".to_owned(), res)); - tasks_handles.push(Box::pin(wal_remover_handle)); + let wal_remover_handle = tokio::spawn(remove_wal::task_main(conf_)) + .map(|res| named_should_never_return("WAL remover", res)); let conf_ = conf.clone(); let wal_backup_handle = tokio::spawn(wal_backup::wal_backup_launcher_task_main( conf_, wal_backup_launcher_rx, )) - .map(|res| ("WAL backup launcher".to_owned(), res)); - tasks_handles.push(Box::pin(wal_backup_handle)); + .map(|res| named_should_never_return("WAL backup launcher", res)); let metrics_shifter_handle = tokio::spawn(safekeeper::metrics::metrics_shifter()) - .map(|res| ("metrics shifter".to_owned(), res)); - tasks_handles.push(Box::pin(metrics_shifter_handle)); + .map(|res| named_should_never_return("metrics shifter", res)); set_build_info_metric(GIT_VERSION); @@ -283,19 +280,34 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let mut sigint_stream = signal(SignalKind::interrupt())?; let mut sigterm_stream = signal(SignalKind::terminate())?; + let tasks = async move { + tokio::try_join!( + wal_service_handle, + http_handle, + broker_task_handle, + wal_remover_handle, + wal_backup_handle, + metrics_shifter_handle + ) + }; + tokio::select! { - Some((task_name, res)) = tasks_handles.next()=> { - error!("{} task failed: {:?}, exiting", task_name, res); - std::process::exit(1); + res = tasks => { + // this will be the first reason to stop a safekeeper, but not necessarily the only one + // which will get to happen before we exit + match res { + Ok(_) => unreachable!("because of named_should_never_return, we can never end up here, cannot use ! yet"), + Err(e) => return Err(e), + } } // On any shutdown signal, log receival and exit. Additionally, handling // SIGQUIT prevents coredump. _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"), _ = sigint_stream.recv() => info!("received SIGINT, terminating"), _ = sigterm_stream.recv() => info!("received SIGTERM, terminating") + } - }; - std::process::exit(0); + Ok(()) } /// Determine safekeeper id.