diff --git a/Cargo.lock b/Cargo.lock
index 326cccaecb..01b8abda9a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -317,12 +317,6 @@ dependencies = [
  "generic-array",
 ]
 
-[[package]]
-name = "boxfnonce"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5988cb1d626264ac94100be357308f29ff7cbdd3b36bda27f450a4ee3f713426"
-
 [[package]]
 name = "bstr"
 version = "1.0.1"
@@ -850,16 +844,6 @@ dependencies = [
  "syn",
 ]
 
-[[package]]
-name = "daemonize"
-version = "0.4.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "70c24513e34f53b640819f0ac9f705b673fcf4006d7aab8778bee72ebfc89815"
-dependencies = [
- "boxfnonce",
- "libc",
-]
-
 [[package]]
 name = "darling"
 version = "0.14.1"
@@ -2141,7 +2125,6 @@ dependencies = [
  "crc32c",
  "criterion",
  "crossbeam-utils",
- "daemonize",
  "etcd_broker",
  "fail",
  "futures",
@@ -3088,7 +3071,6 @@ dependencies = [
  "clap 4.0.15",
  "const_format",
  "crc32c",
- "daemonize",
  "etcd_broker",
  "fs2",
  "git-version",
@@ -3096,6 +3078,7 @@ dependencies = [
  "humantime",
  "hyper",
  "metrics",
+ "nix 0.25.0",
  "once_cell",
  "parking_lot 0.12.1",
  "postgres",
diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs
new file mode 100644
index 0000000000..2f8098b7c9
--- /dev/null
+++ b/control_plane/src/background_process.rs
@@ -0,0 +1,264 @@
+//! Spawns and kills background processes that are needed by Neon CLI.
+//! Applies common set-up, such as log and pid files (if needed), to every process.
+//!
+//! Neon CLI does not run in the background, so it needs to store the information about
+//! spawned processes, which it does in this module.
+//! We do that by storing the pid of the process in the "${process_name}.pid" file.
+//! The pid file can be created by the process itself
+//! (Neon storage binaries do that and also ensure that a lock is taken on that file)
+//! or we create such a file after starting the process
+//! (non-Neon binaries don't necessarily follow our pidfile conventions).
+//! The pid stored in the file is later used to stop the service.
+//!
+//! See the [`lock_file`] module for more info.
+
+use std::ffi::OsStr;
+use std::io::Write;
+use std::path::Path;
+use std::process::{Child, Command};
+use std::time::Duration;
+use std::{fs, io, thread};
+
+use anyhow::{anyhow, bail, Context, Result};
+use nix::errno::Errno;
+use nix::sys::signal::{kill, Signal};
+use nix::unistd::Pid;
+
+use utils::lock_file;
+
+const RETRIES: u32 = 15;
+const RETRY_TIMEOUT_MILLIS: u64 = 500;
+
+/// Argument to `start_process`, to indicate whether it should create the pidfile
+/// or whether the process creates one itself.
+pub enum InitialPidFile<'t> {
+    /// Create a pidfile, to allow future CLI invocations to manipulate the process.
+    Create(&'t Path),
+    /// The process will create the pidfile itself; we need to wait for that event.
+    Expect(&'t Path),
+}
+
+/// Start a background child process using the parameters given.
+pub fn start_process<F, S: AsRef<OsStr>>(
+    process_name: &str,
+    datadir: &Path,
+    command: &Path,
+    args: &[S],
+    initial_pid_file: InitialPidFile,
+    process_status_check: F,
+) -> anyhow::Result<Child>
+where
+    F: Fn() -> anyhow::Result<bool>,
+{
+    let log_path = datadir.join(format!("{process_name}.log"));
+    let process_log_file = fs::OpenOptions::new()
+        .create(true)
+        .write(true)
+        .append(true)
+        .open(&log_path)
+        .with_context(|| {
+            format!("Could not open {process_name} log file {log_path:?} for writing")
+        })?;
+    let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
+        format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
+    })?;
+
+    let mut command = Command::new(command);
+    let background_command = command
+        .stdout(process_log_file)
+        .stderr(same_file_for_stderr)
+        .args(args);
+    let filled_cmd = fill_aws_secrets_vars(fill_rust_env_vars(background_command));
+
+    let mut spawned_process = filled_cmd.spawn().with_context(|| {
+        format!("Could not spawn {process_name}, see console output and log files for details.")
+    })?;
+    let pid = spawned_process.id();
+    let pid = Pid::from_raw(
+        i32::try_from(pid)
+            .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
+    );
+
+    let pid_file_to_check = match initial_pid_file {
+        InitialPidFile::Create(target_pid_file_path) => {
+            match lock_file::create_lock_file(target_pid_file_path, pid.to_string()) {
+                lock_file::LockCreationResult::Created { .. } => {
+                    // We use the "lock" file here only to create the pid file. The lock on the pidfile
+                    // will be dropped as soon as this CLI invocation exits, so it's a bit useless,
+                    // but doesn't do any harm either.
+                }
+                lock_file::LockCreationResult::AlreadyLocked { .. } => {
+                    anyhow::bail!("Cannot write pid file for {process_name} at path {target_pid_file_path:?}: file is already locked by another process")
+                }
+                lock_file::LockCreationResult::CreationFailed(e) => {
+                    return Err(e.context(format!(
+                        "Failed to create pid file for {process_name} at path {target_pid_file_path:?}"
+                    )))
+                }
+            }
+            None
+        }
+        InitialPidFile::Expect(pid_file_path) => Some(pid_file_path),
+    };
+
+    for retries in 0..RETRIES {
+        match process_started(pid, pid_file_to_check, &process_status_check) {
+            Ok(true) => {
+                println!("\n{process_name} started, pid: {pid}");
+                return Ok(spawned_process);
+            }
+            Ok(false) => {
+                if retries < 5 {
+                    print!(".");
+                    io::stdout().flush().unwrap();
+                } else {
+                    if retries == 5 {
+                        println!() // put a line break after the dots before the second message
+                    }
+                    println!("{process_name} has not started yet, retrying ({retries})...");
+                }
+                thread::sleep(Duration::from_millis(RETRY_TIMEOUT_MILLIS));
+            }
+            Err(e) => {
+                println!("{process_name} failed to start: {e:#}");
+                if let Err(e) = spawned_process.kill() {
+                    println!("Could not stop {process_name} subprocess: {e:#}")
+                };
+                return Err(e);
+            }
+        }
+    }
+    anyhow::bail!("{process_name} could not start in {RETRIES} attempts");
+}
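For review context, a minimal sketch of how a caller inside `control_plane` is expected to drive this API (the module is crate-private; the service name, binary path, and flag below are hypothetical, not part of this patch):

```rust
// Illustrative only, not part of the patch. Assumes the crate layout above.
use std::path::Path;
use std::process::Child;

use crate::background_process::{self, InitialPidFile};

fn start_my_service(datadir: &Path) -> anyhow::Result<Child> {
    // A binary that knows nothing about our pidfile conventions, so the CLI
    // writes the pidfile itself after spawning (`InitialPidFile::Create`).
    // Neon's own binaries would use `InitialPidFile::Expect` instead.
    let pid_file = datadir.join("my_service.pid"); // hypothetical name
    background_process::start_process(
        "my_service",                            // used for log/pid file names and messages
        datadir,
        Path::new("/usr/local/bin/my_service"),  // hypothetical binary
        &["--foreground"],                       // hypothetical flag
        InitialPidFile::Create(&pid_file),
        // Readiness probe: Ok(true) = started, Ok(false) = poll again, Err = abort.
        || Ok(true),
    )
}
```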
+
+/// Stops the process, using the pid file given. Also returns Ok if the process is already not running.
+pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> anyhow::Result<()> {
+    if !pid_file.exists() {
+        println!("{process_name} is already stopped: no pid file {pid_file:?} is present");
+        return Ok(());
+    }
+    let pid = read_pidfile(pid_file)?;
+
+    let sig = if immediate {
+        print!("Stopping {process_name} with pid {pid} immediately..");
+        Signal::SIGQUIT
+    } else {
+        print!("Stopping {process_name} with pid {pid} gracefully..");
+        Signal::SIGTERM
+    };
+    io::stdout().flush().unwrap();
+    match kill(pid, sig) {
+        Ok(()) => (),
+        Err(Errno::ESRCH) => {
+            println!(
+                "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found"
+            );
+            return Ok(());
+        }
+        Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
+    }
+
+    // Wait until the process is gone
+    for _ in 0..RETRIES {
+        match process_has_stopped(pid) {
+            Ok(true) => {
+                println!("\n{process_name} stopped");
+                if let Err(e) = fs::remove_file(pid_file) {
+                    if e.kind() != io::ErrorKind::NotFound {
+                        eprintln!("Failed to remove pid file {pid_file:?} after stopping the process: {e:#}");
+                    }
+                }
+                return Ok(());
+            }
+            Ok(false) => {
+                print!(".");
+                io::stdout().flush().unwrap();
+                thread::sleep(Duration::from_secs(1))
+            }
+            Err(e) => {
+                println!("{process_name} with pid {pid} failed to stop: {e:#}");
+                return Err(e);
+            }
+        }
+    }
+
+    anyhow::bail!("{process_name} with pid {pid} failed to stop in {RETRIES} attempts");
+}
+
+fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
+    let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
+
+    let var = "LLVM_PROFILE_FILE";
+    if let Some(val) = std::env::var_os(var) {
+        filled_cmd = filled_cmd.env(var, val);
+    }
+
+    const RUST_LOG_KEY: &str = "RUST_LOG";
+    if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
+        filled_cmd.env(RUST_LOG_KEY, rust_log_value)
+    } else {
+        filled_cmd
+    }
+}
+
+fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
+    for env_key in [
+        "AWS_ACCESS_KEY_ID",
+        "AWS_SECRET_ACCESS_KEY",
+        "AWS_SESSION_TOKEN",
+    ] {
+        if let Ok(value) = std::env::var(env_key) {
+            cmd = cmd.env(env_key, value);
+        }
+    }
+    cmd
+}
+
+fn process_started<F>(
+    pid: Pid,
+    pid_file_to_check: Option<&Path>,
+    status_check: &F,
+) -> anyhow::Result<bool>
+where
+    F: Fn() -> anyhow::Result<bool>,
+{
+    match status_check() {
+        Ok(true) => match pid_file_to_check {
+            Some(pid_file_path) => {
+                if pid_file_path.exists() {
+                    let pid_in_file = read_pidfile(pid_file_path)?;
+                    Ok(pid_in_file == pid)
+                } else {
+                    Ok(false)
+                }
+            }
+            None => Ok(true),
+        },
+        Ok(false) => Ok(false),
+        Err(e) => anyhow::bail!("process failed to start: {e}"),
+    }
+}
+
+/// Read a PID file
+///
+/// We expect a file that contains a single integer.
+fn read_pidfile(pidfile: &Path) -> Result<Pid> {
+    let pid_str = fs::read_to_string(pidfile)
+        .with_context(|| format!("failed to read pidfile {pidfile:?}"))?;
+    let pid: i32 = pid_str
+        .parse()
+        .map_err(|_| anyhow!("failed to parse pidfile {pidfile:?}"))?;
+    if pid < 1 {
+        bail!("pidfile {pidfile:?} contained bad value '{pid}'");
+    }
+    Ok(Pid::from_raw(pid))
+}
+
+fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
+    match kill(pid, None) {
+        // Process exists, keep waiting
+        Ok(_) => Ok(false),
+        // Process not found, we're done
+        Err(Errno::ESRCH) => Ok(true),
+        Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
+    }
+}
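`process_has_stopped` relies on the standard POSIX idiom: `kill` with no signal (signal 0) performs only the existence and permission check, and delivers nothing. A self-contained sketch of the same probe (assumes the `nix` crate added above):

```rust
use nix::errno::Errno;
use nix::sys::signal::kill;
use nix::unistd::Pid;

/// Returns Ok(true) while the process exists, Ok(false) once it is gone.
fn is_alive(pid: Pid) -> anyhow::Result<bool> {
    match kill(pid, None) {
        Ok(()) => Ok(true),             // signal 0: nothing delivered, pid exists
        Err(Errno::ESRCH) => Ok(false), // no such process
        Err(e) => anyhow::bail!("probing pid {pid} failed: {e}"), // e.g. EPERM
    }
}

fn main() -> anyhow::Result<()> {
    // Probing our own pid always succeeds.
    assert!(is_alive(Pid::this())?);
    Ok(())
}
```

Note that `EPERM` would mean the pid exists but belongs to another user; like the patch, the sketch treats that as an error rather than as "stopped".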
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index 70a2c97a9e..42a9199037 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -9,8 +9,8 @@ use anyhow::{anyhow, bail, Context, Result};
 use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
 use control_plane::compute::ComputeControlPlane;
 use control_plane::local_env::{EtcdBroker, LocalEnv};
+use control_plane::pageserver::PageServerNode;
 use control_plane::safekeeper::SafekeeperNode;
-use control_plane::storage::PageServerNode;
 use control_plane::{etcd, local_env};
 use pageserver_api::models::TimelineInfo;
 use pageserver_api::{
diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index 89e4e85eb0..359948a8c9 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -18,8 +18,8 @@ use utils::{
 };
 
 use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
+use crate::pageserver::PageServerNode;
 use crate::postgresql_conf::PostgresConf;
-use crate::storage::PageServerNode;
 
 //
 // ComputeControlPlane
diff --git a/control_plane/src/etcd.rs b/control_plane/src/etcd.rs
index ca2df8a50b..4c15914e24 100644
--- a/control_plane/src/etcd.rs
+++ b/control_plane/src/etcd.rs
@@ -1,99 +1,75 @@
-use std::{
-    fs,
-    path::PathBuf,
-    process::{Command, Stdio},
-};
+use std::{fs, path::PathBuf};
 
 use anyhow::Context;
-use nix::{
-    sys::signal::{kill, Signal},
-    unistd::Pid,
-};
 
-use crate::{local_env, read_pidfile};
+use crate::{background_process, local_env};
 
 pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
     let etcd_broker = &env.etcd_broker;
     println!(
-        "Starting etcd broker using {}",
-        etcd_broker.etcd_binary_path.display()
+        "Starting etcd broker using {:?}",
+        etcd_broker.etcd_binary_path
     );
 
     let etcd_data_dir = env.base_data_dir.join("etcd");
-    fs::create_dir_all(&etcd_data_dir).with_context(|| {
-        format!(
-            "Failed to create etcd data dir: {}",
-            etcd_data_dir.display()
-        )
-    })?;
+    fs::create_dir_all(&etcd_data_dir)
+        .with_context(|| format!("Failed to create etcd data dir {etcd_data_dir:?}"))?;
 
-    let etcd_stdout_file =
-        fs::File::create(etcd_data_dir.join("etcd.stdout.log")).with_context(|| {
-            format!(
-                "Failed to create etcd stout file in directory {}",
-                etcd_data_dir.display()
-            )
-        })?;
-    let etcd_stderr_file =
-        fs::File::create(etcd_data_dir.join("etcd.stderr.log")).with_context(|| {
-            format!(
-                "Failed to create etcd stderr file in directory {}",
-                etcd_data_dir.display()
-            )
-        })?;
     let client_urls = etcd_broker.comma_separated_endpoints();
+    let args = [
+        format!("--data-dir={}", etcd_data_dir.display()),
+        format!("--listen-client-urls={client_urls}"),
+        format!("--advertise-client-urls={client_urls}"),
+        // Set --quota-backend-bytes to keep the etcd virtual memory
+        // size smaller. Our test etcd clusters are very small.
+        // See https://github.com/etcd-io/etcd/issues/7910
+        "--quota-backend-bytes=100000000".to_string(),
+        // etcd doesn't compact (vacuum) with default settings,
+        // enable it to prevent space exhaustion.
+        "--auto-compaction-mode=revision".to_string(),
+        "--auto-compaction-retention=1".to_string(),
+    ];
 
-    let etcd_process = Command::new(&etcd_broker.etcd_binary_path)
-        .args(&[
-            format!("--data-dir={}", etcd_data_dir.display()),
-            format!("--listen-client-urls={client_urls}"),
-            format!("--advertise-client-urls={client_urls}"),
-            // Set --quota-backend-bytes to keep the etcd virtual memory
-            // size smaller. Our test etcd clusters are very small.
-            // See https://github.com/etcd-io/etcd/issues/7910
-            "--quota-backend-bytes=100000000".to_string(),
-            // etcd doesn't compact (vacuum) with default settings,
-            // enable it to prevent space exhaustion.
-            "--auto-compaction-mode=revision".to_string(),
-            "--auto-compaction-retention=1".to_string(),
-        ])
-        .stdout(Stdio::from(etcd_stdout_file))
-        .stderr(Stdio::from(etcd_stderr_file))
-        .spawn()
-        .context("Failed to spawn etcd subprocess")?;
-    let pid = etcd_process.id();
+    let pid_file_path = etcd_pid_file_path(env);
 
-    let etcd_pid_file_path = etcd_pid_file_path(env);
-    fs::write(&etcd_pid_file_path, pid.to_string()).with_context(|| {
-        format!(
-            "Failed to create etcd pid file at {}",
-            etcd_pid_file_path.display()
-        )
-    })?;
+    let client = reqwest::blocking::Client::new();
+
+    background_process::start_process(
+        "etcd",
+        &etcd_data_dir,
+        &etcd_broker.etcd_binary_path,
+        &args,
+        background_process::InitialPidFile::Create(&pid_file_path),
+        || {
+            for broker_endpoint in &etcd_broker.broker_endpoints {
+                let request = broker_endpoint
+                    .join("health")
+                    .with_context(|| {
+                        format!(
+                            "Failed to append /health path to broker endpoint {}",
+                            broker_endpoint
+                        )
+                    })
+                    .and_then(|url| {
+                        client.get(&url.to_string()).build().with_context(|| {
+                            format!("Failed to construct request to etcd endpoint {url}")
+                        })
+                    })?;
+                if client.execute(request).is_ok() {
+                    return Ok(true);
+                }
+            }
+
+            Ok(false)
+        },
+    )
+    .context("Failed to spawn etcd subprocess")?;
 
     Ok(())
 }
 
 pub fn stop_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
-    let etcd_path = &env.etcd_broker.etcd_binary_path;
-    println!("Stopping etcd broker at {}", etcd_path.display());
-
-    let etcd_pid_file_path = etcd_pid_file_path(env);
-    let pid = Pid::from_raw(read_pidfile(&etcd_pid_file_path).with_context(|| {
-        format!(
-            "Failed to read etcd pid file at {}",
-            etcd_pid_file_path.display()
-        )
-    })?);
-
-    kill(pid, Signal::SIGTERM).with_context(|| {
-        format!(
-            "Failed to stop etcd with pid {pid} at {}",
-            etcd_pid_file_path.display()
-        )
-    })?;
-
-    Ok(())
+    background_process::stop_process(true, "etcd", &etcd_pid_file_path(env))
 }
 
 fn etcd_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs
index f22dce1810..c3b47fe81b 100644
--- a/control_plane/src/lib.rs
+++ b/control_plane/src/lib.rs
@@ -6,60 +6,12 @@
 // Intended to be used in integration tests and in CLI tools for
 // local installations.
 //
-use anyhow::{anyhow, bail, Context, Result};
-use std::fs;
-use std::path::Path;
-use std::process::Command;
+mod background_process;
 pub mod compute;
 pub mod connection;
 pub mod etcd;
 pub mod local_env;
+pub mod pageserver;
 pub mod postgresql_conf;
 pub mod safekeeper;
-pub mod storage;
-
-/// Read a PID file
-///
-/// We expect a file that contains a single integer.
-/// We return an i32 for compatibility with libc and nix.
-pub fn read_pidfile(pidfile: &Path) -> Result<i32> {
-    let pid_str = fs::read_to_string(pidfile)
-        .with_context(|| format!("failed to read pidfile {:?}", pidfile))?;
-    let pid: i32 = pid_str
-        .parse()
-        .map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?;
-    if pid < 1 {
-        bail!("pidfile {:?} contained bad value '{}'", pidfile, pid);
-    }
-    Ok(pid)
-}
-
-fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
-    let cmd = cmd.env_clear().env("RUST_BACKTRACE", "1");
-
-    let var = "LLVM_PROFILE_FILE";
-    if let Some(val) = std::env::var_os(var) {
-        cmd.env(var, val);
-    }
-
-    const RUST_LOG_KEY: &str = "RUST_LOG";
-    if let Ok(rust_log_value) = std::env::var(RUST_LOG_KEY) {
-        cmd.env(RUST_LOG_KEY, rust_log_value)
-    } else {
-        cmd
-    }
-}
-
-fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
-    for env_key in [
-        "AWS_ACCESS_KEY_ID",
-        "AWS_SECRET_ACCESS_KEY",
-        "AWS_SESSION_TOKEN",
-    ] {
-        if let Ok(value) = std::env::var(env_key) {
-            cmd = cmd.env(env_key, value);
-        }
-    }
-    cmd
-}
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 10b2db6396..ac4ebd0d1e 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -226,12 +226,12 @@ impl LocalEnv {
         }
     }
 
-    pub fn pageserver_bin(&self) -> anyhow::Result<PathBuf> {
-        Ok(self.neon_distrib_dir.join("pageserver"))
+    pub fn pageserver_bin(&self) -> PathBuf {
+        self.neon_distrib_dir.join("pageserver")
     }
 
-    pub fn safekeeper_bin(&self) -> anyhow::Result<PathBuf> {
-        Ok(self.neon_distrib_dir.join("safekeeper"))
+    pub fn safekeeper_bin(&self) -> PathBuf {
+        self.neon_distrib_dir.join("safekeeper")
     }
 
     pub fn pg_data_dirs_path(&self) -> PathBuf {
diff --git a/control_plane/src/storage.rs b/control_plane/src/pageserver.rs
similarity index 80%
rename from control_plane/src/storage.rs
rename to control_plane/src/pageserver.rs
index 4b705690f0..fa6d1e496a 100644
--- a/control_plane/src/storage.rs
+++ b/control_plane/src/pageserver.rs
@@ -1,17 +1,13 @@
 use std::collections::HashMap;
-use std::fs::File;
+use std::fs::{self, File};
 use std::io::{BufReader, Write};
 use std::num::NonZeroU64;
 use std::path::{Path, PathBuf};
-use std::process::Command;
-use std::time::Duration;
-use std::{io, result, thread};
+use std::process::Child;
+use std::{io, result};
 
 use crate::connection::PgConnectionConfig;
 use anyhow::{bail, Context};
-use nix::errno::Errno;
-use nix::sys::signal::{kill, Signal};
-use nix::unistd::Pid;
 use pageserver_api::models::{
     TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
 };
@@ -25,8 +21,7 @@ use utils::{
     postgres_backend::AuthType,
 };
 
-use crate::local_env::LocalEnv;
-use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
+use crate::{background_process, local_env::LocalEnv};
 
 #[derive(Error, Debug)]
 pub enum PageserverHttpError {
@@ -160,7 +155,15 @@ impl PageServerNode {
             init_config_overrides.push("auth_validation_public_key_path='auth_public_key.pem'");
         }
 
-        self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
+        let mut pageserver_process = self
+            .start_node(&init_config_overrides, &self.env.base_data_dir, true)
+            .with_context(|| {
+                format!(
+                    "Failed to start a process for pageserver {}",
+                    self.env.pageserver.id,
+                )
+            })?;
+
         let init_result = self
             .try_init_timeline(create_tenant, initial_timeline_id, pg_version)
             .context("Failed to create initial tenant and timeline for pageserver");
@@ -170,7 +173,29 @@ impl PageServerNode {
             }
             Err(e) => eprintln!("{e:#}"),
         }
-        self.stop(false)?;
+        match pageserver_process.kill() {
+            Err(e) => {
+                eprintln!(
+                    "Failed to stop pageserver {} process with pid {}: {e:#}",
+                    self.env.pageserver.id,
+                    pageserver_process.id(),
+                )
+            }
+            Ok(()) => {
+                println!(
+                    "Stopped pageserver {} process with pid {}",
+                    self.env.pageserver.id,
+                    pageserver_process.id(),
+                );
+                // Clean up after pageserver startup, since we do not call the regular `stop_process` during init
+                let pid_file = self.pid_file();
+                if let Err(e) = fs::remove_file(&pid_file) {
+                    if e.kind() != io::ErrorKind::NotFound {
+                        eprintln!("Failed to remove pid file {pid_file:?} after stopping the process: {e:#}");
+                    }
+                }
+            }
+        }
 
         init_result
     }
@@ -195,11 +220,14 @@ impl PageServerNode {
         self.env.pageserver_data_dir()
     }
 
-    pub fn pid_file(&self) -> PathBuf {
+    /// The pid file is created by the pageserver process, with its pid stored inside.
+    /// Other pageservers cannot lock the same file and overwrite it for as long as the current
+    /// pageserver runs. (Unless someone removes the file manually; never do that!)
+    fn pid_file(&self) -> PathBuf {
         self.repo_path().join("pageserver.pid")
     }
 
-    pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
+    pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
         self.start_node(config_overrides, &self.repo_path(), false)
     }
 
@@ -208,7 +236,7 @@ impl PageServerNode {
         config_overrides: &[&str],
         datadir: &Path,
         update_config: bool,
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Child> {
         println!(
             "Starting pageserver at '{}' in '{}'",
             self.pg_connection_config.raw_address(),
@@ -219,10 +247,7 @@ impl PageServerNode {
         let mut args = vec![
             "-D",
             datadir.to_str().with_context(|| {
-                format!(
-                    "Datadir path '{}' cannot be represented as a unicode string",
-                    datadir.display()
-                )
+                format!("Datadir path {datadir:?} cannot be represented as a unicode string")
             })?,
         ];
 
@@ -234,48 +259,18 @@ impl PageServerNode {
             args.extend(["-c", config_override]);
         }
 
-        let mut cmd = Command::new(self.env.pageserver_bin()?);
-        let mut filled_cmd = fill_rust_env_vars(cmd.args(&args).arg("--daemonize"));
-        filled_cmd = fill_aws_secrets_vars(filled_cmd);
-
-        if !filled_cmd.status()?.success() {
-            bail!(
-                "Pageserver failed to start. See console output and '{}' for details.",
-                datadir.join("pageserver.log").display()
-            );
-        }
-
-        // It takes a while for the page server to start up. Wait until it is
-        // open for business.
-        const RETRIES: i8 = 15;
-        for retries in 1..RETRIES {
-            match self.check_status() {
-                Ok(()) => {
-                    println!("\nPageserver started");
-                    return Ok(());
-                }
-                Err(err) => {
-                    match err {
-                        PageserverHttpError::Transport(err) => {
-                            if err.is_connect() && retries < 5 {
-                                print!(".");
-                                io::stdout().flush().unwrap();
-                            } else {
-                                if retries == 5 {
-                                    println!() // put a line break after dots for second message
-                                }
-                                println!("Pageserver not responding yet, err {err} retrying ({retries})...");
-                            }
-                        }
-                        PageserverHttpError::Response(msg) => {
-                            bail!("pageserver failed to start: {msg} ")
-                        }
-                    }
-                    thread::sleep(Duration::from_secs(1));
-                }
-            }
-        }
-        bail!("pageserver failed to start in {RETRIES} seconds");
+        background_process::start_process(
+            "pageserver",
+            datadir,
+            &self.env.pageserver_bin(),
+            &args,
+            background_process::InitialPidFile::Expect(&self.pid_file()),
+            || match self.check_status() {
+                Ok(()) => Ok(true),
+                Err(PageserverHttpError::Transport(_)) => Ok(false),
+                Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
+            },
+        )
     }
 
     ///
@@ -287,58 +282,7 @@ impl PageServerNode {
     /// If the server is not running, returns success
     ///
     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
-        let pid_file = self.pid_file();
-        if !pid_file.exists() {
-            println!("Pageserver is already stopped");
-            return Ok(());
-        }
-        let pid = Pid::from_raw(read_pidfile(&pid_file)?);
-
-        let sig = if immediate {
-            print!("Stopping pageserver immediately..");
-            Signal::SIGQUIT
-        } else {
-            print!("Stopping pageserver gracefully..");
-            Signal::SIGTERM
-        };
-        io::stdout().flush().unwrap();
-        match kill(pid, sig) {
-            Ok(_) => (),
-            Err(Errno::ESRCH) => {
-                println!("Pageserver with pid {pid} does not exist, but a PID file was found");
-                return Ok(());
-            }
-            Err(err) => bail!(
-                "Failed to send signal to pageserver with pid {pid}: {}",
-                err.desc()
-            ),
-        }
-
-        // Wait until process is gone
-        for i in 0..600 {
-            let signal = None; // Send no signal, just get the error code
-            match kill(pid, signal) {
-                Ok(_) => (), // Process exists, keep waiting
-                Err(Errno::ESRCH) => {
-                    // Process not found, we're done
-                    println!("done!");
-                    return Ok(());
-                }
-                Err(err) => bail!(
-                    "Failed to send signal to pageserver with pid {}: {}",
-                    pid,
-                    err.desc()
-                ),
-            };
-
-            if i % 10 == 0 {
-                print!(".");
-                io::stdout().flush().unwrap();
-            }
-            thread::sleep(Duration::from_millis(100));
-        }
-
-        bail!("Failed to stop pageserver with pid {pid}");
+        background_process::stop_process(immediate, "pageserver", &self.pid_file())
     }
 
     pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
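Both storage nodes now express readiness through the same three-way closure contract. A condensed sketch of that mapping (`ProbeError` is a hypothetical stand-in for `PageserverHttpError`/`SafekeeperHttpError`):

```rust
// Hypothetical stand-in for the nodes' HTTP error enums.
enum ProbeError {
    Transport,        // connection refused etc.: the node is still coming up
    Response(String), // the node answered, but with a real error
}

// The contract of the closure handed to start_process:
// Ok(true) => ready, Ok(false) => poll again later, Err(_) => abort startup.
fn probe_once(check_status: impl Fn() -> Result<(), ProbeError>) -> anyhow::Result<bool> {
    match check_status() {
        Ok(()) => Ok(true),
        Err(ProbeError::Transport) => Ok(false),
        Err(ProbeError::Response(msg)) => anyhow::bail!("failed to check node status: {msg}"),
    }
}
```

The notable change from the removed loops is that any transport error now means "not ready yet, retry", and the retry cadence lives in one place, `start_process`, instead of being duplicated per node.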
diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index 91cedeca23..0bc35b3680 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -1,23 +1,21 @@
 use std::io::Write;
 use std::path::PathBuf;
-use std::process::Command;
+use std::process::Child;
 use std::sync::Arc;
-use std::time::Duration;
-use std::{io, result, thread};
+use std::{io, result};
 
-use anyhow::bail;
-use nix::errno::Errno;
-use nix::sys::signal::{kill, Signal};
-use nix::unistd::Pid;
+use anyhow::Context;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
 use thiserror::Error;
 use utils::{http::error::HttpErrorBody, id::NodeId};
 
 use crate::connection::PgConnectionConfig;
-use crate::local_env::{LocalEnv, SafekeeperConf};
-use crate::storage::PageServerNode;
-use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
+use crate::pageserver::PageServerNode;
+use crate::{
+    background_process,
+    local_env::{LocalEnv, SafekeeperConf},
+};
 
 #[derive(Error, Debug)]
 pub enum SafekeeperHttpError {
@@ -95,7 +93,7 @@ impl SafekeeperNode {
     }
 
     pub fn datadir_path_by_id(env: &LocalEnv, sk_id: NodeId) -> PathBuf {
-        env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
+        env.safekeeper_data_dir(&format!("sk{sk_id}"))
     }
 
     pub fn datadir_path(&self) -> PathBuf {
@@ -106,7 +104,7 @@ impl SafekeeperNode {
         self.datadir_path().join("safekeeper.pid")
     }
 
-    pub fn start(&self) -> anyhow::Result<()> {
+    pub fn start(&self) -> anyhow::Result<Child> {
         print!(
             "Starting safekeeper at '{}' in '{}'",
             self.pg_connection_config.raw_address(),
@@ -116,81 +114,68 @@ impl SafekeeperNode {
 
         let listen_pg = format!("127.0.0.1:{}", self.conf.pg_port);
         let listen_http = format!("127.0.0.1:{}", self.conf.http_port);
+        let id = self.id;
+        let datadir = self.datadir_path();
 
-        let mut cmd = Command::new(self.env.safekeeper_bin()?);
-        fill_rust_env_vars(
-            cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
-                .args(&["--id", self.id.to_string().as_ref()])
-                .args(&["--listen-pg", &listen_pg])
-                .args(&["--listen-http", &listen_http])
-                .arg("--daemonize"),
-        );
+        let id_string = id.to_string();
+        let mut args = vec![
+            "-D",
+            datadir.to_str().with_context(|| {
+                format!("Datadir path {datadir:?} cannot be represented as a unicode string")
+            })?,
+            "--id",
+            &id_string,
+            "--listen-pg",
+            &listen_pg,
+            "--listen-http",
+            &listen_http,
+        ];
         if !self.conf.sync {
-            cmd.arg("--no-sync");
+            args.push("--no-sync");
        }
 
         let comma_separated_endpoints = self.env.etcd_broker.comma_separated_endpoints();
         if !comma_separated_endpoints.is_empty() {
-            cmd.args(&["--broker-endpoints", &comma_separated_endpoints]);
+            args.extend(["--broker-endpoints", &comma_separated_endpoints]);
         }
         if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() {
-            cmd.args(&["--broker-etcd-prefix", prefix]);
+            args.extend(["--broker-etcd-prefix", prefix]);
         }
+
+        let mut backup_threads = String::new();
         if let Some(threads) = self.conf.backup_threads {
-            cmd.args(&["--backup-threads", threads.to_string().as_ref()]);
+            backup_threads = threads.to_string();
+            args.extend(["--backup-threads", &backup_threads]);
+        } else {
+            drop(backup_threads);
        }
+
         if let Some(ref remote_storage) = self.conf.remote_storage {
-            cmd.args(&["--remote-storage", remote_storage]);
+            args.extend(["--remote-storage", remote_storage]);
         }
+
+        let key_path = self.env.base_data_dir.join("auth_public_key.pem");
         if self.conf.auth_enabled {
-            cmd.arg("--auth-validation-public-key-path");
-            // PathBuf is better be passed as is, not via `String`.
-            cmd.arg(self.env.base_data_dir.join("auth_public_key.pem"));
+            args.extend([
+                "--auth-validation-public-key-path",
+                key_path.to_str().with_context(|| {
+                    format!("Key path {key_path:?} cannot be represented as a unicode string")
+                })?,
+            ]);
         }
 
-        fill_aws_secrets_vars(&mut cmd);
-
-        if !cmd.status()?.success() {
-            bail!(
-                "Safekeeper failed to start. See '{}' for details.",
-                self.datadir_path().join("safekeeper.log").display()
-            );
-        }
-
-        // It takes a while for the safekeeper to start up. Wait until it is
-        // open for business.
-        const RETRIES: i8 = 15;
-        for retries in 1..RETRIES {
-            match self.check_status() {
-                Ok(_) => {
-                    println!("\nSafekeeper started");
-                    return Ok(());
-                }
-                Err(err) => {
-                    match err {
-                        SafekeeperHttpError::Transport(err) => {
-                            if err.is_connect() && retries < 5 {
-                                print!(".");
-                                io::stdout().flush().unwrap();
-                            } else {
-                                if retries == 5 {
-                                    println!() // put a line break after dots for second message
-                                }
-                                println!(
-                                    "Safekeeper not responding yet, err {} retrying ({})...",
-                                    err, retries
-                                );
-                            }
-                        }
-                        SafekeeperHttpError::Response(msg) => {
-                            bail!("safekeeper failed to start: {} ", msg)
-                        }
-                    }
-                    thread::sleep(Duration::from_secs(1));
-                }
-            }
-        }
-        bail!("safekeeper failed to start in {} seconds", RETRIES);
+        background_process::start_process(
+            &format!("safekeeper {id}"),
+            &datadir,
+            &self.env.safekeeper_bin(),
+            &args,
+            background_process::InitialPidFile::Expect(&self.pid_file()),
+            || match self.check_status() {
+                Ok(()) => Ok(true),
+                Err(SafekeeperHttpError::Transport(_)) => Ok(false),
+                Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
+            },
+        )
     }
 
     ///
@@ -202,63 +187,11 @@ impl SafekeeperNode {
     /// If the server is not running, returns success
     ///
     pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
-        let pid_file = self.pid_file();
-        if !pid_file.exists() {
-            println!("Safekeeper {} is already stopped", self.id);
-            return Ok(());
-        }
-        let pid = read_pidfile(&pid_file)?;
-        let pid = Pid::from_raw(pid);
-
-        let sig = if immediate {
-            print!("Stopping safekeeper {} immediately..", self.id);
-            Signal::SIGQUIT
-        } else {
-            print!("Stopping safekeeper {} gracefully..", self.id);
-            Signal::SIGTERM
-        };
-        io::stdout().flush().unwrap();
-        match kill(pid, sig) {
-            Ok(_) => (),
-            Err(Errno::ESRCH) => {
-                println!(
-                    "Safekeeper with pid {} does not exist, but a PID file was found",
-                    pid
-                );
-                return Ok(());
-            }
-            Err(err) => bail!(
-                "Failed to send signal to safekeeper with pid {}: {}",
-                pid,
-                err.desc()
-            ),
-        }
-
-        // Wait until process is gone
-        for i in 0..600 {
-            let signal = None; // Send no signal, just get the error code
-            match kill(pid, signal) {
-                Ok(_) => (), // Process exists, keep waiting
-                Err(Errno::ESRCH) => {
-                    // Process not found, we're done
-                    println!("done!");
-                    return Ok(());
-                }
-                Err(err) => bail!(
-                    "Failed to send signal to pageserver with pid {}: {}",
-                    pid,
-                    err.desc()
-                ),
-            };
-
-            if i % 10 == 0 {
-                print!(".");
-                io::stdout().flush().unwrap();
-            }
-            thread::sleep(Duration::from_millis(100));
-        }
-
-        bail!("Failed to stop safekeeper with pid {}", pid);
+        background_process::stop_process(
+            immediate,
+            &format!("safekeeper {}", self.id),
+            &self.pid_file(),
+        )
     }
 
     fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index aff86c8076..6f51465609 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -36,6 +36,8 @@ pub mod sock_split;
 // common log initialisation routine
 pub mod logging;
 
+pub mod lock_file;
+
 // Misc
 pub mod accum;
 pub mod shutdown;
diff --git a/libs/utils/src/lock_file.rs b/libs/utils/src/lock_file.rs
new file mode 100644
index 0000000000..4fef65852b
--- /dev/null
+++ b/libs/utils/src/lock_file.rs
@@ -0,0 +1,81 @@
+//! A module to create and read lock files. A lock file ensures that only one
+//! process is running at a time, in a particular directory.
+//!
+//! File locking is done using [`fcntl::flock`], which means that holding the
+//! lock on a file only prevents taking another lock on it; all other
+//! operations are still possible on the file. Other processes can still open, read,
+//! write, or remove the file, for example.
+//! If the file is removed while a process is holding a lock on it,
+//! the process that holds the lock does not get any error or notification.
+//! Furthermore, you can create a new file with the same name and lock the new file,
+//! while the old process is still running.
+//! Deleting the lock file while the locking process is still running is a bad idea!
+
+use std::{fs, os::unix::prelude::AsRawFd, path::Path};
+
+use anyhow::Context;
+use nix::fcntl;
+
+use crate::crashsafe;
+
+pub enum LockCreationResult {
+    Created {
+        new_lock_contents: String,
+        file: fs::File,
+    },
+    AlreadyLocked {
+        existing_lock_contents: String,
+    },
+    CreationFailed(anyhow::Error),
+}
+
+/// Creates a lock file in the path given and writes the given contents into the file.
+/// Note: The lock is automatically released when the file is closed. You might want to use
+/// Box::leak to make sure it lives until the end of the program.
+pub fn create_lock_file(lock_file_path: &Path, contents: String) -> LockCreationResult {
+    let lock_file = match fs::OpenOptions::new()
+        .create(true) // O_CREAT
+        .write(true)
+        .open(lock_file_path)
+        .context("Failed to open lock file")
+    {
+        Ok(file) => file,
+        Err(e) => return LockCreationResult::CreationFailed(e),
+    };
+
+    match fcntl::flock(
+        lock_file.as_raw_fd(),
+        fcntl::FlockArg::LockExclusiveNonblock,
+    ) {
+        Ok(()) => {
+            match lock_file
+                .set_len(0)
+                .context("Failed to truncate lockfile")
+                .and_then(|()| {
+                    fs::write(lock_file_path, &contents).with_context(|| {
+                        format!("Failed to write '{contents}' contents into lockfile")
+                    })
+                })
+                .and_then(|()| {
+                    crashsafe::fsync_file_and_parent(lock_file_path)
+                        .context("Failed to fsync lockfile")
+                }) {
+                Ok(()) => LockCreationResult::Created {
+                    new_lock_contents: contents,
+                    file: lock_file,
+                },
+                Err(e) => LockCreationResult::CreationFailed(e),
+            }
+        }
+        Err(nix::errno::Errno::EAGAIN) => {
+            match fs::read_to_string(lock_file_path).context("Failed to read lockfile contents") {
+                Ok(existing_lock_contents) => LockCreationResult::AlreadyLocked {
+                    existing_lock_contents,
+                },
+                Err(e) => LockCreationResult::CreationFailed(e),
+            }
+        }
+        Err(e) => {
+            LockCreationResult::CreationFailed(anyhow::anyhow!("Failed to lock lockfile: {e}"))
+        }
+    }
+}
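The advisory semantics described in the module comment are easy to verify: a second non-blocking exclusive `flock` on the same file fails with `EAGAIN`, while plain reads and writes proceed untouched. A standalone sketch (assumes the `nix` crate; the temp path is arbitrary):

```rust
use std::fs::OpenOptions;
use std::os::unix::io::AsRawFd;

use nix::errno::Errno;
use nix::fcntl::{flock, FlockArg};

fn main() -> anyhow::Result<()> {
    let path = std::env::temp_dir().join("flock_demo.lock");
    let holder = OpenOptions::new().create(true).write(true).open(&path)?;
    flock(holder.as_raw_fd(), FlockArg::LockExclusiveNonblock)?;

    // A second descriptor cannot take the lock while `holder` is open...
    let contender = OpenOptions::new().write(true).open(&path)?;
    assert_eq!(
        flock(contender.as_raw_fd(), FlockArg::LockExclusiveNonblock),
        Err(Errno::EAGAIN)
    );

    // ...but the lock is purely advisory: writing through another handle works.
    std::fs::write(&path, "still writable")?;
    Ok(())
}
```

(`flock` locks attach to the open file description; opening the file a second time creates a new one, which is why the conflict is observable even within a single process.)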
diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs
index 31c0e02f98..3b1a1f5aff 100644
--- a/libs/utils/src/logging.rs
+++ b/libs/utils/src/logging.rs
@@ -1,10 +1,6 @@
-use std::{
-    fs::{File, OpenOptions},
-    path::Path,
-    str::FromStr,
-};
+use std::str::FromStr;
 
-use anyhow::{Context, Result};
+use anyhow::Context;
 use strum_macros::{EnumString, EnumVariantNames};
 
 #[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
@@ -25,19 +21,8 @@ impl LogFormat {
         })
     }
 }
-pub fn init(
-    log_filename: impl AsRef<Path>,
-    daemonize: bool,
-    log_format: LogFormat,
-) -> Result<File> {
-    // Don't open the same file for output multiple times;
-    // the different fds could overwrite each other's output.
-    let log_file = OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open(&log_filename)
-        .with_context(|| format!("failed to open {:?}", log_filename.as_ref()))?;
+pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
     let default_filter_str = "info";
 
     // We fall back to printing all spans at info-level or above if
@@ -45,50 +30,16 @@ pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
     let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
         .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
 
-    let x: File = log_file.try_clone().unwrap();
     let base_logger = tracing_subscriber::fmt()
         .with_env_filter(env_filter)
         .with_target(false)
         .with_ansi(false)
-        .with_writer(move || -> Box<dyn std::io::Write> {
-            // we are cloning and returning log file in order to allow redirecting daemonized stdout and stderr to it
-            // if we do not use daemonization (e.g. in docker) it is better to log to stdout directly
-            // for example to be in line with docker log command which expects logs comimg from stdout
-            if daemonize {
-                Box::new(x.try_clone().unwrap())
-            } else {
-                Box::new(std::io::stdout())
-            }
-        });
+        .with_writer(std::io::stdout);
 
     match log_format {
         LogFormat::Json => base_logger.json().init(),
         LogFormat::Plain => base_logger.init(),
     }
 
-    Ok(log_file)
-}
-
-// #[cfg(test)]
-// Due to global logger, can't run tests in same process.
-// So until there's a non-global one, the tests are in ../tests/ as separate files.
-#[macro_export(local_inner_macros)]
-macro_rules! test_init_file_logger {
-    ($log_level:expr, $log_format:expr) => {{
-        use std::str::FromStr;
-        std::env::set_var("RUST_LOG", $log_level);
-
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let log_file_path = tmp_dir.path().join("logfile");
-
-        let log_format = $crate::logging::LogFormat::from_str($log_format).unwrap();
-        let _log_file = $crate::logging::init(&log_file_path, true, log_format).unwrap();
-
-        let log_file = std::fs::OpenOptions::new()
-            .read(true)
-            .open(&log_file_path)
-            .unwrap();
-
-        log_file
-    }};
+    Ok(())
 }
diff --git a/libs/utils/tests/logger_json_test.rs b/libs/utils/tests/logger_json_test.rs
deleted file mode 100644
index 5d63b9b004..0000000000
--- a/libs/utils/tests/logger_json_test.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-// This could be in ../src/logging.rs but since the logger is global, these
-// can't be run in threads of the same process
-use std::fs::File;
-use std::io::{BufRead, BufReader, Lines};
-use tracing::*;
-use utils::test_init_file_logger;
-
-fn read_lines(file: File) -> Lines<BufReader<File>> {
-    BufReader::new(file).lines()
-}
-
-#[test]
-fn test_json_format_has_message_and_custom_field() {
-    std::env::set_var("RUST_LOG", "info");
-
-    let log_file = test_init_file_logger!("info", "json");
-
-    let custom_field: &str = "hi";
-    trace!(custom = %custom_field, "test log message");
-    debug!(custom = %custom_field, "test log message");
-    info!(custom = %custom_field, "test log message");
-    warn!(custom = %custom_field, "test log message");
-    error!(custom = %custom_field, "test log message");
-
-    let lines = read_lines(log_file);
-    for line in lines {
-        let content = line.unwrap();
-        let json_object = serde_json::from_str::<serde_json::Value>(&content).unwrap();
-
-        assert_eq!(json_object["fields"]["custom"], "hi");
-        assert_eq!(json_object["fields"]["message"], "test log message");
-
-        assert_ne!(json_object["level"], "TRACE");
-        assert_ne!(json_object["level"], "DEBUG");
-    }
-}
diff --git a/libs/utils/tests/logger_plain_test.rs b/libs/utils/tests/logger_plain_test.rs
deleted file mode 100644
index bc5abf45dd..0000000000
--- a/libs/utils/tests/logger_plain_test.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-// This could be in ../src/logging.rs but since the logger is global, these
-// can't be run in threads of the same process
-use std::fs::File;
-use std::io::{BufRead, BufReader, Lines};
-use tracing::*;
-use utils::test_init_file_logger;
-
-fn read_lines(file: File) -> Lines<BufReader<File>> {
-    BufReader::new(file).lines()
-}
-
-#[test]
-fn test_plain_format_has_message_and_custom_field() {
-    std::env::set_var("RUST_LOG", "warn");
-
-    let log_file = test_init_file_logger!("warn", "plain");
-
-    let custom_field: &str = "hi";
-    trace!(custom = %custom_field, "test log message");
-    debug!(custom = %custom_field, "test log message");
-    info!(custom = %custom_field, "test log message");
-    warn!(custom = %custom_field, "test log message");
-    error!(custom = %custom_field, "test log message");
-
-    let lines = read_lines(log_file);
-    for line in lines {
-        let content = line.unwrap();
-        serde_json::from_str::<serde_json::Value>(&content).unwrap_err();
-        assert!(content.contains("custom=hi"));
-        assert!(content.contains("test log message"));
-
-        assert!(!content.contains("TRACE"));
-        assert!(!content.contains("DEBUG"));
-        assert!(!content.contains("INFO"));
-    }
-}
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index b075b86aa1..4262ca9820 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -24,7 +24,6 @@ hex = "0.4.3"
 hyper = "0.14"
 itertools = "0.10.3"
 clap = { version = "4.0", features = ["string"] }
-daemonize = "0.4.1"
 tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
 tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
 postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
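The two daemon binaries below keep their pid-file lock alive for the whole process lifetime by leaking the `File`: dropping it would close the descriptor and release the `flock` even though the process is still running. A minimal illustration of the pattern (hypothetical path):

```rust
use std::fs::File;

fn hold_for_process_lifetime(lock_file: File) {
    // Box::leak converts the owned File into a &'static mut File that is
    // never dropped: the descriptor, and with it the flock, is released only
    // when the process itself exits -- even if the locking thread panics.
    let _held: &'static mut File = Box::leak(Box::new(lock_file));
}

fn main() -> std::io::Result<()> {
    let f = File::create("/tmp/demo.pid")?; // hypothetical lock path
    hold_for_process_lifetime(f);
    Ok(())
}
```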
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 802352be90..62119b51c6 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -1,17 +1,14 @@
 //! Main entry point for the Page Server executable.
 
-use remote_storage::GenericRemoteStorage;
 use std::{env, ops::ControlFlow, path::Path, str::FromStr};
+
+use anyhow::{anyhow, Context};
+use clap::{Arg, ArgAction, Command};
+use fail::FailScenario;
+use nix::unistd::Pid;
 use tracing::*;
 
-use anyhow::{anyhow, bail, Context, Result};
-
-use clap::{Arg, ArgAction, Command};
-use daemonize::Daemonize;
-
-use fail::FailScenario;
 use metrics::set_build_info_metric;
-
 use pageserver::{
     config::{defaults::*, PageServerConf},
     http, page_cache, page_service, profiling, task_mgr,
@@ -19,20 +16,22 @@ use pageserver::{
     task_mgr::{
         BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
     },
-    tenant_mgr, virtual_file, LOG_FILE_NAME,
+    tenant_mgr, virtual_file,
 };
+use remote_storage::GenericRemoteStorage;
 use utils::{
     auth::JwtAuth,
-    logging,
+    lock_file, logging,
     postgres_backend::AuthType,
     project_git_version,
-    shutdown::exit_now,
     signals::{self, Signal},
     tcp_listener,
 };
 
 project_git_version!(GIT_VERSION);
 
+const PID_FILE_NAME: &str = "pageserver.pid";
+
 const FEATURES: &[&str] = &[
     #[cfg(feature = "testing")]
     "testing",
@@ -65,6 +64,7 @@ fn main() -> anyhow::Result<()> {
     let workdir = workdir
         .canonicalize()
         .with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;
+
     let cfg_file_path = workdir.join("pageserver.toml");
 
     // Set CWD to workdir for non-daemon modes
@@ -75,8 +75,6 @@ fn main() -> anyhow::Result<()> {
         )
     })?;
 
-    let daemonize = arg_matches.get_flag("daemonize");
-
     let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
         ControlFlow::Continue(conf) => conf,
         ControlFlow::Break(()) => {
@@ -102,7 +100,7 @@ fn main() -> anyhow::Result<()> {
     virtual_file::init(conf.max_file_descriptors);
     page_cache::init(conf.page_cache_size);
 
-    start_pageserver(conf, daemonize).context("Failed to start pageserver")?;
+    start_pageserver(conf).context("Failed to start pageserver")?;
 
     scenario.teardown();
     Ok(())
@@ -197,12 +195,34 @@ fn initialize_config(
     })
 }
 
-fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()> {
-    // Initialize logger
-    let log_file = logging::init(LOG_FILE_NAME, daemonize, conf.log_format)?;
-
+fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
+    logging::init(conf.log_format)?;
     info!("version: {}", version());
 
+    let lock_file_path = conf.workdir.join(PID_FILE_NAME);
+    let lock_file = match lock_file::create_lock_file(&lock_file_path, Pid::this().to_string()) {
+        lock_file::LockCreationResult::Created {
+            new_lock_contents,
+            file,
+        } => {
+            info!("Created lock file at {lock_file_path:?} with contents {new_lock_contents}");
+            file
+        }
+        lock_file::LockCreationResult::AlreadyLocked {
+            existing_lock_contents,
+        } => anyhow::bail!(
+            "Could not lock pid file; pageserver is already running in {:?} with PID {}",
+            conf.workdir,
+            existing_lock_contents
+        ),
+        lock_file::LockCreationResult::CreationFailed(e) => {
+            return Err(e.context(format!("Failed to create lock file at {lock_file_path:?}")))
+        }
+    };
+    // Ensure that the lock file is held even if the main thread of the process panics.
+    // We need to release the lock file only when the current process is gone.
+    let _ = Box::leak(Box::new(lock_file));
+
     // TODO: Check that it looks like a valid repository before going further
 
     // bind sockets before daemonizing so we report errors early and do not return until we are listening
@@ -218,33 +238,6 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()>
     );
     let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;
 
-    // NB: Don't spawn any threads before daemonizing!
-    if daemonize {
-        info!("daemonizing...");
-
-        // There shouldn't be any logging to stdin/stdout. Redirect it to the main log so
-        // that we will see any accidental manual fprintf's or backtraces.
-        let stdout = log_file
-            .try_clone()
-            .with_context(|| format!("Failed to clone log file '{:?}'", log_file))?;
-        let stderr = log_file;
-
-        let daemonize = Daemonize::new()
-            .pid_file("pageserver.pid")
-            .working_directory(".")
-            .stdout(stdout)
-            .stderr(stderr);
-
-        // XXX: The parent process should exit abruptly right after
-        // it has spawned a child to prevent coverage machinery from
-        // dumping stats into a `profraw` file now owned by the child.
-        // Otherwise, the coverage data will be damaged.
-        match daemonize.exit_action(|| exit_now(0)).start() {
-            Ok(_) => info!("Success, daemonized"),
-            Err(err) => bail!("{err}. could not daemonize. bailing."),
-        }
-    }
-
     let signals = signals::install_shutdown_handlers()?;
 
     // start profiler (if enabled)
@@ -347,14 +340,6 @@ fn cli() -> Command {
     Command::new("Neon page server")
         .about("Materializes WAL stream to pages and serves them to the postgres")
         .version(version())
-        .arg(
-
-            Arg::new("daemonize")
-                .short('d')
-                .long("daemonize")
-                .action(ArgAction::SetTrue)
-                .help("Run in the background"),
-        )
         .arg(
             Arg::new("init")
                 .long("init")
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 52a4cb0381..11be649e9f 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -43,8 +43,6 @@ pub const DEFAULT_PG_VERSION: u32 = 14;
 pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
 pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
 
-pub const LOG_FILE_NAME: &str = "pageserver.log";
-
 static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
 
 /// Config for the Repository checkpointer
@@ -81,7 +79,6 @@ pub async fn shutdown_pageserver(exit_code: i32) {
     // There should be nothing left, but let's be sure
     task_mgr::shutdown_tasks(None, None, None).await;
 
-    info!("Shut down successfully completed");
     std::process::exit(exit_code);
 }
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 39dccf2eba..e21ec4d742 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -671,10 +671,6 @@ impl PostgresRedoProcess {
         // The Rust standard library makes sure to mark any file descriptors with
         // as close-on-exec by default, but that's not enough, since we use
         // libraries that directly call libc open without setting that flag.
-        //
-        // One example is the pidfile of the daemonize library, which doesn't
-        // currently mark file descriptors as close-on-exec. Either way, we
-        // want to be on the safe side and prevent accidental regression.
.close_fds() .spawn() .map_err(|e| { diff --git a/poetry.lock b/poetry.lock index dfcb16107f..fdfe88acf1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1568,7 +1568,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>= [metadata] lock-version = "1.1" python-versions = "^3.9" -content-hash = "17cdbfe90f1b06dffaf24c3e076384ec08dd4a2dce5a05e50565f7364932eb2d" +content-hash = "9352a89d49d34807f6a58f6c3f898acbd8cf3570e0f45ede973673644bde4d0e" [metadata.files] aiopg = [ @@ -1978,6 +1978,7 @@ prometheus-client = [ psycopg2-binary = [ {file = "psycopg2-binary-2.9.3.tar.gz", hash = "sha256:761df5313dc15da1502b21453642d7599d26be88bff659382f8f9747c7ebea4e"}, {file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:539b28661b71da7c0e428692438efbcd048ca21ea81af618d845e06ebfd29478"}, + {file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2f2534ab7dc7e776a263b463a16e189eb30e85ec9bbe1bff9e78dae802608932"}, {file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e82d38390a03da28c7985b394ec3f56873174e2c88130e6966cb1c946508e65"}, {file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:57804fc02ca3ce0dbfbef35c4b3a4a774da66d66ea20f4bda601294ad2ea6092"}, {file = "psycopg2_binary-2.9.3-cp310-cp310-manylinux_2_24_aarch64.whl", hash = "sha256:083a55275f09a62b8ca4902dd11f4b33075b743cf0d360419e2051a8a5d5ff76"}, @@ -2011,6 +2012,7 @@ psycopg2-binary = [ {file = "psycopg2_binary-2.9.3-cp37-cp37m-win32.whl", hash = "sha256:adf20d9a67e0b6393eac162eb81fb10bc9130a80540f4df7e7355c2dd4af9fba"}, {file = "psycopg2_binary-2.9.3-cp37-cp37m-win_amd64.whl", hash = "sha256:2f9ffd643bc7349eeb664eba8864d9e01f057880f510e4681ba40a6532f93c71"}, {file = "psycopg2_binary-2.9.3-cp38-cp38-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:def68d7c21984b0f8218e8a15d514f714d96904265164f75f8d3a70f9c295667"}, + {file = "psycopg2_binary-2.9.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e6aa71ae45f952a2205377773e76f4e3f27951df38e69a4c95440c779e013560"}, {file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dffc08ca91c9ac09008870c9eb77b00a46b3378719584059c034b8945e26b272"}, {file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:280b0bb5cbfe8039205c7981cceb006156a675362a00fe29b16fbc264e242834"}, {file = "psycopg2_binary-2.9.3-cp38-cp38-manylinux_2_24_aarch64.whl", hash = "sha256:af9813db73395fb1fc211bac696faea4ca9ef53f32dc0cfa27e4e7cf766dcf24"}, @@ -2022,6 +2024,7 @@ psycopg2-binary = [ {file = "psycopg2_binary-2.9.3-cp38-cp38-win32.whl", hash = "sha256:6472a178e291b59e7f16ab49ec8b4f3bdada0a879c68d3817ff0963e722a82ce"}, {file = "psycopg2_binary-2.9.3-cp38-cp38-win_amd64.whl", hash = "sha256:35168209c9d51b145e459e05c31a9eaeffa9a6b0fd61689b48e07464ffd1a83e"}, {file = "psycopg2_binary-2.9.3-cp39-cp39-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:47133f3f872faf28c1e87d4357220e809dfd3fa7c64295a4a148bcd1e6e34ec9"}, + {file = "psycopg2_binary-2.9.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b3a24a1982ae56461cc24f6680604fffa2c1b818e9dc55680da038792e004d18"}, {file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = 
"sha256:91920527dea30175cc02a1099f331aa8c1ba39bf8b7762b7b56cbf54bc5cce42"}, {file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:887dd9aac71765ac0d0bac1d0d4b4f2c99d5f5c1382d8b770404f0f3d0ce8a39"}, {file = "psycopg2_binary-2.9.3-cp39-cp39-manylinux_2_24_aarch64.whl", hash = "sha256:1f14c8b0942714eb3c74e1e71700cbbcb415acbc311c730370e70c578a44a25c"}, @@ -2038,18 +2041,7 @@ py = [ {file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"}, ] pyasn1 = [ - {file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"}, - {file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"}, - {file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"}, - {file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"}, {file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"}, - {file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"}, - {file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"}, - {file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"}, - {file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"}, - {file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"}, - {file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"}, - {file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"}, {file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"}, ] pycodestyle = [ @@ -2159,6 +2151,13 @@ pyyaml = [ {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"}, {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"}, {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"}, + {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"}, + {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"}, + {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"}, + {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"}, + {file = 
"PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"}, {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"}, {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"}, diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 64c541ddef..0c0ca2ff9f 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -12,7 +12,7 @@ fs2 = "0.4.3" serde_json = "1" tracing = "0.1.27" clap = "4.0" -daemonize = "0.4.1" +nix = "0.25" tokio = { version = "1.17", features = ["macros", "fs"] } postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 67c2c62f73..42f8188d6a 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -4,8 +4,7 @@ use anyhow::{bail, Context, Result}; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; -use daemonize::Daemonize; -use fs2::FileExt; +use nix::unistd::Pid; use remote_storage::RemoteStorageConfig; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; @@ -16,6 +15,7 @@ use tokio::sync::mpsc; use toml_edit::Document; use tracing::*; use url::{ParseError, Url}; +use utils::lock_file; use metrics::set_build_info_metric; use safekeeper::broker; @@ -35,12 +35,10 @@ use utils::{ http::endpoint, id::NodeId, logging::{self, LogFormat}, - project_git_version, - shutdown::exit_now, - signals, tcp_listener, + project_git_version, signals, tcp_listener, }; -const LOCK_FILE_NAME: &str = "safekeeper.lock"; +const PID_FILE_NAME: &str = "safekeeper.pid"; const ID_FILE_NAME: &str = "safekeeper.id"; project_git_version!(GIT_VERSION); @@ -65,10 +63,6 @@ fn main() -> anyhow::Result<()> { conf.no_sync = true; } - if arg_matches.get_flag("daemonize") { - conf.daemonize = true; - } - if let Some(addr) = arg_matches.get_one::("listen-pg") { conf.listen_pg_addr = addr.to_string(); } @@ -143,19 +137,33 @@ fn main() -> anyhow::Result<()> { } fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bool) -> Result<()> { - let log_file = logging::init("safekeeper.log", conf.daemonize, conf.log_format)?; - + logging::init(conf.log_format)?; info!("version: {GIT_VERSION}"); // Prevent running multiple safekeepers on the same directory - let lock_file_path = conf.workdir.join(LOCK_FILE_NAME); - let lock_file = File::create(&lock_file_path).context("failed to open lockfile")?; - lock_file.try_lock_exclusive().with_context(|| { - format!( - "control file {} is locked by some other process", - lock_file_path.display() - ) - })?; + let lock_file_path = conf.workdir.join(PID_FILE_NAME); + let lock_file = match lock_file::create_lock_file(&lock_file_path, Pid::this().to_string()) { + lock_file::LockCreationResult::Created { + new_lock_contents, + file, + } => { + info!("Created lock file at {lock_file_path:?} with contenst {new_lock_contents}"); + file + } + lock_file::LockCreationResult::AlreadyLocked { + existing_lock_contents, + 
+        } => anyhow::bail!(
+            "Could not lock pid file; safekeeper is already running in {:?} with PID {}",
+            conf.workdir,
+            existing_lock_contents
+        ),
+        lock_file::LockCreationResult::CreationFailed(e) => {
+            return Err(e.context(format!("Failed to create lock file at {lock_file_path:?}")))
+        }
+    };
+    // ensure that the lock file is held even if the main thread of the process panics
+    // we need to release the lock file only when the current process is gone
+    let _ = Box::leak(Box::new(lock_file));

     // Set or read our ID.
     set_id(&mut conf, given_id)?;
@@ -187,31 +195,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
         }
     };

-    // XXX: Don't spawn any threads before daemonizing!
-    if conf.daemonize {
-        info!("daemonizing...");
-
-        // There should'n be any logging to stdin/stdout. Redirect it to the main log so
-        // that we will see any accidental manual fprintf's or backtraces.
-        let stdout = log_file.try_clone().unwrap();
-        let stderr = log_file;
-
-        let daemonize = Daemonize::new()
-            .pid_file("safekeeper.pid")
-            .working_directory(Path::new("."))
-            .stdout(stdout)
-            .stderr(stderr);
-
-        // XXX: The parent process should exit abruptly right after
-        // it has spawned a child to prevent coverage machinery from
-        // dumping stats into a `profraw` file now owned by the child.
-        // Otherwise, the coverage data will be damaged.
-        match daemonize.exit_action(|| exit_now(0)).start() {
-            Ok(_) => info!("Success, daemonized"),
-            Err(err) => bail!("Error: {err}. could not daemonize. bailing."),
-        }
-    }
-
     // Register metrics collector for active timelines. It's important to do this
     // after daemonizing, otherwise process collector will be upset.
     let timeline_collector = safekeeper::metrics::TimelineCollector::new();
@@ -384,13 +367,6 @@ fn cli() -> Command {
                 .short('p')
                 .long("pageserver"),
         )
-        .arg(
-            Arg::new("daemonize")
-                .short('d')
-                .long("daemonize")
-                .action(ArgAction::SetTrue)
-                .help("Run in the background"),
-        )
         .arg(
             Arg::new("no-sync")
                 .short('n')
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index c3b8227e17..395a29c9ed 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -54,7 +54,6 @@ pub struct SafeKeeperConf {
     // data directories to avoid clashing with each other.
     pub workdir: PathBuf,
-    pub daemonize: bool,
     pub no_sync: bool,
     pub listen_pg_addr: String,
     pub listen_http_addr: String,
@@ -88,7 +87,6 @@ impl Default for SafeKeeperConf {
         // command line, so that when the server is running, all paths are relative
         // to that.
            workdir: PathBuf::from("./"),
-            daemonize: false,
             no_sync: false,
             listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
             listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index e7e0e4ce56..b62c80824a 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -19,7 +19,7 @@ from dataclasses import dataclass, field
 from enum import Flag, auto
 from functools import cached_property
 from pathlib import Path
-from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar, Union, cast
+from typing import Any, Dict, Iterator, List, Optional, Tuple, Union, cast

 import asyncpg
 import backoff  # type: ignore
@@ -36,7 +36,7 @@ from psycopg2.extensions import connection as PgConnection
 from psycopg2.extensions import make_dsn, parse_dsn
 from typing_extensions import Literal

-from .utils import allure_attach_from_dir, etcd_path, get_self_dir, subprocess_capture
+from .utils import Fn, allure_attach_from_dir, etcd_path, get_self_dir, subprocess_capture

 """
 This file contains pytest fixtures. A fixture is a test resource that can be
@@ -56,7 +56,6 @@ put directly-importable functions into utils.py or another separate file.
 """

 Env = Dict[str, str]
-Fn = TypeVar("Fn", bound=Callable[..., Any])

 DEFAULT_OUTPUT_DIR = "test_output"
 DEFAULT_BRANCH_NAME = "main"
@@ -965,11 +964,11 @@ def neon_env_builder(
     yield builder


-class NeonPageserverApiException(Exception):
+class PageserverApiException(Exception):
     pass


-class NeonPageserverHttpClient(requests.Session):
+class PageserverHttpClient(requests.Session):
     def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
         super().__init__()
         self.port = port
@@ -987,7 +986,7 @@ class NeonPageserverHttpClient(requests.Session):
             msg = res.json()["msg"]
         except:  # noqa: E722
             msg = ""
-        raise NeonPageserverApiException(msg) from e
+        raise PageserverApiException(msg) from e

     def check_status(self):
         self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
@@ -1624,8 +1623,6 @@ class ComputeCtl(AbstractNeonCli):
 class NeonPageserver(PgProtocol):
     """
     An object representing a running pageserver.
-
-    Initializes the repository via `neon init`.
     """

     TEMP_FILE_SUFFIX = "___temp"
@@ -1674,8 +1671,8 @@ class NeonPageserver(PgProtocol):
         if '"profiling"' not in self.version:
             pytest.skip("pageserver was built without 'profiling' feature")

-    def http_client(self, auth_token: Optional[str] = None) -> NeonPageserverHttpClient:
-        return NeonPageserverHttpClient(
+    def http_client(self, auth_token: Optional[str] = None) -> PageserverHttpClient:
+        return PageserverHttpClient(
             port=self.service_port.http,
             auth_token=auth_token,
             is_testing_enabled_or_skip=self.is_testing_enabled_or_skip,
@@ -2260,11 +2257,6 @@ class PostgresFactory:
         return self


-def read_pid(path: Path) -> int:
-    """Read content of file into number"""
-    return int(path.read_text())
-
-
 @dataclass
 class SafekeeperPort:
     pg: int
@@ -2688,26 +2680,8 @@ def check_restored_datadir_content(
     assert (mismatch, error) == ([], [])


-def wait_until(number_of_iterations: int, interval: float, func):
-    """
-    Wait until 'func' returns successfully, without exception. Returns the
-    last return value from the function.
- """ - last_exception = None - for i in range(number_of_iterations): - try: - res = func() - except Exception as e: - log.info("waiting for %s iteration %s failed", func, i + 1) - last_exception = e - time.sleep(interval) - continue - return res - raise Exception("timed out while waiting for %s" % func) from last_exception - - def assert_no_in_progress_downloads_for_tenant( - pageserver_http_client: NeonPageserverHttpClient, + pageserver_http_client: PageserverHttpClient, tenant: TenantId, ): tenant_status = pageserver_http_client.tenant_status(tenant) @@ -2715,7 +2689,7 @@ def assert_no_in_progress_downloads_for_tenant( def remote_consistent_lsn( - pageserver_http_client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId + pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId ) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) @@ -2730,7 +2704,7 @@ def remote_consistent_lsn( def wait_for_upload( - pageserver_http_client: NeonPageserverHttpClient, + pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId, lsn: Lsn, @@ -2754,7 +2728,7 @@ def wait_for_upload( def last_record_lsn( - pageserver_http_client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId + pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId ) -> Lsn: detail = pageserver_http_client.timeline_detail(tenant, timeline) @@ -2764,7 +2738,7 @@ def last_record_lsn( def wait_for_last_record_lsn( - pageserver_http_client: NeonPageserverHttpClient, + pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId, lsn: Lsn, diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 5fb91344ad..1242305ec3 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -4,13 +4,16 @@ import re import shutil import subprocess import tarfile +import time from pathlib import Path -from typing import Any, List, Tuple +from typing import Any, Callable, List, Tuple, TypeVar import allure # type: ignore from fixtures.log_helper import log from psycopg2.extensions import cursor +Fn = TypeVar("Fn", bound=Callable[..., Any]) + def get_self_dir() -> str: """Get the path to the directory where this script lives.""" @@ -188,3 +191,57 @@ def allure_attach_from_dir(dir: Path): extension = attachment.suffix.removeprefix(".") allure.attach.file(source, name, attachment_type, extension) + + +def start_in_background( + command: list[str], cwd: Path, log_file_name: str, is_started: Fn +) -> subprocess.Popen[bytes]: + """Starts a process, creates the logfile and redirects stderr and stdout there. 
+    Runs the is_started status check on the spawned process and raises an error if it does not pass."""
+
+    log.info(f'Running command "{" ".join(command)}"')
+
+    with open(cwd / log_file_name, "wb") as log_file:
+        spawned_process = subprocess.Popen(command, stdout=log_file, stderr=log_file, cwd=cwd)
+        error = None
+        try:
+            return_code = spawned_process.poll()
+            if return_code is not None:
+                error = f"expected subprocess to run but it exited with code {return_code}"
+            else:
+                attempts = 10
+                try:
+                    wait_until(
+                        number_of_iterations=attempts,
+                        interval=1,
+                        func=is_started,
+                    )
+                except Exception:
+                    error = f"Failed to get correct status from subprocess in {attempts} attempts"
+        except Exception as e:
+            error = f"expected subprocess to start but it failed with exception: {e}"
+
+        if error is not None:
+            log.error(error)
+            spawned_process.kill()
+            raise Exception(f"Failed to run subprocess as {command}, reason: {error}")
+
+        log.info("subprocess spawned")
+        return spawned_process
+
+
+def wait_until(number_of_iterations: int, interval: float, func: Fn):
+    """
+    Wait until 'func' returns successfully, without exception. Returns the
+    last return value from the function.
+    """
+    last_exception = None
+    for i in range(number_of_iterations):
+        try:
+            res = func()
+        except Exception as e:
+            log.info("waiting for %s iteration %s failed", func, i + 1)
+            last_exception = e
+            time.sleep(interval)
+            continue
+        return res
+    raise Exception("timed out while waiting for %s" % func) from last_exception
diff --git a/test_runner/regress/test_auth.py b/test_runner/regress/test_auth.py
index ce4a8ffa9e..8443aa029f 100644
--- a/test_runner/regress/test_auth.py
+++ b/test_runner/regress/test_auth.py
@@ -1,7 +1,7 @@
 from contextlib import closing

 import pytest
-from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserverApiException
+from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException
 from fixtures.types import TenantId
@@ -39,7 +39,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
     # fail to create branch using token with different tenant_id
     with pytest.raises(
-        NeonPageserverApiException, match="Forbidden: Tenant id mismatch. Permission denied"
+        PageserverApiException, match="Forbidden: Tenant id mismatch. Permission denied"
     ):
         invalid_tenant_http_client.timeline_create(
             tenant_id=env.initial_tenant, ancestor_timeline_id=new_timeline_id
@@ -50,7 +50,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
     # fail to create tenant using tenant token
     with pytest.raises(
-        NeonPageserverApiException,
+        PageserverApiException,
         match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
     ):
         tenant_http_client.tenant_create()
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index 0487cd8f2c..1f0940cab7 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -10,7 +10,7 @@
 from fixtures.neon_fixtures import (
     NeonCli,
     NeonEnvBuilder,
-    NeonPageserverHttpClient,
+    PageserverHttpClient,
     PgBin,
     PortDistributor,
     wait_for_last_record_lsn,
@@ -208,7 +208,7 @@ def test_backward_compatibility(
     timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
     pageserver_port = snapshot_config["pageserver"]["listen_http_addr"].split(":")[-1]
     auth_token = snapshot_config["pageserver"]["auth_token"]
-    pageserver_http = NeonPageserverHttpClient(
+    pageserver_http = PageserverHttpClient(
         port=pageserver_port,
         is_testing_enabled_or_skip=lambda: True,  # TODO: check if testing really enabled
         auth_token=auth_token,
diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py
index a9dc63dd50..d146f78c3a 100644
--- a/test_runner/regress/test_neon_cli.py
+++ b/test_runner/regress/test_neon_cli.py
@@ -5,13 +5,13 @@ from fixtures.neon_fixtures import (
     DEFAULT_BRANCH_NAME,
     NeonEnv,
     NeonEnvBuilder,
-    NeonPageserverHttpClient,
+    PageserverHttpClient,
 )
 from fixtures.types import TenantId, TimelineId


 def helper_compare_timeline_list(
-    pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv, initial_tenant: TenantId
+    pageserver_http_client: PageserverHttpClient, env: NeonEnv, initial_tenant: TenantId
 ):
     """
     Compare timelines list returned by CLI and directly via API.
@@ -56,7 +56,7 @@ def test_cli_timeline_list(neon_simple_env: NeonEnv):
     assert nested_timeline_id in timelines_cli


-def helper_compare_tenant_list(pageserver_http_client: NeonPageserverHttpClient, env: NeonEnv):
+def helper_compare_tenant_list(pageserver_http_client: PageserverHttpClient, env: NeonEnv):
     tenants = pageserver_http_client.tenant_list()
     tenants_api = sorted(map(lambda t: cast(str, t["id"]), tenants))
diff --git a/test_runner/regress/test_normal_work.py b/test_runner/regress/test_normal_work.py
index 73918ee476..73933021a4 100644
--- a/test_runner/regress/test_normal_work.py
+++ b/test_runner/regress/test_normal_work.py
@@ -1,9 +1,9 @@
 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverHttpClient
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PageserverHttpClient


-def check_tenant(env: NeonEnv, pageserver_http: NeonPageserverHttpClient):
+def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
     tenant_id, timeline_id = env.neon_cli.create_tenant()
     pg = env.postgres.create_start("main", tenant_id=tenant_id)
     # we rely upon autocommit after each statement
diff --git a/test_runner/regress/test_pageserver_api.py b/test_runner/regress/test_pageserver_api.py
index bab96cff4f..f5e02af8dd 100644
--- a/test_runner/regress/test_pageserver_api.py
+++ b/test_runner/regress/test_pageserver_api.py
@@ -6,12 +6,12 @@ from fixtures.neon_fixtures import (
     DEFAULT_BRANCH_NAME,
     NeonEnv,
     NeonEnvBuilder,
-    NeonPageserverHttpClient,
+    PageserverHttpClient,
     neon_binpath,
     pg_distrib_dir,
-    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.utils import wait_until

 # test that we cannot override node id after init
@@ -29,8 +29,9 @@ def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
         stderr=subprocess.PIPE,
     )

-    # remove initial config
+    # remove initial config and stop existing pageserver
     pageserver_config.unlink()
+    neon_simple_env.pageserver.stop()

     bad_init = run_pageserver(["--init", "-c", f'pg_distrib_dir="{pg_distrib_dir}"'])
     assert (
@@ -60,7 +61,7 @@
     assert "has node id already, it cannot be overridden" in bad_update.stderr


-def check_client(client: NeonPageserverHttpClient, initial_tenant: TenantId):
+def check_client(client: PageserverHttpClient, initial_tenant: TenantId):
     client.check_status()

     # check initial tenant is there
@@ -116,7 +117,7 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):

 def expect_updated_msg_lsn(
-    client: NeonPageserverHttpClient,
+    client: PageserverHttpClient,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     prev_msg_lsn: Optional[Lsn],
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 56b14dc42b..4fb5a5406d 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -15,10 +15,9 @@ from fixtures.neon_fixtures import (
     available_remote_storages,
     wait_for_last_record_lsn,
     wait_for_upload,
-    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import query_scalar
+from fixtures.utils import query_scalar, wait_until

 #
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index a310eac1f7..dc4cd2e37e 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -2,16 +2,12 @@ from threading import Thread

 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import (
-    NeonEnvBuilder,
-    NeonPageserverApiException,
-    NeonPageserverHttpClient,
-)
+from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException, PageserverHttpClient
 from fixtures.types import TenantId, TimelineId


 def do_gc_target(
-    pageserver_http: NeonPageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
+    pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
 ):
     """Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
     try:
@@ -27,7 +23,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
     # first check for non existing tenant
     tenant_id = TenantId.generate()
     with pytest.raises(
-        expected_exception=NeonPageserverApiException,
+        expected_exception=PageserverApiException,
         match=f"Tenant not found for id {tenant_id}",
     ):
         pageserver_http.tenant_detach(tenant_id)
@@ -49,7 +45,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
     # gc should not try to even start
     with pytest.raises(
-        expected_exception=NeonPageserverApiException, match="gc target timeline does not exist"
+        expected_exception=PageserverApiException, match="gc target timeline does not exist"
     ):
         bogus_timeline_id = TimelineId.generate()
         pageserver_http.timeline_gc(tenant_id, bogus_timeline_id, 0)
@@ -78,6 +74,6 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
     assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()

     with pytest.raises(
-        expected_exception=NeonPageserverApiException, match=f"Tenant {tenant_id} not found"
+        expected_exception=PageserverApiException, match=f"Tenant {tenant_id} not found"
     ):
         pageserver_http.timeline_gc(tenant_id, timeline_id, 0)
diff --git a/test_runner/regress/test_tenant_relocation.py b/test_runner/regress/test_tenant_relocation.py
index e14434ffdc..2c11812a7b 100644
--- a/test_runner/regress/test_tenant_relocation.py
+++ b/test_runner/regress/test_tenant_relocation.py
@@ -1,7 +1,5 @@
 import os
 import pathlib
-import signal
-import subprocess
 import threading
 from contextlib import closing, contextmanager
 from typing import Any, Dict, Optional, Tuple
@@ -12,7 +10,7 @@ from fixtures.neon_fixtures import (
     Etcd,
     NeonEnv,
     NeonEnvBuilder,
-    NeonPageserverHttpClient,
+    PageserverHttpClient,
     PortDistributor,
     Postgres,
     assert_no_in_progress_downloads_for_tenant,
@@ -21,10 +19,9 @@ from fixtures.neon_fixtures import (
     pg_distrib_dir,
     wait_for_last_record_lsn,
     wait_for_upload,
-    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import query_scalar, subprocess_capture
+from fixtures.utils import query_scalar, start_in_background, subprocess_capture, wait_until


 def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -32,7 +29,7 @@ def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):

 @contextmanager
-def new_pageserver_helper(
+def new_pageserver_service(
     new_pageserver_dir: pathlib.Path,
     pageserver_bin: pathlib.Path,
     remote_storage_mock_path: pathlib.Path,
@@ -49,7 +46,6 @@ def new_pageserver_helper(
         str(pageserver_bin),
         "--workdir",
         str(new_pageserver_dir),
-        "--daemonize",
         "--update-config",
         f"-c listen_pg_addr='localhost:{pg_port}'",
         f"-c listen_http_addr='localhost:{http_port}'",
@@ -61,16 +57,26 @@ def new_pageserver_helper(
         cmd.append(
             f"-c broker_endpoints=['{broker.client_url()}']",
         )
-
-    log.info("starting new pageserver %s", cmd)
-    out = subprocess.check_output(cmd, text=True)
-    log.info("started new pageserver %s", out)
+    pageserver_client = PageserverHttpClient(
+        port=http_port,
+        auth_token=None,
+        is_testing_enabled_or_skip=lambda: True,  # TODO: check if testing really enabled
+    )
     try:
-        yield
+        pageserver_process = start_in_background(
+            cmd, new_pageserver_dir, "pageserver.log", pageserver_client.check_status
+        )
+    except Exception as e:
+        log.error(e)
+        # the child is already killed by start_in_background on failure
+        raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}")
+
+    log.info("new pageserver started")
+    try:
+        yield pageserver_process
     finally:
         log.info("stopping new pageserver")
-        pid = int((new_pageserver_dir / "pageserver.pid").read_text())
-        os.kill(pid, signal.SIGQUIT)
+        pageserver_process.kill()

 @contextmanager
@@ -113,7 +119,7 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve

 def populate_branch(
     pg: Postgres,
     tenant_id: TenantId,
-    ps_http: NeonPageserverHttpClient,
+    ps_http: PageserverHttpClient,
     create_table: bool,
     expected_sum: Optional[int],
 ) -> Tuple[TimelineId, Lsn]:
@@ -146,7 +152,7 @@ def populate_branch(

 def ensure_checkpoint(
-    pageserver_http: NeonPageserverHttpClient,
+    pageserver_http: PageserverHttpClient,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     current_lsn: Lsn,
@@ -159,7 +165,7 @@ def ensure_checkpoint(

 def check_timeline_attached(
-    new_pageserver_http_client: NeonPageserverHttpClient,
+    new_pageserver_http_client: PageserverHttpClient,
     tenant_id: TenantId,
     timeline_id: TimelineId,
     old_timeline_detail: Dict[str, Any],
@@ -346,13 +352,13 @@ def test_tenant_relocation(
     log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
     pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"

-    new_pageserver_http = NeonPageserverHttpClient(
+    new_pageserver_http = PageserverHttpClient(
         port=new_pageserver_http_port,
         auth_token=None,
        is_testing_enabled_or_skip=env.pageserver.is_testing_enabled_or_skip,
     )

-    with new_pageserver_helper(
+    with new_pageserver_service(
         new_pageserver_dir,
         pageserver_bin,
         remote_storage_mock_path,
diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py
index 97a13bbcb0..a6e935035c 100644
--- a/test_runner/regress/test_tenant_tasks.py
+++ b/test_runner/regress/test_tenant_tasks.py
@@ -1,6 +1,7 @@
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, wait_until
+from fixtures.neon_fixtures import NeonEnvBuilder
 from fixtures.types import TenantId, TimelineId
+from fixtures.utils import wait_until

 def get_only_element(l):  # noqa: E741
diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py
index 96c1fc25db..9a4cbe135b 100644
--- a/test_runner/regress/test_tenants_with_remote_storage.py
+++ b/test_runner/regress/test_tenants_with_remote_storage.py
@@ -25,10 +25,9 @@ from fixtures.neon_fixtures import (
     available_remote_storages,
     wait_for_last_record_lsn,
     wait_for_upload,
-    wait_until,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import query_scalar
+from fixtures.utils import query_scalar, wait_until

 async def tenant_workload(env: NeonEnv, pg: Postgres):
diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py
index 4a78a2746e..450f7f2381 100644
--- a/test_runner/regress/test_timeline_delete.py
+++ b/test_runner/regress/test_timeline_delete.py
@@ -1,6 +1,7 @@
 import pytest
-from fixtures.neon_fixtures import NeonEnv, NeonPageserverApiException, wait_until
+from fixtures.neon_fixtures import NeonEnv, PageserverApiException
 from fixtures.types import TenantId, TimelineId
+from fixtures.utils import wait_until

 def test_timeline_delete(neon_simple_env: NeonEnv):
@@ -11,13 +12,13 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
     # first try to delete non existing timeline
     # for existing tenant:
     invalid_timeline_id = TimelineId.generate()
-    with pytest.raises(NeonPageserverApiException, match="timeline not found"):
+    with pytest.raises(PageserverApiException, match="timeline not found"):
         ps_http.timeline_delete(tenant_id=env.initial_tenant, timeline_id=invalid_timeline_id)

     # for non existing tenant:
     invalid_tenant_id = TenantId.generate()
     with pytest.raises(
-        NeonPageserverApiException,
+        PageserverApiException,
         match=f"Tenant {invalid_tenant_id} not found in the local state",
     ):
         ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id)
@@ -32,7 +33,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
     ps_http = env.pageserver.http_client()

     with pytest.raises(
-        NeonPageserverApiException, match="Cannot delete timeline which has child timelines"
+        PageserverApiException, match="Cannot delete timeline which has child timelines"
     ):
         timeline_path = (
@@ -64,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
     # check 404
     with pytest.raises(
-        NeonPageserverApiException,
+        PageserverApiException,
         match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} was not found",
     ):
         ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index d783f897f9..c87e9a6720 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -11,7 +11,7 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
-    NeonPageserverHttpClient,
+    PageserverHttpClient,
     PgBin,
     PortDistributor,
     Postgres,
@@ -462,7 +462,7 @@ def assert_physical_size(env: NeonEnv, tenant_id: TenantId, timeline_id: Timelin
 # Timeline logical size initialization is an asynchronous background task that runs once,
 # try a few times to ensure it's activated properly
 def wait_for_timeline_size_init(
-    client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId
+    client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
 ):
     for i in range(10):
         timeline_details = client.timeline_detail(
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 79adfb7b68..09f6f4b9f9 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -27,6 +27,7 @@ from fixtures.neon_fixtures import (
     RemoteStorageKind,
     RemoteStorageUsers,
     Safekeeper,
+    SafekeeperHttpClient,
     SafekeeperPort,
     available_remote_storages,
     neon_binpath,
@@ -34,7 +35,7 @@
     wait_for_upload,
 )
 from fixtures.types import Lsn, TenantId, TimelineId
-from fixtures.utils import get_dir_size, query_scalar
+from fixtures.utils import get_dir_size, query_scalar, start_in_background

 def wait_lsn_force_checkpoint(
@@ -841,7 +842,7 @@ class SafekeeperEnv:
         safekeeper_dir = self.repo_dir / f"sk{i}"
         safekeeper_dir.mkdir(exist_ok=True)

-        args = [
+        cmd = [
             self.bin_safekeeper,
             "-l",
             f"127.0.0.1:{port.pg}",
@@ -853,11 +854,22 @@ class SafekeeperEnv:
             str(i),
             "--broker-endpoints",
             self.broker.client_url(),
-            "--daemonize",
         ]
+        log.info(f'Running command "{" ".join(cmd)}"')

-        log.info(f'Running command "{" ".join(args)}"')
-        return subprocess.run(args, check=True)
+        safekeeper_client = SafekeeperHttpClient(
+            port=port.http,
+            auth_token=None,
+        )
+        try:
+            safekeeper_process = start_in_background(
+                cmd, safekeeper_dir, "safekeeper.log", safekeeper_client.check_status
+            )
+            return safekeeper_process
+        except Exception as e:
+            log.error(e)
+            # the child is already killed by start_in_background on failure
+            raise Exception(f"Failed to start safekeeper as {cmd}, reason: {e}")

     def get_safekeeper_connstrs(self):
         return ",".join([sk_proc.args[2] for sk_proc in self.safekeepers])
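
Note on the pattern this diff adopts: instead of asking each service to daemonize itself, the test harness spawns the service with stdout/stderr redirected to a log file, polls a readiness check, and keeps the Popen handle so teardown can kill() the child directly rather than round-tripping through a pid file. The following is a minimal, self-contained sketch of that spawn-then-poll loop; it is an illustration only, not code from this diff, and readiness_check and the command are hypothetical stand-ins for a real probe (such as PageserverHttpClient.check_status) and a real service binary.

import subprocess
import time
from pathlib import Path
from typing import Callable, Optional


def spawn_and_wait(
    cmd: list,  # e.g. ["pageserver", "--workdir", ...] (hypothetical)
    workdir: Path,
    log_name: str,
    readiness_check: Callable[[], None],  # raises until the service is up
    attempts: int = 10,
    interval: float = 1.0,
) -> subprocess.Popen:
    """Spawn cmd with its output going to a log file, then poll readiness."""
    with open(workdir / log_name, "wb") as log_file:
        # the child inherits the file descriptor, so closing our copy is fine
        child = subprocess.Popen(cmd, stdout=log_file, stderr=log_file, cwd=workdir)
    last_exc: Optional[Exception] = None
    for _ in range(attempts):
        if child.poll() is not None:
            # the child exited before becoming ready; surface its exit code
            raise RuntimeError(f"{cmd} exited with code {child.returncode}")
        try:
            readiness_check()
            return child  # ready: caller keeps the handle and kills it on teardown
        except Exception as e:
            last_exc = e
            time.sleep(interval)
    child.kill()
    raise RuntimeError(f"{cmd} did not become ready in {attempts} attempts") from last_exc

Keeping the Popen handle is what lets new_pageserver_service and SafekeeperEnv above stop their services deterministically, where the old --daemonize flow had to read back a pid file and signal the process.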