Add --id argument to safekeeper setting its unique u64 id.

In preparation for storage node messaging. IDs are supposed to be monotonically
assigned by the console. In tests it is issued by ZenithEnv; at the zenith cli
level and fixtures, string name is completely replaced by integer id. Example
TOML configs are adjusted accordingly.

Sequential ids are chosen over Zid mainly because they are compact and easy to
type/remember.
This commit is contained in:
Arseny Sher
2022-02-14 15:51:34 +03:00
parent b815f5fb9f
commit 5865f85ae2
11 changed files with 158 additions and 68 deletions

View File

@@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898'
auth_type = 'Trust'
[[safekeepers]]
name = 'sk1'
id = 1
pg_port = 5454
http_port = 7676
[[safekeepers]]
name = 'sk2'
id = 2
pg_port = 5455
http_port = 7677
[[safekeepers]]
name = 'sk3'
id = 3
pg_port = 5456
http_port = 7678

View File

@@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898'
auth_type = 'Trust'
[[safekeepers]]
name = 'single'
id = 1
pg_port = 5454
http_port = 7676

View File

@@ -12,7 +12,9 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{opt_display_serde, ZTenantId};
use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
use crate::safekeeper::SafekeeperNode;
//
// This data structures represents zenith CLI config
@@ -87,7 +89,7 @@ impl Default for PageServerConf {
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(default)]
pub struct SafekeeperConf {
pub name: String,
pub id: ZNodeId,
pub pg_port: u16,
pub http_port: u16,
pub sync: bool,
@@ -96,7 +98,7 @@ pub struct SafekeeperConf {
impl Default for SafekeeperConf {
fn default() -> Self {
Self {
name: String::new(),
id: ZNodeId(0),
pg_port: 0,
http_port: 0,
sync: true,
@@ -136,8 +138,8 @@ impl LocalEnv {
self.base_data_dir.clone()
}
pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf {
self.base_data_dir.join("safekeepers").join(node_name)
pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
/// Create a LocalEnv from a config file.
@@ -285,7 +287,7 @@ impl LocalEnv {
fs::create_dir_all(self.pg_data_dirs_path())?;
for safekeeper in &self.safekeepers {
fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?;
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;
}
let mut conf_content = String::new();

View File

@@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::zid::ZNodeId;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
@@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response {
//
#[derive(Debug)]
pub struct SafekeeperNode {
pub name: String,
pub id: ZNodeId,
pub conf: SafekeeperConf,
@@ -77,10 +78,10 @@ impl SafekeeperNode {
pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode {
let pageserver = Arc::new(PageServerNode::from_env(env));
println!("initializing for {} for {}", conf.name, conf.http_port);
println!("initializing for sk {} for {}", conf.id, conf.http_port);
SafekeeperNode {
name: conf.name.clone(),
id: conf.id,
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
env: env.clone(),
@@ -98,8 +99,12 @@ impl SafekeeperNode {
.unwrap()
}
pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf {
env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref())
}
pub fn datadir_path(&self) -> PathBuf {
self.env.safekeeper_data_dir(&self.name)
SafekeeperNode::datadir_path_by_id(&self.env, self.id)
}
pub fn pid_file(&self) -> PathBuf {
@@ -120,6 +125,7 @@ impl SafekeeperNode {
let mut cmd = Command::new(self.env.safekeeper_bin()?);
fill_rust_env_vars(
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--id", self.id.to_string().as_ref()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
@@ -183,7 +189,7 @@ impl SafekeeperNode {
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
let pid_file = self.pid_file();
if !pid_file.exists() {
println!("Safekeeper {} is already stopped", self.name);
println!("Safekeeper {} is already stopped", self.id);
return Ok(());
}
let pid = read_pidfile(&pid_file)?;

View File

@@ -521,12 +521,7 @@ class SafekeeperEnv:
http=self.port_distributor.get_port(),
)
if self.num_safekeepers == 1:
name = "single"
else:
name = f"sk{i}"
safekeeper_dir = os.path.join(self.repo_dir, name)
safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}")
mkdir_if_needed(safekeeper_dir)
args = [
@@ -537,6 +532,8 @@ class SafekeeperEnv:
f"127.0.0.1:{port.http}",
"-D",
safekeeper_dir,
"--id",
str(i),
"--daemonize"
]
@@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str,
def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str:
return ','.join(
[f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names])
def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str:
return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names])
def execute_payload(pg: Postgres):
with closing(pg.connect()) as conn:
@@ -628,9 +624,9 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
http_cli = sk.http_client()
try:
status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"Safekeeper {sk.name} status: {status}")
log.info(f"Safekeeper {sk.id} status: {status}")
except Exception as e:
log.info(f"Safekeeper {sk.name} status error: {e}")
log.info(f"Safekeeper {sk.id} status error: {e}")
zenith_env_builder.num_safekeepers = 4
env = zenith_env_builder.init()
@@ -638,7 +634,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
log.info("Use only first 3 safekeepers")
env.safekeepers[3].stop()
active_safekeepers = ['sk1', 'sk2', 'sk3']
active_safekeepers = [1, 2, 3]
pg = env.postgres.create('test_replace_safekeeper')
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.start()
@@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder):
log.info("Recreate postgres to replace failed sk1 with new sk4")
pg.stop_and_destroy().create('test_replace_safekeeper')
active_safekeepers = ['sk2', 'sk3', 'sk4']
active_safekeepers = [2, 3, 4]
env.safekeepers[3].start()
pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers))
pg.start()

View File

@@ -556,19 +556,15 @@ class ZenithEnv:
pg=self.port_distributor.get_port(),
http=self.port_distributor.get_port(),
)
if config.num_safekeepers == 1:
name = "single"
else:
name = f"sk{i}"
id = i # assign ids sequentially
toml += f"""
[[safekeepers]]
name = '{name}'
id = {id}
pg_port = {port.pg}
http_port = {port.http}
sync = false # Disable fsyncs to make the tests go faster
"""
safekeeper = Safekeeper(env=self, name=name, port=port)
safekeeper = Safekeeper(env=self, id=id, port=port)
self.safekeepers.append(safekeeper)
log.info(f"Config: {toml}")
@@ -848,17 +844,17 @@ class ZenithCli:
log.info(f"Stopping pageserver with {cmd}")
return self.raw_cli(cmd)
def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', name])
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', str(id)])
def safekeeper_stop(self,
name: Optional[str] = None,
id: Optional[int] = None,
immediate=False) -> 'subprocess.CompletedProcess[str]':
args = ['safekeeper', 'stop']
if id is not None:
args.extend(str(id))
if immediate:
args.extend(['-m', 'immediate'])
if name is not None:
args.append(name)
return self.raw_cli(args)
def pg_create(
@@ -1397,11 +1393,11 @@ class Safekeeper:
""" An object representing a running safekeeper daemon. """
env: ZenithEnv
port: SafekeeperPort
name: str # identifier for logging
id: int
auth_token: Optional[str] = None
def start(self) -> 'Safekeeper':
self.env.zenith_cli.safekeeper_start(self.name)
self.env.zenith_cli.safekeeper_start(self.id)
# wait for wal acceptor start by checking its status
started_at = time.time()
@@ -1420,8 +1416,8 @@ class Safekeeper:
return self
def stop(self, immediate=False) -> 'Safekeeper':
log.info('Stopping safekeeper {}'.format(self.name))
self.env.zenith_cli.safekeeper_stop(self.name, immediate)
log.info('Stopping safekeeper {}'.format(self.id))
self.env.zenith_cli.safekeeper_stop(self.id, immediate)
return self
def append_logical_message(self,

View File

@@ -1,17 +1,19 @@
//
// Main entry point for the safekeeper executable
//
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use fs2::FileExt;
use std::fs::File;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use walkeeper::control_file::{self, CreateControlFile};
use zenith_utils::http::endpoint;
use zenith_utils::zid::ZNodeId;
use zenith_utils::{logging, tcp_listener, GIT_VERSION};
use tokio::sync::mpsc;
@@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now;
use zenith_utils::signals;
const LOCK_FILE_NAME: &str = "safekeeper.lock";
const ID_FILE_NAME: &str = "safekeeper.id";
fn main() -> Result<()> {
zenith_metrics::set_common_metrics_prefix("safekeeper");
@@ -93,6 +96,9 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Dump control file at path specifed by this argument and exit"),
)
.arg(
Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer")
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -136,10 +142,19 @@ fn main() -> Result<()> {
conf.recall_period = humantime::parse_duration(recall)?;
}
start_safekeeper(conf)
let mut given_id = None;
if let Some(given_id_str) = arg_matches.value_of("id") {
given_id = Some(ZNodeId(
given_id_str
.parse()
.context("failed to parse safekeeper id")?,
));
}
start_safekeeper(conf, given_id)
}
fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
info!("version: {}", GIT_VERSION);
@@ -154,6 +169,9 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
)
})?;
// Set or read our ID.
set_id(&mut conf, given_id)?;
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
e
@@ -260,3 +278,49 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
std::process::exit(111);
})
}
/// Determine safekeeper id and set it in config.
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
let id_file_path = conf.workdir.join(ID_FILE_NAME);
let my_id: ZNodeId;
// If ID exists, read it in; otherwise set one passed
match fs::read(&id_file_path) {
Ok(id_serialized) => {
my_id = ZNodeId(
std::str::from_utf8(&id_serialized)
.context("failed to parse safekeeper id")?
.parse()
.context("failed to parse safekeeper id")?,
);
if let Some(given_id) = given_id {
if given_id != my_id {
bail!(
"safekeeper already initialized with id {}, can't set {}",
my_id,
given_id
);
}
}
info!("safekeeper ID {}", my_id);
}
Err(error) => match error.kind() {
ErrorKind::NotFound => {
my_id = if let Some(given_id) = given_id {
given_id
} else {
bail!("safekeeper id is not specified");
};
let mut f = File::create(&id_file_path)?;
f.write_all(my_id.to_string().as_bytes())?;
f.sync_all()?;
info!("initialized safekeeper ID {}", my_id);
}
_ => {
return Err(error.into());
}
},
}
conf.my_id = my_id;
Ok(())
}

View File

@@ -5,6 +5,7 @@ use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZNodeId;
use zenith_utils::zid::ZTenantTimelineId;
use crate::control_file::CreateControlFile;
@@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response;
use zenith_utils::http::request::parse_request_param;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: ZNodeId,
}
/// Healthcheck handler.
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let conf = get_conf(&request);
let status = SafekeeperStatus { id: conf.my_id };
Ok(json_response(StatusCode::OK, status)?)
}
fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {

View File

@@ -2,7 +2,7 @@
use std::path::PathBuf;
use std::time::Duration;
use zenith_utils::zid::ZTenantTimelineId;
use zenith_utils::zid::{ZNodeId, ZTenantTimelineId};
pub mod callmemaybe;
pub mod control_file;
@@ -46,6 +46,7 @@ pub struct SafeKeeperConf {
pub listen_http_addr: String,
pub ttl: Option<Duration>,
pub recall_period: Duration,
pub my_id: ZNodeId,
}
impl SafeKeeperConf {
@@ -69,6 +70,7 @@ impl Default for SafeKeeperConf {
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
}
}
}

View File

@@ -18,13 +18,13 @@ use walkeeper::defaults::{
};
use zenith_utils::auth::{Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
use zenith_utils::GIT_VERSION;
use pageserver::branches::BranchInfo;
// Default name of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_NAME: &str = "single";
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1);
fn default_conf() -> String {
format!(
@@ -36,14 +36,14 @@ listen_http_addr = '{pageserver_http_addr}'
auth_type = '{pageserver_auth_type}'
[[safekeepers]]
name = '{safekeeper_name}'
id = '{safekeeper_id}'
pg_port = {safekeeper_pg_port}
http_port = {safekeeper_http_port}
"#,
pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR,
pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR,
pageserver_auth_type = AuthType::Trust,
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
safekeeper_id = DEFAULT_SAFEKEEPER_ID,
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
)
@@ -74,9 +74,9 @@ fn main() -> Result<()> {
.required(true);
#[rustfmt::skip]
let safekeeper_node_arg = Arg::new("node")
let safekeeper_id_arg = Arg::new("id")
.index(1)
.help("Node name")
.help("safekeeper id")
.required(false);
let timeline_arg = Arg::new("timeline")
@@ -154,16 +154,16 @@ fn main() -> Result<()> {
.about("Manage safekeepers")
.subcommand(App::new("start")
.about("Start local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(safekeeper_id_arg.clone())
)
.subcommand(App::new("stop")
.about("Stop local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(safekeeper_id_arg.clone())
.arg(stop_mode_arg.clone())
)
.subcommand(App::new("restart")
.about("Restart local safekeeper")
.arg(safekeeper_node_arg.clone())
.arg(safekeeper_id_arg.clone())
.arg(stop_mode_arg.clone())
)
)
@@ -628,11 +628,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result<SafekeeperNode> {
if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) {
fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result<SafekeeperNode> {
if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) {
Ok(SafekeeperNode::from_env(env, node))
} else {
bail!("could not find safekeeper '{}'", name)
bail!("could not find safekeeper '{}'", id)
}
}
@@ -643,8 +643,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
};
// All the commands take an optional safekeeper name argument
let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME);
let safekeeper = get_safekeeper(env, node_name)?;
let sk_id = if let Some(id_str) = sub_args.value_of("id") {
ZNodeId(id_str.parse().context("while parsing safekeeper id")?)
} else {
DEFAULT_SAFEKEEPER_ID
};
let safekeeper = get_safekeeper(env, sk_id)?;
match sub_name {
"start" => {
@@ -697,7 +701,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
for node in env.safekeepers.iter() {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start() {
eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e);
eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e);
exit(1);
}
}
@@ -724,7 +728,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
for node in env.safekeepers.iter() {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.stop(immediate) {
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e);
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e);
}
}
Ok(())

View File

@@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId {
}
}
// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued
// by the console.
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ZNodeId(pub u64);
impl fmt::Display for ZNodeId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use std::fmt::Display;