diff --git a/Cargo.lock b/Cargo.lock index a7b90e18b7..59facab172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1888,7 +1888,6 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" dependencies = [ - "cc", "libc", "signal-hook-registry", ] @@ -2360,6 +2359,7 @@ dependencies = [ "rust-s3", "serde", "serde_json", + "signal-hook", "tempfile", "tokio", "tokio-stream", @@ -2611,6 +2611,7 @@ dependencies = [ "rustls-split", "serde", "serde_json", + "signal-hook", "tempfile", "thiserror", "tokio", diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index ca8dbf38dd..fcbf840397 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -130,9 +130,8 @@ impl SafekeeperNode { let listen_pg = format!("localhost:{}", self.conf.pg_port); let listen_http = format!("localhost:{}", self.conf.http_port); - let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?); - cmd = cmd - .args(&["-D", self.datadir_path().to_str().unwrap()]) + let mut cmd = Command::new(self.env.safekeeper_bin()?); + cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--pageserver", &pageserver_conn]) @@ -141,13 +140,18 @@ impl SafekeeperNode { .env_clear() .env("RUST_BACKTRACE", "1"); if !self.conf.sync { - cmd = cmd.arg("--no-sync"); + cmd.arg("--no-sync"); } if self.env.pageserver.auth_type == AuthType::ZenithJWT { cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token); } + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); + } + if !cmd.status()?.success() { bail!( "Safekeeper failed to start. 
See '{}' for details.", diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index f4cef0a72b..b8118d7f4b 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -5,7 +5,7 @@ use std::process::Command; use std::time::Duration; use std::{io, result, thread}; -use anyhow::{anyhow, bail}; +use anyhow::bail; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; @@ -97,7 +97,6 @@ impl PageServerNode { } pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> { - let mut cmd = Command::new(self.env.pageserver_bin()?); let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port); let listen_http = format!("localhost:{}", self.env.pageserver.http_port); let mut args = vec![ @@ -122,18 +121,19 @@ impl PageServerNode { args.extend(&["--create-tenant", tenantid]) } - let status = cmd - .args(args) - .env_clear() - .env("RUST_BACKTRACE", "1") - .status() - .expect("pageserver init failed"); + let mut cmd = Command::new(self.env.pageserver_bin()?); + cmd.args(args).env_clear().env("RUST_BACKTRACE", "1"); - if status.success() { - Ok(()) - } else { - Err(anyhow!("pageserver init failed")) + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); } + + if !cmd.status()?.success() { + bail!("pageserver init failed"); + } + + Ok(()) } pub fn repo_path(&self) -> PathBuf { @@ -158,6 +158,11 @@ impl PageServerNode { .env_clear() .env("RUST_BACKTRACE", "1"); + let var = "LLVM_PROFILE_FILE"; + if let Some(val) = std::env::var_os(var) { + cmd.env(var, val); + } + if !cmd.status()?.success() { bail!( "Pageserver failed to start. 
See '{}' for details.", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 6cb26404c5..7bb6d1c945 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -35,7 +35,7 @@ scopeguard = "1.1.0" async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" -signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } +signal-hook = "0.3.10" url = "2" nix = "0.23" once_cell = "1.8.0" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 63de235003..e237afbd16 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -14,14 +14,6 @@ use tracing::*; use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION}; use anyhow::{bail, ensure, Context, Result}; -use signal_hook::consts::signal::*; -use signal_hook::consts::TERM_SIGNALS; -use signal_hook::flag; -use signal_hook::iterator::exfiltrator::WithOrigin; -use signal_hook::iterator::SignalsInfo; -use std::process::exit; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; @@ -32,6 +24,8 @@ use pageserver::{ }; use zenith_utils::http::endpoint; use zenith_utils::postgres_backend; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals::{self, Signal}; use const_format::formatcp; @@ -524,17 +518,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { info!("version: {}", GIT_VERSION); - let term_now = Arc::new(AtomicBool::new(false)); - for sig in TERM_SIGNALS { - // When terminated by a second term signal, exit with exit code 1. - // This will do nothing the first time (because term_now is false). - flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; - // But this will "arm" the above for the second time, by setting it to true. - // The order of registering these is important, if you put this one first, it will - // first arm and then terminate ‒ all in the first round. 
- flag::register(*sig, Arc::clone(&term_now))?; - } - // TODO: Check that it looks like a valid repository before going further // bind sockets before daemonizing so we report errors early and do not return until we are listening @@ -550,6 +533,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { ); let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -564,18 +548,21 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { .stdout(stdout) .stderr(stderr); - match daemonize.start() { + // XXX: The parent process should exit abruptly right after + // it has spawned a child to prevent coverage machinery from + // dumping stats into a `profraw` file now owned by the child. + // Otherwise, the coverage data will be damaged. + match daemonize.exit_action(|| exit_now(0)).start() { Ok(_) => info!("Success, daemonized"), Err(err) => error!(%err, "could not daemonize"), } } - // keep join handles for spawned threads - // don't spawn threads before daemonizing - let mut join_handles = Vec::new(); + let signals = signals::install_shutdown_handlers()?; + let mut threads = vec![]; if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? { - join_handles.push(handle); + threads.push(handle); } // Initialize tenant manager. 
tenant_mgr::init(conf); @@ -594,61 +581,55 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // Spawn a new thread for the http endpoint // bind before launching separate thread so the error reported before startup exits let cloned = auth.clone(); - let http_endpoint_thread = thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(move || { - let router = http::make_router(conf, cloned); - endpoint::serve_thread_main(router, http_listener) - })?; - - join_handles.push(http_endpoint_thread); + threads.push( + thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(move || { + let router = http::make_router(conf, cloned); + endpoint::serve_thread_main(router, http_listener) + })?, + ); // Spawn a thread to listen for connections. It will spawn further threads // for each connection. - let page_service_thread = thread::Builder::new() - .name("Page Service thread".into()) - .spawn(move || { - page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) - })?; + threads.push( + thread::Builder::new() + .name("Page Service thread".into()) + .spawn(move || { + page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) + })?, + ); - for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() { - match info.signal { - SIGQUIT => { - info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode"); - exit(111); - } - SIGINT | SIGTERM => { - info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode"); - // Terminate postgres backends - postgres_backend::set_pgbackend_shutdown_requested(); - // Stop all tenants and flush their data - tenant_mgr::shutdown_all_tenants()?; - // Wait for pageservice thread to complete the job - page_service_thread + signals.handle(|signal| match signal { + Signal::Quit => { + info!( + "Got {}. Terminating in immediate shutdown mode", + signal.name() + ); + std::process::exit(111); + } + + Signal::Interrupt | Signal::Terminate => { + info!( + "Got {}. 
Terminating gracefully in fast shutdown mode", + signal.name() + ); + + postgres_backend::set_pgbackend_shutdown_requested(); + tenant_mgr::shutdown_all_tenants()?; + endpoint::shutdown(); + + for handle in std::mem::take(&mut threads) { + handle .join() .expect("thread panicked") .expect("thread exited with an error"); - - // Shut down http router - endpoint::shutdown(); - - // Wait for all threads - for handle in join_handles.into_iter() { - handle - .join() - .expect("thread panicked") - .expect("thread exited with an error"); - } - info!("Pageserver shut down successfully completed"); - exit(0); - } - unknown_signal => { - debug!("Unknown signal {}", unknown_signal); } + + info!("Shut down successfully completed"); + std::process::exit(0); } - } - - Ok(()) + }) } #[cfg(test)] diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 326cb060d1..954e9d75ff 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -509,6 +509,12 @@ sync = false # Disable fsyncs to make the tests go faster env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir) env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir) + # Pass coverage settings + var = 'LLVM_PROFILE_FILE' + val = os.environ.get(var) + if val: + env_vars[var] = val + # Intercept CalledProcessError and print more info try: res = subprocess.run(args, diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 539a925ebd..69d5f681c5 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -28,6 +28,7 @@ anyhow = "1.0" crc32c = "0.6.0" humantime = "2.1.0" walkdir = "2" +signal-hook = "0.3.10" serde = { version = "1.0", features = ["derive"] } hex = "0.4.3" const_format = "0.2.21" diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index e85d49a8c6..0f06983574 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -8,14 +8,15 @@ use daemonize::Daemonize; use log::*; use 
std::path::{Path, PathBuf}; use std::thread; -use zenith_utils::http::endpoint; -use zenith_utils::{logging, tcp_listener, GIT_VERSION}; - use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use walkeeper::http; use walkeeper::s3_offload; use walkeeper::wal_service; use walkeeper::SafeKeeperConf; +use zenith_utils::http::endpoint; +use zenith_utils::shutdown::exit_now; +use zenith_utils::signals; +use zenith_utils::{logging, tcp_listener, GIT_VERSION}; fn main() -> Result<()> { zenith_metrics::set_common_metrics_prefix("safekeeper"); @@ -131,6 +132,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { e })?; + // XXX: Don't spawn any threads before daemonizing! if conf.daemonize { info!("daemonizing..."); @@ -145,51 +147,59 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .stdout(stdout) .stderr(stderr); - match daemonize.start() { + // XXX: The parent process should exit abruptly right after + // it has spawned a child to prevent coverage machinery from + // dumping stats into a `profraw` file now owned by the child. + // Otherwise, the coverage data will be damaged. 
+ match daemonize.exit_action(|| exit_now(0)).start() { Ok(_) => info!("Success, daemonized"), Err(e) => error!("Error, {}", e), } } - let mut threads = Vec::new(); + let signals = signals::install_shutdown_handlers()?; + let mut threads = vec![]; - let conf_cloned = conf.clone(); - let http_endpoint_thread = thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(|| { - // TODO authentication - let router = http::make_router(conf_cloned); - endpoint::serve_thread_main(router, http_listener).unwrap(); - }) - .unwrap(); - threads.push(http_endpoint_thread); + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("http_endpoint_thread".into()) + .spawn(|| { + // TODO authentication + let router = http::make_router(conf_); + endpoint::serve_thread_main(router, http_listener).unwrap(); + })?, + ); if conf.ttl.is_some() { - let s3_conf = conf.clone(); - let s3_offload_thread = thread::Builder::new() - .name("S3 offload thread".into()) + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("S3 offload thread".into()) + .spawn(|| { + s3_offload::thread_main(conf_); + })?, + ); + } + + threads.push( + thread::Builder::new() + .name("WAL acceptor thread".into()) .spawn(|| { - // thread code - s3_offload::thread_main(s3_conf); - }) - .unwrap(); - threads.push(s3_offload_thread); - } + let thread_result = wal_service::thread_main(conf, pg_listener); + if let Err(e) = thread_result { + info!("wal_service thread terminated: {}", e); + } + })?, + ); - let wal_acceptor_thread = thread::Builder::new() - .name("WAL acceptor thread".into()) - .spawn(|| { - // thread code - let thread_result = wal_service::thread_main(conf, pg_listener); - if let Err(e) = thread_result { - info!("wal_service thread terminated: {}", e); - } - }) - .unwrap(); - threads.push(wal_acceptor_thread); - - for t in threads { - t.join().unwrap() - } - Ok(()) + // NOTE: we still have to handle signals like SIGQUIT to prevent coredumps + 
signals.handle(|signal| { + // TODO: implement graceful shutdown with joining threads etc + info!( + "Got {}. Terminating in immediate shutdown mode", + signal.name() + ); + std::process::exit(111); + }) } diff --git a/zenith_utils/Cargo.toml b/zenith_utils/Cargo.toml index 6292971c21..3a81e9bd38 100644 --- a/zenith_utils/Cargo.toml +++ b/zenith_utils/Cargo.toml @@ -20,18 +20,17 @@ tokio = "1.11" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } nix = "0.23.0" - -zenith_metrics = { path = "../zenith_metrics" } -workspace_hack = { path = "../workspace_hack" } +signal-hook = "0.3.10" rand = "0.8.3" jsonwebtoken = "7" hex = { version = "0.4.3", features = ["serde"] } - rustls = "0.19.1" rustls-split = "0.2.1" - git-version = "0.3.5" +zenith_metrics = { path = "../zenith_metrics" } +workspace_hack = { path = "../workspace_hack" } + [dev-dependencies] hex-literal = "0.3" bytes = "1.0" diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index 9c35f77328..ffb798fe83 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -153,7 +153,7 @@ pub fn check_permission(req: &Request, tenantid: Option) -> Res } } -// Send shutdown signal +/// Initiate graceful shutdown of the http endpoint pub fn shutdown() { if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() { let _ = tx.send(()); diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index eb9948ed64..b0e5131a11 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -40,6 +40,7 @@ pub mod logging; // Misc pub mod accum; +pub mod shutdown; // Utility for binding TcpListeners with proper socket options. 
pub mod tcp_listener; @@ -47,6 +48,9 @@ pub mod tcp_listener; // Utility for putting a raw file descriptor into non-blocking mode pub mod nonblock; +// Default signal handling +pub mod signals; + // This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages // // we have several cases: diff --git a/zenith_utils/src/shutdown.rs b/zenith_utils/src/shutdown.rs new file mode 100644 index 0000000000..7eba905997 --- /dev/null +++ b/zenith_utils/src/shutdown.rs @@ -0,0 +1,6 @@ +/// Immediately terminate the calling process without calling +/// atexit callbacks, C runtime destructors etc. We mainly use +/// this to protect coverage data from concurrent writes. +pub fn exit_now(code: u8) { + unsafe { nix::libc::_exit(code as _) }; +} diff --git a/zenith_utils/src/signals.rs b/zenith_utils/src/signals.rs new file mode 100644 index 0000000000..6586da2339 --- /dev/null +++ b/zenith_utils/src/signals.rs @@ -0,0 +1,59 @@ +use signal_hook::flag; +use signal_hook::iterator::Signals; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +pub use signal_hook::consts::{signal::*, TERM_SIGNALS}; + +pub fn install_shutdown_handlers() -> anyhow::Result<ShutdownSignals> { + let term_now = Arc::new(AtomicBool::new(false)); + for sig in TERM_SIGNALS { + // When terminated by a second term signal, exit with exit code 1. + // This will do nothing the first time (because term_now is false). + flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; + // But this will "arm" the above for the second time, by setting it to true. + // The order of registering these is important, if you put this one first, it will + // first arm and then terminate ‒ all in the first round. 
+ flag::register(*sig, Arc::clone(&term_now))?; + } + + Ok(ShutdownSignals) +} + +pub enum Signal { + Quit, + Interrupt, + Terminate, +} + +impl Signal { + pub fn name(&self) -> &'static str { + match self { + Signal::Quit => "SIGQUIT", + Signal::Interrupt => "SIGINT", + Signal::Terminate => "SIGTERM", + } + } +} + +pub struct ShutdownSignals; + +impl ShutdownSignals { + pub fn handle( + self, + mut handler: impl FnMut(Signal) -> anyhow::Result<()>, + ) -> anyhow::Result<()> { + for raw_signal in Signals::new(TERM_SIGNALS)?.into_iter() { + let signal = match raw_signal { + SIGINT => Signal::Interrupt, + SIGTERM => Signal::Terminate, + SIGQUIT => Signal::Quit, + other => panic!("unknown signal: {}", other), + }; + + handler(signal)?; + } + + Ok(()) + } +}