From 689ad72e92955c8e7f2c3e640b0ae6299fbb9276 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 25 Jan 2024 19:20:02 +0100 Subject: [PATCH] fix(neon_local): leaks child process if it fails to start & pass checks (#6474) refs https://github.com/neondatabase/neon/issues/6473 Before this PR, if process_started() didn't return Ok(true) until we ran out of retries, we'd return an error but leave the process running. Try it by adding a 20s sleep to the pageserver `main()`, e.g., right before we claim the pidfile. Without this PR, output looks like so: ``` (.venv) cs@devvm-mbp:[~/src/neon-work-2]: ./target/debug/neon_local start Starting neon broker at 127.0.0.1:50051. storage_broker started, pid: 2710939 . attachment_service started, pid: 2710949 Starting pageserver node 1 at '127.0.0.1:64000' in ".neon/pageserver_1"..... pageserver has not started yet, continuing to wait..... pageserver 1 start failed: pageserver did not start in 10 seconds No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: ".neon/pageserver_1/pageserver.pid" No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: ".neon/safekeepers/sk1/safekeeper.pid" Stopping storage_broker with pid 2710939 immediately....... storage_broker has not stopped yet, continuing to wait..... neon broker stop failed: storage_broker with pid 2710939 did not stop in 10 seconds Stopping attachment_service with pid 2710949 immediately....... attachment_service has not stopped yet, continuing to wait..... 
attachment service stop failed: attachment_service with pid 2710949 did not stop in 10 seconds ``` and we leak the pageserver process ``` (.venv) cs@devvm-mbp:[~/src/neon-work-2]: ps aux | grep pageserver cs 2710959 0.0 0.2 2377960 47616 pts/4 Sl 14:36 0:00 /home/cs/src/neon-work-2/target/debug/pageserver -D .neon/pageserver_1 -c id=1 -c pg_distrib_dir='/home/cs/src/neon-work-2/pg_install' -c http_auth_type='Trust' -c pg_auth_type='Trust' -c listen_http_addr='127.0.0.1:9898' -c listen_pg_addr='127.0.0.1:64000' -c broker_endpoint='http://127.0.0.1:50051/' -c control_plane_api='http://127.0.0.1:1234/' -c remote_storage={local_path='../local_fs_remote_storage/pageserver'} ``` After this PR, there is no leaked process. --- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/attachment_service.rs | 5 ++-- control_plane/src/background_process.rs | 33 +++++++++++++++++-------- control_plane/src/pageserver.rs | 6 ++--- control_plane/src/safekeeper.rs | 3 +-- 6 files changed, 32 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2f31192f0..f0e8b6a0ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1342,6 +1342,7 @@ dependencies = [ "regex", "reqwest", "safekeeper_api", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 898ad05add..75e5dcb7f8 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -19,6 +19,7 @@ hex.workspace = true hyper.workspace = true regex.workspace = true reqwest = { workspace = true, features = ["blocking", "json"] } +scopeguard.workspace = true serde.workspace = true serde_json.workspace = true serde_with.workspace = true diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 0a353d8b12..2d43c46270 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -9,7 +9,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt; use 
postgres_backend::AuthType; use postgres_connection::parse_host_port; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{path::PathBuf, process::Child, str::FromStr}; +use std::{path::PathBuf, str::FromStr}; use tracing::instrument; use utils::{ auth::{Claims, Scope}, @@ -220,7 +220,7 @@ impl AttachmentService { .expect("non-Unicode path") } - pub async fn start(&self) -> anyhow::Result<Child> { + pub async fn start(&self) -> anyhow::Result<()> { let path_str = self.path.to_string_lossy(); let mut args = vec!["-l", &self.listen, "-p", &path_str] @@ -254,6 +254,7 @@ impl AttachmentService { ) .await; + // TODO: shouldn't we bail if we fail to spawn the process? for ps_conf in &self.env.pageservers { let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 20fa3af9b8..3ffb8734d0 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -17,7 +17,7 @@ use std::io::Write; use std::os::unix::prelude::AsRawFd; use std::os::unix::process::CommandExt; use std::path::Path; -use std::process::{Child, Command}; +use std::process::Command; use std::time::Duration; use std::{fs, io, thread}; @@ -60,7 +60,7 @@ pub async fn start_process( envs: EI, initial_pid_file: InitialPidFile, process_status_check: F, -) -> anyhow::Result<Child> +) -> anyhow::Result<()> where F: Fn() -> Fut, Fut: std::future::Future<Output = anyhow::Result<bool>>, @@ -98,7 +98,7 @@ where InitialPidFile::Expect(path) => path, }; - let mut spawned_process = filled_cmd.spawn().with_context(|| { + let spawned_process = filled_cmd.spawn().with_context(|| { format!("Could not spawn {process_name}, see console output and log files for details.") })?; let pid = spawned_process.id(); @@ -106,12 +106,26 @@ where i32::try_from(pid) .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?, ); + // set up a scopeguard to kill & wait
for the child in case we panic or bail below + let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| { + println!("SIGKILL & wait the started process"); + (|| { + // TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e.g., walredo). + spawned_process.kill().context("SIGKILL child")?; + spawned_process.wait().context("wait() for child process")?; + anyhow::Ok(()) + })() + .with_context(|| format!("scopeguard kill&wait child {process_name:?}")) + .unwrap(); + }); for retries in 0..RETRIES { match process_started(pid, pid_file_to_check, &process_status_check).await { Ok(true) => { - println!("\n{process_name} started, pid: {pid}"); - return Ok(spawned_process); + println!("\n{process_name} started and passed status check, pid: {pid}"); + // leak the child process, it'll outlive this neon_local invocation + drop(scopeguard::ScopeGuard::into_inner(spawned_process)); + return Ok(()); } Ok(false) => { if retries == NOTICE_AFTER_RETRIES { @@ -126,16 +140,15 @@ where thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS)); } Err(e) => { - println!("{process_name} failed to start: {e:#}"); - if let Err(e) = spawned_process.kill() { - println!("Could not stop {process_name} subprocess: {e:#}") - }; + println!("error starting process {process_name:?}: {e:#}"); return Err(e); } } } println!(); - anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds"); + anyhow::bail!( + "{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds" + ); } /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 18ccf6bd98..1db21c9a37 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -11,7 +11,7 @@ use std::io; use std::io::Write; use std::num::NonZeroU64; use std::path::PathBuf; -use std::process::{Child, Command}; +use std::process::Command; use std::time::Duration; use anyhow::{bail, Context}; @@ -161,7 +161,7 @@ impl PageServerNode { .expect("non-Unicode path") } - pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> { + pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> { self.start_node(config_overrides, false).await } @@ -207,7 +207,7 @@ impl PageServerNode { &self, config_overrides: &[&str], update_config: bool, - ) -> anyhow::Result<Child> { + ) -> anyhow::Result<()> { // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); print!( diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 4026ef0eb9..6ac71dfe51 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -7,7 +7,6 @@ //! ``` use std::io::Write; use std::path::PathBuf; -use std::process::Child; use std::{io, result}; use anyhow::Context; @@ -104,7 +103,7 @@ impl SafekeeperNode { .expect("non-Unicode path") } - pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> { + pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<()> { print!( "Starting safekeeper at '{}' in '{}'", self.pg_connection_config.raw_address(),