diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index d3b0366d31..c5d0e132b6 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -26,7 +26,7 @@
 //!
 //! ```text
 //! .neon/endpoints/main/
-//!     compute.log      - log output of `compute_ctl` and `postgres`
+//!     compute_ctl.log  - log output of `compute_ctl` and `postgres`
 //!     endpoint.json    - serialized `EndpointConf` struct
 //!     postgresql.conf  - postgresql settings
 //!     spec.json        - passed to `compute_ctl`
@@ -45,6 +45,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{anyhow, bail, Context, Result};
+use camino::Utf8PathBuf;
 use compute_api::spec::RemoteExtSpec;
 use nix::sys::signal::kill;
 use nix::sys::signal::Signal;
@@ -53,6 +54,7 @@ use url::Host;
 use utils::id::{NodeId, TenantId, TimelineId};
 
 use crate::attachment_service::AttachmentService;
+use crate::background_process;
 use crate::local_env::LocalEnv;
 use crate::postgresql_conf::PostgresConf;
 
@@ -550,89 +552,84 @@ impl Endpoint {
         let spec_path = self.endpoint_path().join("spec.json");
         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
 
-        // Open log file. We'll redirect the stdout and stderr of `compute_ctl` to it.
-        let logfile = std::fs::OpenOptions::new()
-            .create(true)
-            .append(true)
-            .open(self.endpoint_path().join("compute.log"))?;
-
         // Launch compute_ctl
         println!("Starting postgres node at '{}'", self.connstr());
-        let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
-        cmd.args(["--http-port", &self.http_address.port().to_string()])
-            .args(["--pgdata", self.pgdata().to_str().unwrap()])
-            .args(["--connstr", &self.connstr()])
-            .args([
-                "--spec-path",
-                self.endpoint_path().join("spec.json").to_str().unwrap(),
-            ])
-            .args([
-                "--pgbin",
-                self.env
-                    .pg_bin_dir(self.pg_version)?
-                    .join("postgres")
-                    .to_str()
-                    .unwrap(),
-            ])
-            .stdin(std::process::Stdio::null())
-            .stderr(logfile.try_clone()?)
-            .stdout(logfile);
+        let pidfile_path: Utf8PathBuf = self
+            .endpoint_path()
+            .join("compute_ctl.pid")
+            .try_into()
+            .unwrap();
+        let mut args = vec![
+            "--http-port".to_string(),
+            self.http_address.port().to_string(),
+            "--pgdata".to_string(),
+            self.pgdata().to_str().unwrap().to_string(),
+            "--connstr".to_string(),
+            self.connstr().to_string(),
+            "--spec-path".to_string(),
+            self.endpoint_path()
+                .join("spec.json")
+                .to_str()
+                .unwrap()
+                .to_string(),
+            "--pgbin".to_string(),
+            self.env
+                .pg_bin_dir(self.pg_version)?
+                .join("postgres")
+                .to_str()
+                .unwrap()
+                .to_string(),
+        ];
 
         if let Some(remote_ext_config) = remote_ext_config {
-            cmd.args(["--remote-ext-config", remote_ext_config]);
+            args.extend(["--remote-ext-config".to_string(), remote_ext_config.clone()]);
         }
 
-        let child = cmd.spawn()?;
-        // Write down the pid so we can wait for it when we want to stop
-        // TODO use background_process::start_process instead
-        let pid = child.id();
-        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
-        std::fs::write(pidfile_path, pid.to_string())?;
-
-        // Wait for it to start
-        let mut attempt = 0;
-        const ATTEMPT_INTERVAL: Duration = Duration::from_millis(100);
-        const MAX_ATTEMPTS: u32 = 10 * 30; // Wait up to 30 s
-        loop {
-            attempt += 1;
-            match self.get_status().await {
-                Ok(state) => {
-                    match state.status {
-                        ComputeStatus::Init => {
-                            if attempt == MAX_ATTEMPTS {
-                                bail!("compute startup timed out; still in Init state");
-                            }
-                            // keep retrying
-                        }
-                        ComputeStatus::Running => {
-                            // All good!
-                            break;
-                        }
-                        ComputeStatus::Failed => {
-                            bail!(
-                                "compute startup failed: {}",
-                                state
-                                    .error
-                                    .as_deref()
-                                    .unwrap_or("")
-                            );
-                        }
-                        ComputeStatus::Empty
-                        | ComputeStatus::ConfigurationPending
-                        | ComputeStatus::Configuration => {
-                            bail!("unexpected compute status: {:?}", state.status)
-                        }
-                    }
-                }
-                Err(e) => {
-                    if attempt == MAX_ATTEMPTS {
-                        return Err(e).context("timed out waiting to connect to compute_ctl HTTP");
-                    }
-                }
-            }
-            std::thread::sleep(ATTEMPT_INTERVAL);
-        }
+        background_process::start_process(
+            "compute_ctl",
+            &self.endpoint_path(),
+            &self.env.neon_distrib_dir.join("compute_ctl"),
+            args,
+            [],
+            background_process::InitialPidFile::Create(pidfile_path.clone()),
+            || async {
+                let st = tokio::time::timeout(Duration::from_secs(1), self.get_status()).await;
+                let Ok(st) = st else {
+                    // timeout, it's not up yet
+                    return Ok(false);
+                };
+                let Ok(state) = st else {
+                    // unspecified error
+                    return Ok(false);
+                };
+                match state.status {
+                    ComputeStatus::Init => {
+                        // keep retrying
+                        return Ok(false);
+                    }
+                    ComputeStatus::Running => {
+                        // All good!
+                        return Ok(true);
+                    }
+                    ComputeStatus::Failed => {
+                        bail!(
+                            "compute startup failed: {}",
+                            state
+                                .error
+                                .as_deref()
+                                .unwrap_or("")
+                        );
+                    }
+                    ComputeStatus::Empty
+                    | ComputeStatus::ConfigurationPending
+                    | ComputeStatus::Configuration => {
+                        bail!("unexpected compute status: {:?}", state.status)
+                    }
+                }
+            },
+        )
+        .await?;
 
         Ok(())
     }
diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py
index 7822e29ed9..0768768991 100644
--- a/test_runner/regress/test_hot_standby.py
+++ b/test_runner/regress/test_hot_standby.py
@@ -24,7 +24,7 @@ def wait_caughtup(primary: Endpoint, secondary: Endpoint):
 # Check for corrupted WAL messages which might otherwise go unnoticed if
 # reconnection fixes this.
 def scan_standby_log_for_errors(secondary):
-    log_path = secondary.endpoint_path() / "compute.log"
+    log_path = secondary.endpoint_path() / "compute_ctl.log"
     with log_path.open("r") as f:
         markers = re.compile(
             r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
         )
diff --git a/test_runner/regress/test_migrations.py b/test_runner/regress/test_migrations.py
index 121fa91f66..97238699cc 100644
--- a/test_runner/regress/test_migrations.py
+++ b/test_runner/regress/test_migrations.py
@@ -8,7 +8,7 @@ def test_migrations(neon_simple_env: NeonEnv):
     env.neon_cli.create_branch("test_migrations", "empty")
     endpoint = env.endpoints.create("test_migrations")
-    log_path = endpoint.endpoint_path() / "compute.log"
+    log_path = endpoint.endpoint_path() / "compute_ctl.log"
 
     endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
     endpoint.start()