diff --git a/control_plane/safekeepers.conf b/control_plane/safekeepers.conf index 828d5a5a1e..df7dd2adca 100644 --- a/control_plane/safekeepers.conf +++ b/control_plane/safekeepers.conf @@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898' auth_type = 'Trust' [[safekeepers]] -name = 'sk1' +id = 1 pg_port = 5454 http_port = 7676 [[safekeepers]] -name = 'sk2' +id = 2 pg_port = 5455 http_port = 7677 [[safekeepers]] -name = 'sk3' +id = 3 pg_port = 5456 http_port = 7678 diff --git a/control_plane/simple.conf b/control_plane/simple.conf index 796c6adbd9..2243a0a5f8 100644 --- a/control_plane/simple.conf +++ b/control_plane/simple.conf @@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898' auth_type = 'Trust' [[safekeepers]] -name = 'single' +id = 1 pg_port = 5454 http_port = 7676 diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index b80e137cb9..787028eece 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -12,7 +12,9 @@ use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use zenith_utils::auth::{encode_from_key_file, Claims, Scope}; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::{opt_display_serde, ZTenantId}; +use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId}; + +use crate::safekeeper::SafekeeperNode; // // This data structures represents zenith CLI config @@ -87,7 +89,7 @@ impl Default for PageServerConf { #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(default)] pub struct SafekeeperConf { - pub name: String, + pub id: ZNodeId, pub pg_port: u16, pub http_port: u16, pub sync: bool, @@ -96,7 +98,7 @@ pub struct SafekeeperConf { impl Default for SafekeeperConf { fn default() -> Self { Self { - name: String::new(), + id: ZNodeId(0), pg_port: 0, http_port: 0, sync: true, @@ -136,8 +138,8 @@ impl LocalEnv { self.base_data_dir.clone() } - pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf { - self.base_data_dir.join("safekeepers").join(node_name) + pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf { + self.base_data_dir.join("safekeepers").join(data_dir_name) } /// Create a LocalEnv from a config file. @@ -285,7 +287,7 @@ impl LocalEnv { fs::create_dir_all(self.pg_data_dirs_path())?; for safekeeper in &self.safekeepers { - fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?; + fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?; } let mut conf_content = String::new(); diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index f5478b5922..351d1efbbc 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; +use zenith_utils::zid::ZNodeId; use crate::local_env::{LocalEnv, SafekeeperConf}; use crate::storage::PageServerNode; @@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response { // #[derive(Debug)] pub struct SafekeeperNode { - pub name: String, + pub id: ZNodeId, pub conf: SafekeeperConf, @@ -77,10 +78,10 @@ impl SafekeeperNode { pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode { let pageserver = Arc::new(PageServerNode::from_env(env)); - println!("initializing for {} for {}", conf.name, conf.http_port); + println!("initializing for sk {} for {}", conf.id, conf.http_port); SafekeeperNode { - name: conf.name.clone(), + id: conf.id, conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(conf.pg_port), env: env.clone(), @@ -98,8 +99,12 @@ impl SafekeeperNode { .unwrap() } + pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf { + env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref()) + } + pub fn datadir_path(&self) -> PathBuf { - self.env.safekeeper_data_dir(&self.name) + SafekeeperNode::datadir_path_by_id(&self.env, self.id) } pub fn pid_file(&self) -> PathBuf { @@ -120,6 +125,7 @@ impl SafekeeperNode { let mut cmd = Command::new(self.env.safekeeper_bin()?); fill_rust_env_vars( cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) + .args(&["--id", self.id.to_string().as_ref()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--recall", "1 second"]) @@ -183,7 +189,7 @@ impl SafekeeperNode { pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { let pid_file = self.pid_file(); if !pid_file.exists() { - println!("Safekeeper {} is already stopped", self.name); + println!("Safekeeper {} is already stopped", self.id); return Ok(()); } let pid = read_pidfile(&pid_file)?; diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 4d9e18bb58..5685bd57e4 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -521,12 +521,7 @@ class SafekeeperEnv: http=self.port_distributor.get_port(), ) - if self.num_safekeepers == 1: - name = "single" - else: - name = f"sk{i}" - - safekeeper_dir = os.path.join(self.repo_dir, name) + safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}") mkdir_if_needed(safekeeper_dir) args = [ @@ -537,6 +532,8 @@ class SafekeeperEnv: f"127.0.0.1:{port.http}", "-D", safekeeper_dir, + "--id", + str(i), "--daemonize" ] @@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str, def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): - def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str: - return ','.join( - [f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names]) + def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str: + return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names]) def execute_payload(pg: Postgres): with closing(pg.connect()) as conn: @@ -628,9 +624,9 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): http_cli = sk.http_client() try: status = http_cli.timeline_status(tenant_id, timeline_id) - log.info(f"Safekeeper {sk.name} status: {status}") + log.info(f"Safekeeper {sk.id} status: {status}") except Exception as e: - log.info(f"Safekeeper {sk.name} status error: {e}") + log.info(f"Safekeeper {sk.id} status error: {e}") zenith_env_builder.num_safekeepers = 4 env = zenith_env_builder.init() @@ -638,7 +634,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): log.info("Use only first 3 safekeepers") env.safekeepers[3].stop() - active_safekeepers = ['sk1', 'sk2', 'sk3'] + active_safekeepers = [1, 2, 3] pg = env.postgres.create('test_replace_safekeeper') pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) pg.start() @@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): log.info("Recreate postgres to replace failed sk1 with new sk4") pg.stop_and_destroy().create('test_replace_safekeeper') - active_safekeepers = ['sk2', 'sk3', 'sk4'] + active_safekeepers = [2, 3, 4] env.safekeepers[3].start() pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) pg.start() diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 6238ad745f..6312f5d99d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -556,19 +556,15 @@ class ZenithEnv: pg=self.port_distributor.get_port(), http=self.port_distributor.get_port(), ) - - if config.num_safekeepers == 1: - name = "single" - else: - name = f"sk{i}" + id = i # assign ids sequentially toml += f""" [[safekeepers]] -name = '{name}' +id = {id} pg_port = {port.pg} http_port = {port.http} sync = false # Disable fsyncs to make the tests go faster """ - safekeeper = Safekeeper(env=self, name=name, port=port) + safekeeper = Safekeeper(env=self, id=id, port=port) self.safekeepers.append(safekeeper) log.info(f"Config: {toml}") @@ -848,17 +844,17 @@ class ZenithCli: log.info(f"Stopping pageserver with {cmd}") return self.raw_cli(cmd) - def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]': - return self.raw_cli(['safekeeper', 'start', name]) + def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]': + return self.raw_cli(['safekeeper', 'start', str(id)]) def safekeeper_stop(self, - name: Optional[str] = None, + id: Optional[int] = None, immediate=False) -> 'subprocess.CompletedProcess[str]': args = ['safekeeper', 'stop'] + if id is not None: + args.extend(str(id)) if immediate: args.extend(['-m', 'immediate']) - if name is not None: - args.append(name) return self.raw_cli(args) def pg_create( @@ -1397,11 +1393,11 @@ class Safekeeper: """ An object representing a running safekeeper daemon. """ env: ZenithEnv port: SafekeeperPort - name: str # identifier for logging + id: int auth_token: Optional[str] = None def start(self) -> 'Safekeeper': - self.env.zenith_cli.safekeeper_start(self.name) + self.env.zenith_cli.safekeeper_start(self.id) # wait for wal acceptor start by checking its status started_at = time.time() @@ -1420,8 +1416,8 @@ class Safekeeper: return self def stop(self, immediate=False) -> 'Safekeeper': - log.info('Stopping safekeeper {}'.format(self.name)) - self.env.zenith_cli.safekeeper_stop(self.name, immediate) + log.info('Stopping safekeeper {}'.format(self.id)) + self.env.zenith_cli.safekeeper_stop(self.id, immediate) return self def append_logical_message(self, diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index ea5d0cba14..f36ced7075 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -1,17 +1,19 @@ // // Main entry point for the safekeeper executable // -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use clap::{App, Arg}; use const_format::formatcp; use daemonize::Daemonize; use fs2::FileExt; -use std::fs::File; +use std::fs::{self, File}; +use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::thread; use tracing::*; use walkeeper::control_file::{self, CreateControlFile}; use zenith_utils::http::endpoint; +use zenith_utils::zid::ZNodeId; use zenith_utils::{logging, tcp_listener, GIT_VERSION}; use tokio::sync::mpsc; @@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now; use zenith_utils::signals; const LOCK_FILE_NAME: &str = "safekeeper.lock"; +const ID_FILE_NAME: &str = "safekeeper.id"; fn main() -> Result<()> { zenith_metrics::set_common_metrics_prefix("safekeeper"); @@ -93,6 +96,9 @@ fn main() -> Result<()> { .takes_value(true) .help("Dump control file at path specifed by this argument and exit"), ) + .arg( + Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer") + ) .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { @@ -136,10 +142,19 @@ fn main() -> Result<()> { conf.recall_period = humantime::parse_duration(recall)?; } - start_safekeeper(conf) + let mut given_id = None; + if let Some(given_id_str) = arg_matches.value_of("id") { + given_id = Some(ZNodeId( + given_id_str + .parse() + .context("failed to parse safekeeper id")?, + )); + } + + start_safekeeper(conf, given_id) } -fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { +fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option) -> Result<()> { let log_file = logging::init("safekeeper.log", conf.daemonize)?; info!("version: {}", GIT_VERSION); @@ -154,6 +169,9 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { ) })?; + // Set or read our ID. + set_id(&mut conf, given_id)?; + let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { error!("failed to bind to address {}: {}", conf.listen_http_addr, e); e @@ -260,3 +278,49 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { std::process::exit(111); }) } + +/// Determine safekeeper id and set it in config. +fn set_id(conf: &mut SafeKeeperConf, given_id: Option) -> Result<()> { + let id_file_path = conf.workdir.join(ID_FILE_NAME); + + let my_id: ZNodeId; + // If ID exists, read it in; otherwise set one passed + match fs::read(&id_file_path) { + Ok(id_serialized) => { + my_id = ZNodeId( + std::str::from_utf8(&id_serialized) + .context("failed to parse safekeeper id")? + .parse() + .context("failed to parse safekeeper id")?, + ); + if let Some(given_id) = given_id { + if given_id != my_id { + bail!( + "safekeeper already initialized with id {}, can't set {}", + my_id, + given_id + ); + } + } + info!("safekeeper ID {}", my_id); + } + Err(error) => match error.kind() { + ErrorKind::NotFound => { + my_id = if let Some(given_id) = given_id { + given_id + } else { + bail!("safekeeper id is not specified"); + }; + let mut f = File::create(&id_file_path)?; + f.write_all(my_id.to_string().as_bytes())?; + f.sync_all()?; + info!("initialized safekeeper ID {}", my_id); + } + _ => { + return Err(error.into()); + } + }, + } + conf.my_id = my_id; + Ok(()) +} diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index 11a29ac6d3..bc992c6a6f 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use std::sync::Arc; use zenith_utils::http::{RequestExt, RouterBuilder}; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZNodeId; use zenith_utils::zid::ZTenantTimelineId; use crate::control_file::CreateControlFile; @@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response; use zenith_utils::http::request::parse_request_param; use zenith_utils::zid::{ZTenantId, ZTimelineId}; +#[derive(Debug, Serialize)] +struct SafekeeperStatus { + id: ZNodeId, +} + /// Healthcheck handler. -async fn status_handler(_: Request) -> Result, ApiError> { - Ok(json_response(StatusCode::OK, "")?) +async fn status_handler(request: Request) -> Result, ApiError> { + let conf = get_conf(&request); + let status = SafekeeperStatus { id: conf.my_id }; + Ok(json_response(StatusCode::OK, status)?) } fn get_conf(request: &Request) -> &SafeKeeperConf { diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 6c3e0b264e..dfd71e4de2 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use std::time::Duration; -use zenith_utils::zid::ZTenantTimelineId; +use zenith_utils::zid::{ZNodeId, ZTenantTimelineId}; pub mod callmemaybe; pub mod control_file; @@ -46,6 +46,7 @@ pub struct SafeKeeperConf { pub listen_http_addr: String, pub ttl: Option, pub recall_period: Duration, + pub my_id: ZNodeId, } impl SafeKeeperConf { @@ -69,6 +70,7 @@ impl Default for SafeKeeperConf { listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), ttl: None, recall_period: defaults::DEFAULT_RECALL_PERIOD, + my_id: ZNodeId(0), } } } diff --git a/zenith/src/main.rs b/zenith/src/main.rs index a2a762f5be..904896f01e 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -18,13 +18,13 @@ use walkeeper::defaults::{ }; use zenith_utils::auth::{Claims, Scope}; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId}; use zenith_utils::GIT_VERSION; use pageserver::branches::BranchInfo; -// Default name of a safekeeper node, if not specified on the command line. -const DEFAULT_SAFEKEEPER_NAME: &str = "single"; +// Default id of a safekeeper node, if not specified on the command line. +const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1); fn default_conf() -> String { format!( @@ -36,14 +36,14 @@ listen_http_addr = '{pageserver_http_addr}' auth_type = '{pageserver_auth_type}' [[safekeepers]] -name = '{safekeeper_name}' +id = '{safekeeper_id}' pg_port = {safekeeper_pg_port} http_port = {safekeeper_http_port} "#, pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR, pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR, pageserver_auth_type = AuthType::Trust, - safekeeper_name = DEFAULT_SAFEKEEPER_NAME, + safekeeper_id = DEFAULT_SAFEKEEPER_ID, safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT, safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT, ) @@ -74,9 +74,9 @@ fn main() -> Result<()> { .required(true); #[rustfmt::skip] - let safekeeper_node_arg = Arg::new("node") + let safekeeper_id_arg = Arg::new("id") .index(1) - .help("Node name") + .help("safekeeper id") .required(false); let timeline_arg = Arg::new("timeline") @@ -154,16 +154,16 @@ fn main() -> Result<()> { .about("Manage safekeepers") .subcommand(App::new("start") .about("Start local safekeeper") - .arg(safekeeper_node_arg.clone()) + .arg(safekeeper_id_arg.clone()) ) .subcommand(App::new("stop") .about("Stop local safekeeper") - .arg(safekeeper_node_arg.clone()) + .arg(safekeeper_id_arg.clone()) .arg(stop_mode_arg.clone()) ) .subcommand(App::new("restart") .about("Restart local safekeeper") - .arg(safekeeper_node_arg.clone()) + .arg(safekeeper_id_arg.clone()) .arg(stop_mode_arg.clone()) ) ) @@ -628,11 +628,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } -fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result { - if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) { +fn get_safekeeper(env: &local_env::LocalEnv, id: ZNodeId) -> Result { + if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) { Ok(SafekeeperNode::from_env(env, node)) } else { - bail!("could not find safekeeper '{}'", name) + bail!("could not find safekeeper '{}'", id) } } @@ -643,8 +643,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul }; // All the commands take an optional safekeeper name argument - let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME); - let safekeeper = get_safekeeper(env, node_name)?; + let sk_id = if let Some(id_str) = sub_args.value_of("id") { + ZNodeId(id_str.parse().context("while parsing safekeeper id")?) + } else { + DEFAULT_SAFEKEEPER_ID + }; + let safekeeper = get_safekeeper(env, sk_id)?; match sub_name { "start" => { @@ -697,7 +701,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); if let Err(e) = safekeeper.start() { - eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e); + eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e); exit(1); } } @@ -724,7 +728,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result< for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); if let Err(e) = safekeeper.stop(immediate) { - eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e); + eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e); } } Ok(()) diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs index 2e93ab596c..7dfffd96d7 100644 --- a/zenith_utils/src/zid.rs +++ b/zenith_utils/src/zid.rs @@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId { } } +// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued +// by the console. +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ZNodeId(pub u64); + +impl fmt::Display for ZNodeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + #[cfg(test)] mod tests { use std::fmt::Display;