Use -m immediate for 'immediate' shutdown

This commit is contained in:
Heikki Linnakangas
2021-10-26 22:33:17 +03:00
parent af429fb401
commit 1bc917324d
6 changed files with 79 additions and 39 deletions

8
Cargo.lock generated
View File

@@ -180,9 +180,9 @@ dependencies = [
[[package]]
name = "bitflags"
version = "1.2.1"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitvec"
@@ -1047,9 +1047,9 @@ dependencies = [
[[package]]
name = "nix"
version = "0.20.2"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5e06129fb611568ef4e868c14b326274959aa70ff7776e9d55323531c374945"
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
dependencies = [
"bitflags",
"cc",

View File

@@ -18,7 +18,7 @@ regex = "1"
anyhow = "1.0"
thiserror = "1"
bytes = "1.0.1"
nix = "0.20"
nix = "0.23"
url = "2.2.2"
hex = { version = "0.4.3", features = ["serde"] }
reqwest = { version = "0.11", features = ["blocking", "json"] }

View File

@@ -7,6 +7,7 @@ use std::time::Duration;
use std::{io, result, thread};
use anyhow::bail;
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::Config;
@@ -190,24 +191,44 @@ impl SafekeeperNode {
bail!("safekeeper failed to start in {} seconds", RETRIES);
}
///
/// Stop the server.
///
/// If 'immediate' is true, we use SIGQUIT, killing the process immediately.
/// Otherwise we use SIGTERM, triggering a clean shutdown
///
/// If the server is not running, returns success
///
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
let pid_file = self.pid_file();
if !pid_file.exists() {
println!("Safekeeper {} is already stopped", self.name);
return Ok(())
return Ok(());
}
let pid = read_pidfile(&pid_file)?;
let pid = Pid::from_raw(pid);
if immediate {
let sig = if immediate {
println!("Stop safekeeper immediately");
if kill(pid, Signal::SIGQUIT).is_err() {
bail!("Failed to kill safekeeper with pid {}", pid);
}
Signal::SIGQUIT
} else {
println!("Stop safekeeper gracefully");
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to stop safekeeper with pid {}", pid);
Signal::SIGTERM
};
match kill(pid, sig) {
Ok(_) => (),
Err(Errno::ESRCH) => {
println!(
"Safekeeper with pid {} does not exist, but a PID file was found",
pid
);
return Ok(());
}
Err(err) => bail!(
"Failed to send signal to safekeeper with pid {}: {}",
pid,
err.desc()
),
}
let address = connection_address(&self.pg_connection_config);

View File

@@ -6,6 +6,7 @@ use std::time::Duration;
use std::{io, result, thread};
use anyhow::{anyhow, bail};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{BranchCreateRequest, TenantCreateRequest};
@@ -206,25 +207,37 @@ impl PageServerNode {
/// If 'immediate' is true, we use SIGQUIT, killing the process immediately.
/// Otherwise we use SIGTERM, triggering a clean shutdown
///
/// If the page server is not running, returns success
/// If the server is not running, returns success
///
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
let pid_file = self.pid_file();
if !pid_file.exists() {
println!("Pageserver is already stopped");
return Ok(())
return Ok(());
}
let pid = Pid::from_raw(read_pidfile(&pid_file)?);
if immediate {
let sig = if immediate {
println!("Stop pageserver immediately");
if kill(pid, Signal::SIGQUIT).is_err() {
bail!("Failed to kill pageserver with pid {}", pid);
}
Signal::SIGQUIT
} else {
println!("Stop pageserver gracefully");
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to stop pageserver with pid {}", pid);
Signal::SIGTERM
};
match kill(pid, sig) {
Ok(_) => (),
Err(Errno::ESRCH) => {
println!(
"Pageserver with pid {} does not exist, but a PID file was found",
pid
);
return Ok(());
}
Err(err) => bail!(
"Failed to send signal to pageserver with pid {}: {}",
pid,
err.desc()
),
}
let address = connection_address(&self.pg_connection_config);

View File

@@ -342,7 +342,7 @@ class ZenithEnvBuilder:
log.info('Cleaning up all storage and compute nodes')
self.env.postgres.stop_all()
for sk in self.env.safekeepers:
sk.stop()
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
@@ -675,7 +675,7 @@ class ZenithPageserver(PgProtocol):
"""
cmd = ['pageserver', 'stop']
if immediate:
cmd.append('immediate')
cmd.extend(['-m', 'immediate'])
log.info(f"Stopping pageserver with {cmd}")
if self.running:
@@ -1024,9 +1024,14 @@ class Safekeeper:
break # success
return self
def stop(self) -> 'Safekeeper':
def stop(self, immediate=False) -> 'Safekeeper':
cmd = ['safekeeper', 'stop']
if immediate:
cmd.extend(['-m', 'immediate'])
cmd.append(self.name)
log.info('Stopping safekeeper {}'.format(self.name))
self.env.zenith_cli(['safekeeper', 'stop', self.name])
self.env.zenith_cli(cmd)
return self
def append_logical_message(self, tenant_id: str, timeline_id: str,

View File

@@ -93,6 +93,14 @@ fn main() -> Result<()> {
.required(false)
.value_name("port");
let stop_mode_arg = Arg::with_name("stop-mode")
.short("m")
.takes_value(true)
.possible_values(&["fast", "immediate"])
.help("If 'immediate', don't flush repository data at shutdown")
.required(false)
.value_name("stop-mode");
let matches = App::new("Zenith CLI")
.setting(AppSettings::ArgRequiredElseHelp)
.subcommand(
@@ -125,10 +133,7 @@ fn main() -> Result<()> {
.subcommand(SubCommand::with_name("status"))
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
.arg(Arg::with_name("immediate")
.help("Don't flush repository data at shutdown")
.required(false)
))
.arg(stop_mode_arg.clone()))
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
)
.subcommand(
@@ -142,13 +147,12 @@ fn main() -> Result<()> {
.subcommand(SubCommand::with_name("stop")
.about("Stop local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(Arg::with_name("immediate")
.help("Don't flush data at shutdown")
.required(false)
))
.arg(stop_mode_arg.clone())
)
.subcommand(SubCommand::with_name("restart")
.about("Restart local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(stop_mode_arg.clone())
)
)
.subcommand(
@@ -195,10 +199,7 @@ fn main() -> Result<()> {
.subcommand(
SubCommand::with_name("stop")
.about("Stop page server and safekeepers")
.arg(Arg::with_name("immediate")
.help("Don't flush repository data at shutdown")
.required(false)
)
.arg(stop_mode_arg.clone())
)
.get_matches();
@@ -600,7 +601,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
("stop", Some(stop_match)) => {
let immediate = stop_match.is_present("immediate");
let immediate = stop_match.value_of("stop-mode") == Some("immediate");
if let Err(e) = pageserver.stop(immediate) {
eprintln!("pageserver stop failed: {}", e);
@@ -652,7 +653,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
let node_name = sub_match
.value_of("node")
.unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let immediate = sub_match.is_present("immediate");
let immediate = sub_match.value_of("stop-mode") == Some("immediate");
let safekeeper = get_safekeeper(env, node_name)?;
@@ -707,7 +708,7 @@ fn handle_start_all(_sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let immediate = sub_match.is_present("immediate");
let immediate = sub_match.value_of("stop-mode") == Some("immediate");
let pageserver = PageServerNode::from_env(env);