neon_local start: parallel startup to break cyclic dependency (#8950)

(Found this useful during investigation
https://github.com/neondatabase/cloud/issues/16886.)

Problem
-------

Before this PR, `neon_local` sequentially does the following:
1. launch storcon process
2. wait for storcon to signal readiness
[here](75310fe441/control_plane/src/storage_controller.rs (L804-L808))
3. start pageserver
4. wait for pageserver to become ready
[here](c43e664ff5/control_plane/src/pageserver.rs (L343-L346))
5. etc

The problem is that storcon's readiness waits for the
[`startup_reconcile`](cbcd4058ed/storage_controller/src/service.rs (L520-L523))
to complete.

But pageservers aren't started at this point.

So, worst case we wait for `STARTUP_RECONCILE_TIMEOUT/2`, i.e., 15s.

This is more than the 10s default timeout allowed by neon_local.

So, the result is that `neon_local start` fails to start storcon and
stops everything.

Solution
--------

In this PR I choose the the radical solution to start everything in
parallel.

It junks up the output because we do stuff like `print!(".")` to
indicate progress.
We should just abandon that.
And switch to `utils::logging` + `tracing` with separate spans for each
component.
I can do that in this PR or we leave it as a follow-up.

Alternatives Considered
-----------------------

The Pageserver's `/v1/status` or in fact any endpoint of the mgmt API
will not `accept()` on the mgmt API socket until after the `re-attach`
call to storcon returned success.

So, it's insufficient to change the startup order to start Pageservers
first.

We cannot easily change Pageserver startup order because
`init_tenant_mgr` must complete before we start serving the mgmt API.
Otherwise tenant detach calls et al can race with `init_tenant_mgr`.

We'd have to add a "loading" state to tenant mgr and make all API
endpoints except `/v1/status` wait for _that_ to complete.


Related
-------

- https://github.com/neondatabase/neon/pull/6475
This commit is contained in:
Christian Schwarz
2024-09-18 18:17:55 +02:00
committed by GitHub
parent 794bd4b866
commit 035a49a6b2
3 changed files with 99 additions and 50 deletions

View File

@@ -151,7 +151,7 @@ where
print!(".");
io::stdout().flush().unwrap();
}
thread::sleep(RETRY_INTERVAL);
tokio::time::sleep(RETRY_INTERVAL).await;
}
Err(e) => {
println!("error starting process {process_name:?}: {e:#}");

View File

@@ -34,12 +34,14 @@ use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
use url::Host;
use utils::{
auth::{Claims, Scope},
@@ -87,35 +89,35 @@ fn main() -> Result<()> {
// Check for 'neon init' command first.
let subcommand_result = if sub_name == "init" {
handle_init(sub_args).map(Some)
handle_init(sub_args).map(|env| Some(Cow::Owned(env)))
} else {
// all other commands need an existing config
let mut env =
LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let original_env = env.clone();
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let original_env = env.clone();
let env = Box::leak(Box::new(env));
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let subcommand_result = match sub_name {
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
"start" => rt.block_on(handle_start_all(&env, get_start_timeout(sub_args))),
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)),
"storage_broker" => rt.block_on(handle_storage_broker(sub_args, &env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
"endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
"mappings" => handle_mappings(sub_args, &mut env),
"tenant" => rt.block_on(handle_tenant(sub_args, env)),
"timeline" => rt.block_on(handle_timeline(sub_args, env)),
"start" => rt.block_on(handle_start_all(env, get_start_timeout(sub_args))),
"stop" => rt.block_on(handle_stop_all(sub_args, env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, env)),
"storage_controller" => rt.block_on(handle_storage_controller(sub_args, env)),
"storage_broker" => rt.block_on(handle_storage_broker(sub_args, env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, env)),
"endpoint" => rt.block_on(handle_endpoint(sub_args, env)),
"mappings" => handle_mappings(sub_args, env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"),
};
if original_env != env {
subcommand_result.map(|()| Some(env))
if &original_env != env {
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
} else {
subcommand_result.map(|()| None)
}
@@ -1273,48 +1275,95 @@ async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv
}
async fn handle_start_all(
env: &local_env::LocalEnv,
env: &'static local_env::LocalEnv,
retry_timeout: &Duration,
) -> anyhow::Result<()> {
let Err(errors) = handle_start_all_impl(env, *retry_timeout).await else {
neon_start_status_check(env, retry_timeout)
.await
.context("status check after successful startup of all services")?;
return Ok(());
};
eprintln!("startup failed because one or more services could not be started");
for e in errors {
eprintln!("{e}");
let debug_repr = format!("{e:?}");
for line in debug_repr.lines() {
eprintln!(" {line}");
}
}
try_stop_all(env, true).await;
exit(2);
}
/// Returns Ok() if and only if all services could be started successfully.
/// Otherwise, returns the list of errors that occurred during startup.
async fn handle_start_all_impl(
env: &'static local_env::LocalEnv,
retry_timeout: Duration,
) -> Result<(), Vec<anyhow::Error>> {
// Endpoints are not started automatically
broker::start_broker_process(env, retry_timeout).await?;
let mut js = JoinSet::new();
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
let storage_controller = StorageController::from_env(env);
if let Err(e) = storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
(*retry_timeout).into(),
))
.await
{
eprintln!("storage_controller start failed: {:#}", e);
try_stop_all(env, true).await;
exit(1);
// force infalliblity through closure
#[allow(clippy::redundant_closure_call)]
(|| {
js.spawn(async move {
let retry_timeout = retry_timeout;
broker::start_broker_process(env, &retry_timeout).await
});
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout.into(),
))
.await
.map_err(|e| e.context("start storage_controller"))
});
}
for ps_conf in &env.pageservers {
js.spawn(async move {
let pageserver = PageServerNode::from_env(env, ps_conf);
pageserver
.start(&retry_timeout)
.await
.map_err(|e| e.context(format!("start pageserver {}", ps_conf.id)))
});
}
for node in env.safekeepers.iter() {
js.spawn(async move {
let safekeeper = SafekeeperNode::from_env(env, node);
safekeeper
.start(vec![], &retry_timeout)
.await
.map_err(|e| e.context(format!("start safekeeper {}", safekeeper.id)))
});
}
})();
let mut errors = Vec::new();
while let Some(result) = js.join_next().await {
let result = result.expect("we don't panic or cancel the tasks");
if let Err(e) = result {
errors.push(e);
}
}
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver.start(retry_timeout).await {
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
try_stop_all(env, true).await;
exit(1);
}
if !errors.is_empty() {
return Err(errors);
}
for node in env.safekeepers.iter() {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start(vec![], retry_timeout).await {
eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
try_stop_all(env, false).await;
exit(1);
}
}
neon_start_status_check(env, retry_timeout).await?;
Ok(())
}

View File

@@ -702,7 +702,7 @@ impl Endpoint {
}
}
}
std::thread::sleep(ATTEMPT_INTERVAL);
tokio::time::sleep(ATTEMPT_INTERVAL).await;
}
// disarm the scopeguard, let the child outlive this function (and neon_local invoction)