Improve shutdown story for code coverage

This patch introduces fixes for several problems affecting
LLVM-based code coverage:

* Daemonizing parent processes should call _exit() to prevent
coverage data file corruption (*.profraw) due to concurrent writes.

* Implement proper shutdown handlers in safekeeper.
This commit is contained in:
Dmitry Ivanov
2021-11-30 15:36:00 +03:00
parent b7685eb6ba
commit 7cec13d1df
13 changed files with 210 additions and 134 deletions

3
Cargo.lock generated
View File

@@ -1888,7 +1888,6 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1"
dependencies = [
"cc",
"libc",
"signal-hook-registry",
]
@@ -2360,6 +2359,7 @@ dependencies = [
"rust-s3",
"serde",
"serde_json",
"signal-hook",
"tempfile",
"tokio",
"tokio-stream",
@@ -2611,6 +2611,7 @@ dependencies = [
"rustls-split",
"serde",
"serde_json",
"signal-hook",
"tempfile",
"thiserror",
"tokio",

View File

@@ -130,9 +130,8 @@ impl SafekeeperNode {
let listen_pg = format!("localhost:{}", self.conf.pg_port);
let listen_http = format!("localhost:{}", self.conf.http_port);
let mut cmd: &mut Command = &mut Command::new(self.env.safekeeper_bin()?);
cmd = cmd
.args(&["-D", self.datadir_path().to_str().unwrap()])
let mut cmd = Command::new(self.env.safekeeper_bin()?);
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--pageserver", &pageserver_conn])
@@ -141,13 +140,18 @@ impl SafekeeperNode {
.env_clear()
.env("RUST_BACKTRACE", "1");
if !self.conf.sync {
cmd = cmd.arg("--no-sync");
cmd.arg("--no-sync");
}
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token);
}
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!(
"Safekeeper failed to start. See '{}' for details.",

View File

@@ -5,7 +5,7 @@ use std::process::Command;
use std::time::Duration;
use std::{io, result, thread};
use anyhow::{anyhow, bail};
use anyhow::bail;
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
@@ -97,7 +97,6 @@ impl PageServerNode {
}
pub fn init(&self, create_tenant: Option<&str>) -> anyhow::Result<()> {
let mut cmd = Command::new(self.env.pageserver_bin()?);
let listen_pg = format!("localhost:{}", self.env.pageserver.pg_port);
let listen_http = format!("localhost:{}", self.env.pageserver.http_port);
let mut args = vec![
@@ -122,18 +121,19 @@ impl PageServerNode {
args.extend(&["--create-tenant", tenantid])
}
let status = cmd
.args(args)
.env_clear()
.env("RUST_BACKTRACE", "1")
.status()
.expect("pageserver init failed");
let mut cmd = Command::new(self.env.pageserver_bin()?);
cmd.args(args).env_clear().env("RUST_BACKTRACE", "1");
if status.success() {
Ok(())
} else {
Err(anyhow!("pageserver init failed"))
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!("pageserver init failed");
}
Ok(())
}
pub fn repo_path(&self) -> PathBuf {
@@ -158,6 +158,11 @@ impl PageServerNode {
.env_clear()
.env("RUST_BACKTRACE", "1");
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);
}
if !cmd.status()?.success() {
bail!(
"Pageserver failed to start. See '{}' for details.",

View File

@@ -35,7 +35,7 @@ scopeguard = "1.1.0"
async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
signal-hook = "0.3.10"
url = "2"
nix = "0.23"
once_cell = "1.8.0"

View File

@@ -14,14 +14,6 @@ use tracing::*;
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType, tcp_listener, GIT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use signal_hook::consts::signal::*;
use signal_hook::consts::TERM_SIGNALS;
use signal_hook::flag;
use signal_hook::iterator::exfiltrator::WithOrigin;
use signal_hook::iterator::SignalsInfo;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use clap::{App, Arg, ArgMatches};
use daemonize::Daemonize;
@@ -32,6 +24,8 @@ use pageserver::{
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals::{self, Signal};
use const_format::formatcp;
@@ -524,17 +518,6 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
info!("version: {}", GIT_VERSION);
let term_now = Arc::new(AtomicBool::new(false));
for sig in TERM_SIGNALS {
// When terminated by a second term signal, exit with exit code 1.
// This will do nothing the first time (because term_now is false).
flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?;
// But this will "arm" the above for the second time, by setting it to true.
// The order of registering these is important, if you put this one first, it will
// first arm and then terminate all in the first round.
flag::register(*sig, Arc::clone(&term_now))?;
}
// TODO: Check that it looks like a valid repository before going further
// bind sockets before daemonizing so we report errors early and do not return until we are listening
@@ -550,6 +533,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
);
let pageserver_listener = tcp_listener::bind(conf.listen_pg_addr.clone())?;
// XXX: Don't spawn any threads before daemonizing!
if conf.daemonize {
info!("daemonizing...");
@@ -564,18 +548,21 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
.stdout(stdout)
.stderr(stderr);
match daemonize.start() {
// XXX: The parent process should exit abruptly right after
// it has spawned a child to prevent coverage machinery from
// dumping stats into a `profraw` file now owned by the child.
// Otherwise, the coverage data will be damaged.
match daemonize.exit_action(|| exit_now(0)).start() {
Ok(_) => info!("Success, daemonized"),
Err(err) => error!(%err, "could not daemonize"),
}
}
// keep join handles for spawned threads
// don't spawn threads before daemonizing
let mut join_handles = Vec::new();
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? {
join_handles.push(handle);
threads.push(handle);
}
// Initialize tenant manager.
tenant_mgr::init(conf);
@@ -594,61 +581,55 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Spawn a new thread for the http endpoint
// bind before launching separate thread so the error reported before startup exits
let cloned = auth.clone();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
let router = http::make_router(conf, cloned);
endpoint::serve_thread_main(router, http_listener)
})?;
join_handles.push(http_endpoint_thread);
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(move || {
let router = http::make_router(conf, cloned);
endpoint::serve_thread_main(router, http_listener)
})?,
);
// Spawn a thread to listen for connections. It will spawn further threads
// for each connection.
let page_service_thread = thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
threads.push(
thread::Builder::new()
.name("Page Service thread".into())
.spawn(move || {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?,
);
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
match info.signal {
SIGQUIT => {
info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode");
exit(111);
}
SIGINT | SIGTERM => {
info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode");
// Terminate postgres backends
postgres_backend::set_pgbackend_shutdown_requested();
// Stop all tenants and flush their data
tenant_mgr::shutdown_all_tenants()?;
// Wait for pageservice thread to complete the job
page_service_thread
signals.handle(|signal| match signal {
Signal::Quit => {
info!(
"Got {}. Terminating in immediate shutdown mode",
signal.name()
);
std::process::exit(111);
}
Signal::Interrupt | Signal::Terminate => {
info!(
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
postgres_backend::set_pgbackend_shutdown_requested();
tenant_mgr::shutdown_all_tenants()?;
endpoint::shutdown();
for handle in std::mem::take(&mut threads) {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error");
// Shut down http router
endpoint::shutdown();
// Wait for all threads
for handle in join_handles.into_iter() {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error");
}
info!("Pageserver shut down successfully completed");
exit(0);
}
unknown_signal => {
debug!("Unknown signal {}", unknown_signal);
}
info!("Shut down successfully completed");
std::process::exit(0);
}
}
Ok(())
})
}
#[cfg(test)]

View File

@@ -509,6 +509,12 @@ sync = false # Disable fsyncs to make the tests go faster
env_vars['ZENITH_REPO_DIR'] = str(self.repo_dir)
env_vars['POSTGRES_DISTRIB_DIR'] = str(pg_distrib_dir)
# Pass coverage settings
var = 'LLVM_PROFILE_FILE'
val = os.environ.get(var)
if val:
env_vars[var] = val
# Intercept CalledProcessError and print more info
try:
res = subprocess.run(args,

View File

@@ -28,6 +28,7 @@ anyhow = "1.0"
crc32c = "0.6.0"
humantime = "2.1.0"
walkdir = "2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3"
const_format = "0.2.21"

View File

@@ -8,14 +8,15 @@ use daemonize::Daemonize;
use log::*;
use std::path::{Path, PathBuf};
use std::thread;
use zenith_utils::http::endpoint;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::SafeKeeperConf;
use zenith_utils::http::endpoint;
use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -131,6 +132,7 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
e
})?;
// XXX: Don't spawn any threads before daemonizing!
if conf.daemonize {
info!("daemonizing...");
@@ -145,51 +147,59 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.stdout(stdout)
.stderr(stderr);
match daemonize.start() {
// XXX: The parent process should exit abruptly right after
// it has spawned a child to prevent coverage machinery from
// dumping stats into a `profraw` file now owned by the child.
// Otherwise, the coverage data will be damaged.
match daemonize.exit_action(|| exit_now(0)).start() {
Ok(_) => info!("Success, daemonized"),
Err(e) => error!("Error, {}", e),
}
}
let mut threads = Vec::new();
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let conf_cloned = conf.clone();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
// TODO authentication
let router = http::make_router(conf_cloned);
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();
threads.push(http_endpoint_thread);
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
// TODO authentication
let router = http::make_router(conf_);
endpoint::serve_thread_main(router, http_listener).unwrap();
})?,
);
if conf.ttl.is_some() {
let s3_conf = conf.clone();
let s3_offload_thread = thread::Builder::new()
.name("S3 offload thread".into())
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("S3 offload thread".into())
.spawn(|| {
s3_offload::thread_main(conf_);
})?,
);
}
threads.push(
thread::Builder::new()
.name("WAL acceptor thread".into())
.spawn(|| {
// thread code
s3_offload::thread_main(s3_conf);
})
.unwrap();
threads.push(s3_offload_thread);
}
let thread_result = wal_service::thread_main(conf, pg_listener);
if let Err(e) = thread_result {
info!("wal_service thread terminated: {}", e);
}
})?,
);
let wal_acceptor_thread = thread::Builder::new()
.name("WAL acceptor thread".into())
.spawn(|| {
// thread code
let thread_result = wal_service::thread_main(conf, pg_listener);
if let Err(e) = thread_result {
info!("wal_service thread terminated: {}", e);
}
})
.unwrap();
threads.push(wal_acceptor_thread);
for t in threads {
t.join().unwrap()
}
Ok(())
// NOTE: we still have to handle signals like SIGQUIT to prevent coredumps
signals.handle(|signal| {
// TODO: implement graceful shutdown with joining threads etc
info!(
"Got {}. Terminating in immediate shutdown mode",
signal.name()
);
std::process::exit(111);
})
}

View File

@@ -20,18 +20,17 @@ tokio = "1.11"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
nix = "0.23.0"
zenith_metrics = { path = "../zenith_metrics" }
workspace_hack = { path = "../workspace_hack" }
signal-hook = "0.3.10"
rand = "0.8.3"
jsonwebtoken = "7"
hex = { version = "0.4.3", features = ["serde"] }
rustls = "0.19.1"
rustls-split = "0.2.1"
git-version = "0.3.5"
zenith_metrics = { path = "../zenith_metrics" }
workspace_hack = { path = "../workspace_hack" }
[dev-dependencies]
hex-literal = "0.3"
bytes = "1.0"

View File

@@ -153,7 +153,7 @@ pub fn check_permission(req: &Request<Body>, tenantid: Option<ZTenantId>) -> Res
}
}
// Send shutdown signal
/// Initiate graceful shutdown of the http endpoint
pub fn shutdown() {
if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() {
let _ = tx.send(());

View File

@@ -40,6 +40,7 @@ pub mod logging;
// Misc
pub mod accum;
pub mod shutdown;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;
@@ -47,6 +48,9 @@ pub mod tcp_listener;
// Utility for putting a raw file descriptor into non-blocking mode
pub mod nonblock;
// Default signal handling
pub mod signals;
// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
//
// we have several cases:

View File

@@ -0,0 +1,6 @@
/// Immediately terminate the calling process without calling
/// atexit callbacks, C runtime destructors etc. We mainly use
/// this to protect coverage data from concurrent writes.
pub fn exit_now(code: u8) {
unsafe { nix::libc::_exit(code as _) };
}

View File

@@ -0,0 +1,59 @@
use signal_hook::flag;
use signal_hook::iterator::Signals;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
pub use signal_hook::consts::{signal::*, TERM_SIGNALS};
/// Register flag-based handlers for every signal in `TERM_SIGNALS`.
///
/// The first termination signal only arms a shared flag; a second one
/// makes the process exit with code 1. On success a [`ShutdownSignals`]
/// value is returned, which the caller uses to process the signals.
///
/// # Errors
///
/// Propagates any error reported while registering a handler.
pub fn install_shutdown_handlers() -> anyhow::Result<ShutdownSignals> {
    let armed = Arc::new(AtomicBool::new(false));

    for &signo in TERM_SIGNALS {
        // Registration order matters here.
        //
        // Step 1: the conditional shutdown exits with code 1, but only once
        // the flag is set, so it is a no-op for the very first signal.
        flag::register_conditional_shutdown(signo, 1, Arc::clone(&armed))?;

        // Step 2: this sets the flag, "arming" step 1 for the next signal.
        // Were it registered first, the flag would already be set when the
        // conditional shutdown ran, and the first signal would terminate us.
        flag::register(signo, Arc::clone(&armed))?;
    }

    Ok(ShutdownSignals)
}
/// Termination signals that are passed to the handler given to
/// `ShutdownSignals::handle`.
///
/// The common traits are derived so that values can be logged with
/// `{:?}`, compared, and copied freely (the enum carries no data).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Signal {
    /// Corresponds to `SIGQUIT`.
    Quit,
    /// Corresponds to `SIGINT`.
    Interrupt,
    /// Corresponds to `SIGTERM`.
    Terminate,
}
impl Signal {
pub fn name(&self) -> &'static str {
match self {
Signal::Quit => "SIGQUIT",
Signal::Interrupt => "SIGINT",
Signal::Terminate => "SIGTERM",
}
}
}
/// Marker value returned by `install_shutdown_handlers`.
///
/// It carries no data; it is consumed by `handle`, which feeds incoming
/// termination signals to a caller-supplied handler.
pub struct ShutdownSignals;
impl ShutdownSignals {
    /// Feed every received termination signal to `handler`.
    ///
    /// A signal iterator over `TERM_SIGNALS` is created and each raw signal
    /// number is translated into a [`Signal`] before `handler` is called.
    ///
    /// # Errors
    ///
    /// Returns an error if the signal iterator cannot be created, or the
    /// first error returned by `handler` (no further signals are processed).
    /// `Ok(())` is returned only if the iterator itself ends.
    ///
    /// # Panics
    ///
    /// Panics if a signal other than `SIGINT`, `SIGTERM` or `SIGQUIT` is
    /// delivered by the iterator.
    pub fn handle(
        self,
        mut handler: impl FnMut(Signal) -> anyhow::Result<()>,
    ) -> anyhow::Result<()> {
        let mut incoming = Signals::new(TERM_SIGNALS)?;

        (&mut incoming)
            .into_iter()
            .map(|signo| match signo {
                SIGQUIT => Signal::Quit,
                SIGTERM => Signal::Terminate,
                SIGINT => Signal::Interrupt,
                other => panic!("unknown signal: {}", other),
            })
            // Stops at the first handler error and propagates it.
            .try_for_each(&mut handler)
    }
}