diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 3f4ddbdb2b..a272c306e7 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -36,11 +36,11 @@ use utils::pid_file::{self, PidFileRead}; // it's waiting. If the process hasn't started/stopped after 5 seconds, // it prints a notice that it's taking long, but keeps waiting. // -const RETRY_UNTIL_SECS: u64 = 10; -const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS; -const RETRY_INTERVAL_MILLIS: u64 = 100; -const DOT_EVERY_RETRIES: u64 = 10; -const NOTICE_AFTER_RETRIES: u64 = 50; +const STOP_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +const STOP_RETRIES: u128 = STOP_RETRY_TIMEOUT.as_millis() / RETRY_INTERVAL.as_millis(); +const RETRY_INTERVAL: Duration = Duration::from_millis(100); +const DOT_EVERY_RETRIES: u128 = 10; +const NOTICE_AFTER_RETRIES: u128 = 50; /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates /// it itself. @@ -52,6 +52,7 @@ pub enum InitialPidFile { } /// Start a background child process using the parameters given. 
+#[allow(clippy::too_many_arguments)] pub async fn start_process( process_name: &str, datadir: &Path, @@ -59,6 +60,7 @@ pub async fn start_process( args: AI, envs: EI, initial_pid_file: InitialPidFile, + retry_timeout: &Duration, process_status_check: F, ) -> anyhow::Result<()> where @@ -69,6 +71,7 @@ where // Not generic AsRef, otherwise empty `envs` prevents type inference EI: IntoIterator, { + let retries: u128 = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis(); if !datadir.metadata().context("stat datadir")?.is_dir() { anyhow::bail!("`datadir` must be a directory when calling this function: {datadir:?}"); } @@ -130,7 +133,7 @@ where .unwrap(); }); - for retries in 0..RETRIES { + for retries in 0..retries { match process_started(pid, pid_file_to_check, &process_status_check).await { Ok(true) => { println!("\n{process_name} started and passed status check, pid: {pid}"); @@ -148,7 +151,7 @@ where print!("."); io::stdout().flush().unwrap(); } - thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS)); + thread::sleep(RETRY_INTERVAL); } Err(e) => { println!("error starting process {process_name:?}: {e:#}"); @@ -157,9 +160,10 @@ where } } println!(); - anyhow::bail!( - "{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds" - ); + anyhow::bail!(format!( + "{} did not start+pass status checks within {:?}", + process_name, retry_timeout + )); } /// Stops the process, using the pid file given. Returns Ok also if the process is already not running. 
@@ -215,7 +219,7 @@ pub fn stop_process( } pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> { - for retries in 0..RETRIES { + for retries in 0..STOP_RETRIES { match process_has_stopped(pid) { Ok(true) => { println!("\n{process_name} stopped"); @@ -231,7 +235,7 @@ pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> { print!("."); io::stdout().flush().unwrap(); } - thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS)); + thread::sleep(RETRY_INTERVAL); } Err(e) => { println!("{process_name} with pid {pid} failed to stop: {e:#}"); @@ -240,7 +244,10 @@ pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> { } } println!(); - anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds"); + anyhow::bail!(format!( + "{} with pid {} did not stop in {:?}", + process_name, pid, STOP_RETRY_TIMEOUT + )); } fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command { diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 8fe959792b..3f656932d5 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -36,6 +36,7 @@ use std::collections::{BTreeSet, HashMap}; use std::path::PathBuf; use std::process::exit; use std::str::FromStr; +use std::time::Duration; use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR; use url::Host; use utils::{ @@ -99,7 +100,7 @@ fn main() -> Result<()> { let subcommand_result = match sub_name { "tenant" => rt.block_on(handle_tenant(sub_args, &mut env)), "timeline" => rt.block_on(handle_timeline(sub_args, &mut env)), - "start" => rt.block_on(handle_start_all(&env)), + "start" => rt.block_on(handle_start_all(&env, get_start_timeout(sub_args))), "stop" => rt.block_on(handle_stop_all(sub_args, &env)), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), "storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)), @@ -1048,10 
+1049,20 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result &Duration { + let humantime_duration = args + .get_one::("start-timeout") + .expect("invalid value for start-timeout"); + humantime_duration.as_ref() +} + async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { match sub_match.subcommand() { Some(("start", subcommand_args)) => { - if let Err(e) = get_pageserver(env, subcommand_args)?.start().await { + if let Err(e) = get_pageserver(env, subcommand_args)? + .start(get_start_timeout(subcommand_args)) + .await + { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -1077,7 +1088,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> exit(1); } - if let Err(e) = pageserver.start().await { + if let Err(e) = pageserver.start(get_start_timeout(sub_match)).await { eprintln!("pageserver start failed: {e}"); exit(1); } @@ -1105,8 +1116,8 @@ async fn handle_storage_controller( ) -> Result<()> { let svc = StorageController::from_env(env); match sub_match.subcommand() { - Some(("start", _start_match)) => { - if let Err(e) = svc.start().await { + Some(("start", start_match)) => { + if let Err(e) = svc.start(get_start_timeout(start_match)).await { eprintln!("start failed: {e}"); exit(1); } @@ -1165,7 +1176,10 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> "start" => { let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts).await { + if let Err(e) = safekeeper + .start(extra_opts, get_start_timeout(sub_args)) + .await + { eprintln!("safekeeper start failed: {}", e); exit(1); } @@ -1191,7 +1205,10 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> } let extra_opts = safekeeper_extra_opts(sub_args); - if let Err(e) = safekeeper.start(extra_opts).await { + if let Err(e) = safekeeper + .start(extra_opts, get_start_timeout(sub_args)) + .await + { eprintln!("safekeeper 
start failed: {}", e); exit(1); } @@ -1204,15 +1221,18 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } -async fn handle_start_all(env: &local_env::LocalEnv) -> anyhow::Result<()> { +async fn handle_start_all( + env: &local_env::LocalEnv, + retry_timeout: &Duration, +) -> anyhow::Result<()> { // Endpoints are not started automatically - broker::start_broker_process(env).await?; + broker::start_broker_process(env, retry_timeout).await?; // Only start the storage controller if the pageserver is configured to need it if env.control_plane_api.is_some() { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.start().await { + if let Err(e) = storage_controller.start(retry_timeout).await { eprintln!("storage_controller start failed: {:#}", e); try_stop_all(env, true).await; exit(1); @@ -1221,7 +1241,7 @@ async fn handle_start_all(env: &local_env::LocalEnv) -> anyhow::Result<()> { for ps_conf in &env.pageservers { let pageserver = PageServerNode::from_env(env, ps_conf); - if let Err(e) = pageserver.start().await { + if let Err(e) = pageserver.start(retry_timeout).await { eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e); try_stop_all(env, true).await; exit(1); @@ -1230,7 +1250,7 @@ async fn handle_start_all(env: &local_env::LocalEnv) -> anyhow::Result<()> { for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); - if let Err(e) = safekeeper.start(vec![]).await { + if let Err(e) = safekeeper.start(vec![], retry_timeout).await { eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e); try_stop_all(env, false).await; exit(1); @@ -1290,6 +1310,15 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } fn cli() -> Command { + let timeout_arg = Arg::new("start-timeout") + .long("start-timeout") + .short('t') + .global(true) + .help("timeout until we fail the command, e.g. 
30s") + .value_parser(value_parser!(humantime::Duration)) + .default_value("10s") + .required(false); + let branch_name_arg = Arg::new("branch-name") .long("branch-name") .help("Name of the branch to be created or used as an alias for other services") @@ -1509,6 +1538,7 @@ fn cli() -> Command { .subcommand(Command::new("status")) .subcommand(Command::new("start") .about("Start local pageserver") + .arg(timeout_arg.clone()) ) .subcommand(Command::new("stop") .about("Stop local pageserver") @@ -1516,13 +1546,15 @@ fn cli() -> Command { ) .subcommand(Command::new("restart") .about("Restart local pageserver") + .arg(timeout_arg.clone()) ) ) .subcommand( Command::new("storage_controller") .arg_required_else_help(true) .about("Manage storage_controller") - .subcommand(Command::new("start").about("Start storage controller")) + .subcommand(Command::new("start").about("Start storage controller") + .arg(timeout_arg.clone())) .subcommand(Command::new("stop").about("Stop storage controller") .arg(stop_mode_arg.clone())) ) @@ -1534,6 +1566,7 @@ fn cli() -> Command { .about("Start local safekeeper") .arg(safekeeper_id_arg.clone()) .arg(safekeeper_extra_opt_arg.clone()) + .arg(timeout_arg.clone()) ) .subcommand(Command::new("stop") .about("Stop local safekeeper") @@ -1545,6 +1578,7 @@ fn cli() -> Command { .arg(safekeeper_id_arg) .arg(stop_mode_arg.clone()) .arg(safekeeper_extra_opt_arg) + .arg(timeout_arg.clone()) ) ) .subcommand( @@ -1579,6 +1613,7 @@ fn cli() -> Command { .arg(remote_ext_config_args) .arg(create_test_user) .arg(allow_multiple.clone()) + .arg(timeout_arg.clone()) ) .subcommand(Command::new("reconfigure") .about("Reconfigure the endpoint") @@ -1630,6 +1665,7 @@ fn cli() -> Command { .subcommand( Command::new("start") .about("Start page server and safekeepers") + .arg(timeout_arg.clone()) ) .subcommand( Command::new("stop") diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index f40705863b..c3cfc140da 100644 --- a/control_plane/src/broker.rs 
+++ b/control_plane/src/broker.rs @@ -5,13 +5,18 @@ //! ```text //! .neon/safekeepers/ //! ``` +use std::time::Duration; + use anyhow::Context; use camino::Utf8PathBuf; use crate::{background_process, local_env}; -pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { +pub async fn start_broker_process( + env: &local_env::LocalEnv, + retry_timeout: &Duration, +) -> anyhow::Result<()> { let broker = &env.broker; let listen_addr = &broker.listen_addr; @@ -27,6 +32,7 @@ pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<( args, [], background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), + retry_timeout, || async { let url = broker.client_url(); let status_url = url.join("status").with_context(|| { diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 13e684da24..da4b987849 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -158,8 +158,8 @@ impl PageServerNode { .expect("non-Unicode path") } - pub async fn start(&self) -> anyhow::Result<()> { - self.start_node().await + pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { + self.start_node(retry_timeout).await } fn pageserver_init(&self, conf: NeonLocalInitPageserverConf) -> anyhow::Result<()> { @@ -214,14 +214,15 @@ impl PageServerNode { Ok(()) } - async fn start_node(&self) -> anyhow::Result<()> { + async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> { // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); print!( - "Starting pageserver node {} at '{}' in {:?}", + "Starting pageserver node {} at '{}' in {:?}, retrying for {:?}", self.conf.id, self.pg_connection_config.raw_address(), - datadir + datadir, + retry_timeout ); io::stdout().flush().context("flush stdout")?; @@ -239,6 +240,7 @@ impl PageServerNode { args, self.pageserver_env_variables()?, 
background_process::InitialPidFile::Expect(self.pid_file()), + retry_timeout, || async { let st = self.check_status().await; match st { diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 4a320ce53d..a0a73f5609 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -7,6 +7,7 @@ //! ``` use std::io::Write; use std::path::PathBuf; +use std::time::Duration; use std::{io, result}; use anyhow::Context; @@ -111,11 +112,16 @@ impl SafekeeperNode { .expect("non-Unicode path") } - pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<()> { + pub async fn start( + &self, + extra_opts: Vec<String>, + retry_timeout: &Duration, + ) -> anyhow::Result<()> { print!( - "Starting safekeeper at '{}' in '{}'", + "Starting safekeeper at '{}' in '{}', retrying for {:?}", self.pg_connection_config.raw_address(), - self.datadir_path().display() + self.datadir_path().display(), + retry_timeout, ); io::stdout().flush().unwrap(); @@ -200,6 +206,7 @@ impl SafekeeperNode { &args, self.safekeeper_env_variables()?, background_process::InitialPidFile::Expect(self.pid_file()), + retry_timeout, || async { match self.check_status().await { Ok(()) => Ok(true), diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 4f9f0ba794..1c56d5f80f 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -18,7 +18,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{fs, str::FromStr}; +use std::{fs, str::FromStr, time::Duration}; use tokio::process::Command; use tracing::instrument; use url::Url; @@ -224,7 +224,7 @@ impl StorageController { Ok(database_url) } - pub async fn start(&self) -> anyhow::Result<()> { + pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { // Start a vanilla Postgres process used by 
the storage controller for persistence. let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) .unwrap() @@ -272,6 +272,7 @@ impl StorageController { db_start_args, [], background_process::InitialPidFile::Create(self.postgres_pid_file()), + retry_timeout, || self.pg_isready(&pg_bin_dir), ) .await?; @@ -326,6 +327,7 @@ impl StorageController { args, [], background_process::InitialPidFile::Create(self.pid_file()), + retry_timeout, || async { match self.ready().await { Ok(_) => Ok(true), diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b5e40f5a46..4ff1705ca4 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1177,10 +1177,10 @@ class NeonEnv: force=config.config_init_force, ) - def start(self): + def start(self, timeout_in_seconds: Optional[int] = None): # Storage controller starts first, so that pageserver /re-attach calls don't # bounce through retries on startup - self.storage_controller.start() + self.storage_controller.start(timeout_in_seconds=timeout_in_seconds) # Wait for storage controller readiness to prevent unnecessary post start-up # reconcile. 
@@ -1196,10 +1196,18 @@ class NeonEnv: ) # The `or None` is for the linter for pageserver in self.pageservers: - futs.append(executor.submit(lambda ps=pageserver: ps.start())) + futs.append( + executor.submit( + lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) + ) + ) for safekeeper in self.safekeepers: - futs.append(executor.submit(lambda sk=safekeeper: sk.start())) + futs.append( + executor.submit( + lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) + ) + ) for f in futs: f.result() @@ -1783,8 +1791,13 @@ class NeonCli(AbstractNeonCli): res.check_returncode() return res - def storage_controller_start(self): + def storage_controller_start( + self, + timeout_in_seconds: Optional[int] = None, + ): cmd = ["storage_controller", "start"] + if timeout_in_seconds is not None: + cmd.append(f"--start-timeout={timeout_in_seconds}s") return self.raw_cli(cmd) def storage_controller_stop(self, immediate: bool): @@ -1797,8 +1810,11 @@ class NeonCli(AbstractNeonCli): self, id: int, extra_env_vars: Optional[Dict[str, str]] = None, + timeout_in_seconds: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": start_args = ["pageserver", "start", f"--id={id}"] + if timeout_in_seconds is not None: + start_args.append(f"--start-timeout={timeout_in_seconds}s") storage = self.env.pageserver_remote_storage if isinstance(storage, S3Storage): @@ -1816,7 +1832,10 @@ class NeonCli(AbstractNeonCli): return self.raw_cli(cmd) def safekeeper_start( - self, id: int, extra_opts: Optional[List[str]] = None + self, + id: int, + extra_opts: Optional[List[str]] = None, + timeout_in_seconds: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": s3_env_vars = None if isinstance(self.env.safekeepers_remote_storage, S3Storage): @@ -1826,6 +1845,8 @@ class NeonCli(AbstractNeonCli): extra_opts = [f"-e={opt}" for opt in extra_opts] else: extra_opts = [] + if timeout_in_seconds is not None: + extra_opts.append(f"--start-timeout={timeout_in_seconds}s") 
return self.raw_cli( ["safekeeper", "start", str(id), *extra_opts], extra_env_vars=s3_env_vars ) @@ -2077,9 +2098,9 @@ class NeonStorageController(MetricsGetter, LogUtils): self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS self.logfile = self.workdir / "storage_controller.log" - def start(self): + def start(self, timeout_in_seconds: Optional[int] = None): assert not self.running - self.env.neon_cli.storage_controller_start() + self.env.neon_cli.storage_controller_start(timeout_in_seconds) self.running = True return self @@ -2531,6 +2552,7 @@ class NeonPageserver(PgProtocol, LogUtils): def start( self, extra_env_vars: Optional[Dict[str, str]] = None, + timeout_in_seconds: Optional[int] = None, ) -> "NeonPageserver": """ Start the page server. @@ -2539,7 +2561,9 @@ class NeonPageserver(PgProtocol, LogUtils): """ assert self.running is False - self.env.neon_cli.pageserver_start(self.id, extra_env_vars=extra_env_vars) + self.env.neon_cli.pageserver_start( + self.id, extra_env_vars=extra_env_vars, timeout_in_seconds=timeout_in_seconds + ) self.running = True return self @@ -2553,13 +2577,17 @@ class NeonPageserver(PgProtocol, LogUtils): self.running = False return self - def restart(self, immediate: bool = False): + def restart( + self, + immediate: bool = False, + timeout_in_seconds: Optional[int] = None, + ): """ High level wrapper for restart: restarts the process, and waits for tenant state to stabilize. 
""" self.stop(immediate=immediate) - self.start() + self.start(timeout_in_seconds=timeout_in_seconds) self.quiesce_tenants() def quiesce_tenants(self): @@ -3835,9 +3863,13 @@ class Safekeeper(LogUtils): self.running = running self.logfile = Path(self.data_dir) / f"safekeeper-{id}.log" - def start(self, extra_opts: Optional[List[str]] = None) -> "Safekeeper": + def start( + self, extra_opts: Optional[List[str]] = None, timeout_in_seconds: Optional[int] = None + ) -> "Safekeeper": assert self.running is False - self.env.neon_cli.safekeeper_start(self.id, extra_opts=extra_opts) + self.env.neon_cli.safekeeper_start( + self.id, extra_opts=extra_opts, timeout_in_seconds=timeout_in_seconds + ) self.running = True # wait for wal acceptor start by checking its status started_at = time.time() diff --git a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py index 68f3d9dcbe..1d579214b0 100644 --- a/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py +++ b/test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py @@ -85,6 +85,8 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn( f"max_throughput_latest_lsn-{n_tenants}-{pgbench_scale}", n_tenants, setup_wrapper, + # https://github.com/neondatabase/neon/issues/8070 + timeout_in_seconds=60, ) env.pageserver.allowed_errors.append( diff --git a/test_runner/performance/pageserver/util.py b/test_runner/performance/pageserver/util.py index f31cd9a9f8..92e05663ce 100644 --- a/test_runner/performance/pageserver/util.py +++ b/test_runner/performance/pageserver/util.py @@ -2,7 +2,7 @@ Utilities used by all code in this sub-directory """ -from typing import Any, Callable, Dict, Tuple +from typing import Any, Callable, Dict, Optional, Tuple import fixtures.pageserver.many_tenants as 
many_tenants from fixtures.common_types import TenantId, TimelineId @@ -41,6 +41,7 @@ def setup_pageserver_with_tenants( name: str, n_tenants: int, setup: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]], + timeout_in_seconds: Optional[int] = None, ) -> NeonEnv: """ Utility function to set up a pageserver with a given number of identical tenants. @@ -50,6 +51,6 @@ def setup_pageserver_with_tenants( return many_tenants.single_timeline(neon_env_builder, setup, n_tenants) env = neon_env_builder.build_and_use_snapshot(name, doit) - env.start() + env.start(timeout_in_seconds=timeout_in_seconds) ensure_pageserver_ready_for_benchmarking(env, n_tenants) return env