diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs
index 619c5bce3e..94a072e394 100644
--- a/control_plane/src/background_process.rs
+++ b/control_plane/src/background_process.rs
@@ -151,7 +151,7 @@ where
                 print!(".");
                 io::stdout().flush().unwrap();
             }
-            thread::sleep(RETRY_INTERVAL);
+            tokio::time::sleep(RETRY_INTERVAL).await;
         }
         Err(e) => {
             println!("error starting process {process_name:?}: {e:#}");
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 1c94c42865..92f609761a 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -34,12 +34,14 @@ use safekeeper_api::{
     DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
     DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
 };
+use std::borrow::Cow;
 use std::collections::{BTreeSet, HashMap};
 use std::path::PathBuf;
 use std::process::exit;
 use std::str::FromStr;
 use std::time::Duration;
 use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
+use tokio::task::JoinSet;
 use url::Host;
 use utils::{
     auth::{Claims, Scope},
@@ -87,35 +89,35 @@ fn main() -> Result<()> {
 
     // Check for 'neon init' command first.
     let subcommand_result = if sub_name == "init" {
-        handle_init(sub_args).map(Some)
+        handle_init(sub_args).map(|env| Some(Cow::Owned(env)))
     } else {
         // all other commands need an existing config
-        let mut env =
-            LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
-        let original_env = env.clone();
+        let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
+        let original_env = env.clone();
+        let env = Box::leak(Box::new(env));
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()
             .unwrap();
 
         let subcommand_result = match sub_name {
-            "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
-            "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
-            "start" => rt.block_on(handle_start_all(&env, get_start_timeout(sub_args))),
-            "stop" => rt.block_on(handle_stop_all(sub_args, &env)),
-            "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
-            "storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)),
-            "storage_broker" => rt.block_on(handle_storage_broker(sub_args, &env)),
-            "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
-            "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
-            "mappings" => handle_mappings(sub_args, &mut env),
+            "tenant" => rt.block_on(handle_tenant(sub_args, env)),
+            "timeline" => rt.block_on(handle_timeline(sub_args, env)),
+            "start" => rt.block_on(handle_start_all(env, get_start_timeout(sub_args))),
+            "stop" => rt.block_on(handle_stop_all(sub_args, env)),
+            "pageserver" => rt.block_on(handle_pageserver(sub_args, env)),
+            "storage_controller" => rt.block_on(handle_storage_controller(sub_args, env)),
+            "storage_broker" => rt.block_on(handle_storage_broker(sub_args, env)),
+            "safekeeper" => rt.block_on(handle_safekeeper(sub_args, env)),
+            "endpoint" => rt.block_on(handle_endpoint(sub_args, env)),
+            "mappings" => handle_mappings(sub_args, env),
             "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
             _ => bail!("unexpected subcommand {sub_name}"),
         };
 
-        if original_env != env {
-            subcommand_result.map(|()| Some(env))
+        if &original_env != env {
+            subcommand_result.map(|()| Some(Cow::Borrowed(env)))
         } else {
             subcommand_result.map(|()| None)
         }
@@ -1273,48 +1275,95 @@ async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv
 }
 
 async fn handle_start_all(
-    env: &local_env::LocalEnv,
+    env: &'static local_env::LocalEnv,
     retry_timeout: &Duration,
 ) -> anyhow::Result<()> {
+    let Err(errors) = handle_start_all_impl(env, *retry_timeout).await else {
+        neon_start_status_check(env, retry_timeout)
+            .await
+            .context("status check after successful startup of all services")?;
+        return Ok(());
+    };
+
+    eprintln!("startup failed because one or more services could not be started");
+
+    for e in errors {
+        eprintln!("{e}");
+        let debug_repr = format!("{e:?}");
+        for line in debug_repr.lines() {
+            eprintln!("  {line}");
+        }
+    }
+
+    try_stop_all(env, true).await;
+
+    exit(2);
+}
+
+/// Returns Ok() if and only if all services could be started successfully.
+/// Otherwise, returns the list of errors that occurred during startup.
+async fn handle_start_all_impl(
+    env: &'static local_env::LocalEnv,
+    retry_timeout: Duration,
+) -> Result<(), Vec<anyhow::Error>> {
     // Endpoints are not started automatically
-    broker::start_broker_process(env, retry_timeout).await?;
+    let mut js = JoinSet::new();
 
-    // Only start the storage controller if the pageserver is configured to need it
-    if env.control_plane_api.is_some() {
-        let storage_controller = StorageController::from_env(env);
-        if let Err(e) = storage_controller
-            .start(NeonStorageControllerStartArgs::with_default_instance_id(
-                (*retry_timeout).into(),
-            ))
-            .await
-        {
-            eprintln!("storage_controller start failed: {:#}", e);
-            try_stop_all(env, true).await;
-            exit(1);
+    // force infallibility through closure
+    #[allow(clippy::redundant_closure_call)]
+    (|| {
+        js.spawn(async move {
+            let retry_timeout = retry_timeout;
+            broker::start_broker_process(env, &retry_timeout).await
+        });
+
+        // Only start the storage controller if the pageserver is configured to need it
+        if env.control_plane_api.is_some() {
+            js.spawn(async move {
+                let storage_controller = StorageController::from_env(env);
+                storage_controller
+                    .start(NeonStorageControllerStartArgs::with_default_instance_id(
+                        retry_timeout.into(),
+                    ))
+                    .await
+                    .map_err(|e| e.context("start storage_controller"))
+            });
+        }
+
+        for ps_conf in &env.pageservers {
+            js.spawn(async move {
+                let pageserver = PageServerNode::from_env(env, ps_conf);
+                pageserver
+                    .start(&retry_timeout)
+                    .await
+                    .map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
+            });
+        }
+
+        for node in env.safekeepers.iter() {
+            js.spawn(async move {
+                let safekeeper = SafekeeperNode::from_env(env, node);
+                safekeeper
+                    .start(vec![], &retry_timeout)
+                    .await
+                    .map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
+            });
+        }
+    })();
+
+    let mut errors = Vec::new();
+    while let Some(result) = js.join_next().await {
+        let result = result.expect("we don't panic or cancel the tasks");
+        if let Err(e) = result {
+            errors.push(e);
         }
     }
 
-    for ps_conf in &env.pageservers {
-        let pageserver = PageServerNode::from_env(env, ps_conf);
-        if let Err(e) = pageserver.start(retry_timeout).await {
-            eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
-            try_stop_all(env, true).await;
-            exit(1);
-        }
+    if !errors.is_empty() {
+        return Err(errors);
     }
 
-    for node in env.safekeepers.iter() {
-        let safekeeper = SafekeeperNode::from_env(env, node);
-        if let Err(e) = safekeeper.start(vec![], retry_timeout).await {
-            eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
-            try_stop_all(env, false).await;
-            exit(1);
-        }
-    }
-
-    neon_start_status_check(env, retry_timeout).await?;
-
     Ok(())
 }
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 9f879c4b08..7554a03a68 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -702,7 +702,7 @@ impl Endpoint {
                 }
             }
         }
-        std::thread::sleep(ATTEMPT_INTERVAL);
+        tokio::time::sleep(ATTEMPT_INTERVAL).await;
     }
 
     // disarm the scopeguard, let the child outlive this function (and neon_local invoction)
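
For context on the core pattern this patch introduces, here is a minimal, self-contained sketch (not part of the patch) of the `JoinSet` fan-out and error-collection idiom that `handle_start_all_impl` uses: spawn one task per service, then drain the set and gather every failure instead of exiting on the first one. `fake_service` and its failure flags are hypothetical stand-ins for the broker/pageserver/safekeeper starts; the sketch assumes the `tokio` (e.g. `features = ["full"]`) and `anyhow` crates.

```rust
use anyhow::{anyhow, Context};
use tokio::task::JoinSet;

// Hypothetical stand-in for broker::start_broker_process,
// PageServerNode::start, SafekeeperNode::start, etc.
async fn fake_service(name: &'static str, fail: bool) -> anyhow::Result<()> {
    if fail {
        Err(anyhow!("connection refused")).context(format!("start {name}"))
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut js = JoinSet::new();
    js.spawn(fake_service("storage_broker", false));
    js.spawn(fake_service("pageserver 1", true));
    js.spawn(fake_service("safekeeper 1", true));

    // Drain the JoinSet: every startup task runs to completion and all
    // errors are collected, mirroring handle_start_all_impl in the diff.
    let mut errors = Vec::new();
    while let Some(result) = js.join_next().await {
        // JoinError only occurs on panic or cancellation, neither of
        // which these futures do.
        let result = result.expect("tasks neither panic nor get cancelled");
        if let Err(e) = result {
            errors.push(e);
        }
    }

    for e in &errors {
        // "{e:#}" prints the whole context chain on one line,
        // e.g. "start pageserver 1: connection refused".
        eprintln!("{e:#}");
    }
}
```

Compared with the old sequential `start().await` / `exit(1)` flow, this reports every failed service in a single run, and draining via `join_next` ensures no task is left detached mid-start. It is also why the patch makes `env` a `&'static` reference (via `Box::leak` in `main`): spawned tasks may outlive the caller's stack frame, so they cannot borrow a local `LocalEnv`.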