diff --git a/control_plane/safekeepers.conf b/control_plane/safekeepers.conf index 828d5a5a1e..df7dd2adca 100644 --- a/control_plane/safekeepers.conf +++ b/control_plane/safekeepers.conf @@ -5,16 +5,16 @@ listen_http_addr = '127.0.0.1:9898' auth_type = 'Trust' [[safekeepers]] -name = 'sk1' +id = 1 pg_port = 5454 http_port = 7676 [[safekeepers]] -name = 'sk2' +id = 2 pg_port = 5455 http_port = 7677 [[safekeepers]] -name = 'sk3' +id = 3 pg_port = 5456 http_port = 7678 diff --git a/control_plane/simple.conf b/control_plane/simple.conf index 796c6adbd9..2243a0a5f8 100644 --- a/control_plane/simple.conf +++ b/control_plane/simple.conf @@ -6,6 +6,6 @@ listen_http_addr = '127.0.0.1:9898' auth_type = 'Trust' [[safekeepers]] -name = 'single' +id = 1 pg_port = 5454 http_port = 7676 diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index b80e137cb9..55d0b00496 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -12,7 +12,9 @@ use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use zenith_utils::auth::{encode_from_key_file, Claims, Scope}; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::{opt_display_serde, ZTenantId}; +use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId}; + +use crate::safekeeper::SafekeeperNode; // // This data structures represents zenith CLI config @@ -62,6 +64,8 @@ pub struct LocalEnv { #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(default)] pub struct PageServerConf { + // node id + pub id: ZNodeId, // Pageserver connection settings pub listen_pg_addr: String, pub listen_http_addr: String, @@ -76,6 +80,7 @@ pub struct PageServerConf { impl Default for PageServerConf { fn default() -> Self { Self { + id: ZNodeId(0), listen_pg_addr: String::new(), listen_http_addr: String::new(), auth_type: AuthType::Trust, @@ -87,7 +92,7 @@ impl Default for PageServerConf { #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(default)] pub struct 
SafekeeperConf { - pub name: String, + pub id: ZNodeId, pub pg_port: u16, pub http_port: u16, pub sync: bool, @@ -96,7 +101,7 @@ pub struct SafekeeperConf { impl Default for SafekeeperConf { fn default() -> Self { Self { - name: String::new(), + id: ZNodeId(0), pg_port: 0, http_port: 0, sync: true, @@ -136,8 +141,8 @@ impl LocalEnv { self.base_data_dir.clone() } - pub fn safekeeper_data_dir(&self, node_name: &str) -> PathBuf { - self.base_data_dir.join("safekeepers").join(node_name) + pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf { + self.base_data_dir.join("safekeepers").join(data_dir_name) } /// Create a LocalEnv from a config file. @@ -285,7 +290,7 @@ impl LocalEnv { fs::create_dir_all(self.pg_data_dirs_path())?; for safekeeper in &self.safekeepers { - fs::create_dir_all(self.safekeeper_data_dir(&safekeeper.name))?; + fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?; } let mut conf_content = String::new(); diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index f5478b5922..351d1efbbc 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -15,6 +15,7 @@ use reqwest::blocking::{Client, RequestBuilder, Response}; use reqwest::{IntoUrl, Method}; use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; +use zenith_utils::zid::ZNodeId; use crate::local_env::{LocalEnv, SafekeeperConf}; use crate::storage::PageServerNode; @@ -61,7 +62,7 @@ impl ResponseErrorMessageExt for Response { // #[derive(Debug)] pub struct SafekeeperNode { - pub name: String, + pub id: ZNodeId, pub conf: SafekeeperConf, @@ -77,10 +78,10 @@ impl SafekeeperNode { pub fn from_env(env: &LocalEnv, conf: &SafekeeperConf) -> SafekeeperNode { let pageserver = Arc::new(PageServerNode::from_env(env)); - println!("initializing for {} for {}", conf.name, conf.http_port); + println!("initializing for sk {} for {}", conf.id, conf.http_port); SafekeeperNode { - name: conf.name.clone(), + id: 
conf.id, conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(conf.pg_port), env: env.clone(), @@ -98,8 +99,12 @@ impl SafekeeperNode { .unwrap() } + pub fn datadir_path_by_id(env: &LocalEnv, sk_id: ZNodeId) -> PathBuf { + env.safekeeper_data_dir(format!("sk{}", sk_id).as_ref()) + } + pub fn datadir_path(&self) -> PathBuf { - self.env.safekeeper_data_dir(&self.name) + SafekeeperNode::datadir_path_by_id(&self.env, self.id) } pub fn pid_file(&self) -> PathBuf { @@ -120,6 +125,7 @@ impl SafekeeperNode { let mut cmd = Command::new(self.env.safekeeper_bin()?); fill_rust_env_vars( cmd.args(&["-D", self.datadir_path().to_str().unwrap()]) + .args(&["--id", self.id.to_string().as_ref()]) .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--recall", "1 second"]) @@ -183,7 +189,7 @@ impl SafekeeperNode { pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { let pid_file = self.pid_file(); if !pid_file.exists() { - println!("Safekeeper {} is already stopped", self.name); + println!("Safekeeper {} is already stopped", self.id); return Ok(()); } let pid = read_pidfile(&pid_file)?; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index be594889ab..cd429e3f7a 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -103,6 +103,8 @@ impl PageServerNode { ) -> anyhow::Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); + let id = format!("id={}", self.env.pageserver.id); + // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc. 
let base_data_dir_param = self.env.base_data_dir.display().to_string(); let pg_distrib_dir_param = @@ -122,6 +124,7 @@ impl PageServerNode { args.extend(["-c", &authg_type_param]); args.extend(["-c", &listen_http_addr_param]); args.extend(["-c", &listen_pg_addr_param]); + args.extend(["-c", &id]); for config_override in config_overrides { args.extend(["-c", config_override]); diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 45c41b4c19..93bb5f9cd7 100755 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -4,7 +4,7 @@ set -eux if [ "$1" = 'pageserver' ]; then if [ ! -d "/data/tenants" ]; then echo "Initializing pageserver data directory" - pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" + pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10" fi echo "Staring pageserver at 0.0.0.0:6400" pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index fb8baa28f6..d8d4033340 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -61,7 +61,7 @@ fn main() -> Result<()> { .number_of_values(1) .multiple_occurrences(true) .help("Additional configuration overrides of the ones from the toml config file (or new ones to add there). 
- Any option has to be a valid toml document, example: `-c \"foo='hey'\"` `-c \"foo={value=1}\"`"), + Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"), ) .get_matches(); @@ -115,7 +115,14 @@ fn main() -> Result<()> { option_line ) })?; + for (key, item) in doc.iter() { + if key == "id" { + anyhow::ensure!( + init, + "node id can only be set during pageserver init and cannot be overridden" + ); + } toml.insert(key, item.clone()); } } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 8b65e7e2e6..3deabb7521 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -8,7 +8,7 @@ use anyhow::{bail, ensure, Context, Result}; use toml_edit; use toml_edit::{Document, Item}; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId}; use std::convert::TryInto; use std::env; @@ -78,6 +78,10 @@ pub mod defaults { #[derive(Debug, Clone, PartialEq, Eq)] pub struct PageServerConf { + // Identifier of that particular pageserver so e g safekeepers + // can safely distinguish different pageservers + pub id: ZNodeId, + /// Example (default): 127.0.0.1:64000 pub listen_pg_addr: String, /// Example (default): 127.0.0.1:9898 @@ -118,6 +122,206 @@ pub struct PageServerConf { pub remote_storage_config: Option, } +// use dedicated enum for builder to better indicate the intention +// and avoid possible confusion with nested options +pub enum BuilderValue { + Set(T), + NotSet, +} + +impl BuilderValue { + pub fn ok_or(self, err: E) -> Result { + match self { + Self::Set(v) => Ok(v), + Self::NotSet => Err(err), + } + } +} + +// needed to simplify config construction +struct PageServerConfigBuilder { + listen_pg_addr: BuilderValue, + + listen_http_addr: BuilderValue, + + checkpoint_distance: BuilderValue, + checkpoint_period: BuilderValue, + + gc_horizon: BuilderValue, + gc_period: BuilderValue, + + wait_lsn_timeout: 
BuilderValue, + wal_redo_timeout: BuilderValue, + + superuser: BuilderValue, + + page_cache_size: BuilderValue, + max_file_descriptors: BuilderValue, + + workdir: BuilderValue, + + pg_distrib_dir: BuilderValue, + + auth_type: BuilderValue, + + // + auth_validation_public_key_path: BuilderValue>, + remote_storage_config: BuilderValue>, + + id: BuilderValue, +} + +impl Default for PageServerConfigBuilder { + fn default() -> Self { + use self::BuilderValue::*; + use defaults::*; + Self { + listen_pg_addr: Set(DEFAULT_PG_LISTEN_ADDR.to_string()), + listen_http_addr: Set(DEFAULT_HTTP_LISTEN_ADDR.to_string()), + checkpoint_distance: Set(DEFAULT_CHECKPOINT_DISTANCE), + checkpoint_period: Set(humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD) + .expect("cannot parse default checkpoint period")), + gc_horizon: Set(DEFAULT_GC_HORIZON), + gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD) + .expect("cannot parse default gc period")), + wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT) + .expect("cannot parse default wait lsn timeout")), + wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT) + .expect("cannot parse default wal redo timeout")), + superuser: Set(DEFAULT_SUPERUSER.to_string()), + page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE), + max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS), + workdir: Set(PathBuf::new()), + pg_distrib_dir: Set(env::current_dir() + .expect("cannot access current directory") + .join("tmp_install")), + auth_type: Set(AuthType::Trust), + auth_validation_public_key_path: Set(None), + remote_storage_config: Set(None), + id: NotSet, + } + } +} + +impl PageServerConfigBuilder { + pub fn listen_pg_addr(&mut self, listen_pg_addr: String) { + self.listen_pg_addr = BuilderValue::Set(listen_pg_addr) + } + + pub fn listen_http_addr(&mut self, listen_http_addr: String) { + self.listen_http_addr = BuilderValue::Set(listen_http_addr) + } + + pub fn checkpoint_distance(&mut self, 
checkpoint_distance: u64) { + self.checkpoint_distance = BuilderValue::Set(checkpoint_distance) + } + + pub fn checkpoint_period(&mut self, checkpoint_period: Duration) { + self.checkpoint_period = BuilderValue::Set(checkpoint_period) + } + + pub fn gc_horizon(&mut self, gc_horizon: u64) { + self.gc_horizon = BuilderValue::Set(gc_horizon) + } + + pub fn gc_period(&mut self, gc_period: Duration) { + self.gc_period = BuilderValue::Set(gc_period) + } + + pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) { + self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout) + } + + pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) { + self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout) + } + + pub fn superuser(&mut self, superuser: String) { + self.superuser = BuilderValue::Set(superuser) + } + + pub fn page_cache_size(&mut self, page_cache_size: usize) { + self.page_cache_size = BuilderValue::Set(page_cache_size) + } + + pub fn max_file_descriptors(&mut self, max_file_descriptors: usize) { + self.max_file_descriptors = BuilderValue::Set(max_file_descriptors) + } + + pub fn workdir(&mut self, workdir: PathBuf) { + self.workdir = BuilderValue::Set(workdir) + } + + pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) { + self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir) + } + + pub fn auth_type(&mut self, auth_type: AuthType) { + self.auth_type = BuilderValue::Set(auth_type) + } + + pub fn auth_validation_public_key_path( + &mut self, + auth_validation_public_key_path: Option, + ) { + self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path) + } + + pub fn remote_storage_config(&mut self, remote_storage_config: Option) { + self.remote_storage_config = BuilderValue::Set(remote_storage_config) + } + + pub fn id(&mut self, node_id: ZNodeId) { + self.id = BuilderValue::Set(node_id) + } + + pub fn build(self) -> Result { + Ok(PageServerConf { + listen_pg_addr: self + .listen_pg_addr + 
.ok_or(anyhow::anyhow!("missing listen_pg_addr"))?, + listen_http_addr: self + .listen_http_addr + .ok_or(anyhow::anyhow!("missing listen_http_addr"))?, + checkpoint_distance: self + .checkpoint_distance + .ok_or(anyhow::anyhow!("missing checkpoint_distance"))?, + checkpoint_period: self + .checkpoint_period + .ok_or(anyhow::anyhow!("missing checkpoint_period"))?, + gc_horizon: self + .gc_horizon + .ok_or(anyhow::anyhow!("missing gc_horizon"))?, + gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?, + wait_lsn_timeout: self + .wait_lsn_timeout + .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?, + wal_redo_timeout: self + .wal_redo_timeout + .ok_or(anyhow::anyhow!("missing wal_redo_timeout"))?, + superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?, + page_cache_size: self + .page_cache_size + .ok_or(anyhow::anyhow!("missing page_cache_size"))?, + max_file_descriptors: self + .max_file_descriptors + .ok_or(anyhow::anyhow!("missing max_file_descriptors"))?, + workdir: self.workdir.ok_or(anyhow::anyhow!("missing workdir"))?, + pg_distrib_dir: self + .pg_distrib_dir + .ok_or(anyhow::anyhow!("missing pg_distrib_dir"))?, + auth_type: self.auth_type.ok_or(anyhow::anyhow!("missing auth_type"))?, + auth_validation_public_key_path: self + .auth_validation_public_key_path + .ok_or(anyhow::anyhow!("missing auth_validation_public_key_path"))?, + remote_storage_config: self + .remote_storage_config + .ok_or(anyhow::anyhow!("missing remote_storage_config"))?, + id: self.id.ok_or(anyhow::anyhow!("missing id"))?, + }) + } +} + /// External backup storage configuration, enough for creating a client for that storage. #[derive(Debug, Clone, PartialEq, Eq)] pub struct RemoteStorageConfig { @@ -233,61 +437,41 @@ impl PageServerConf { /// /// This leaves any options not present in the file in the built-in defaults. 
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> Result { - use defaults::*; - - let mut conf = PageServerConf { - workdir: workdir.to_path_buf(), - - listen_pg_addr: DEFAULT_PG_LISTEN_ADDR.to_string(), - listen_http_addr: DEFAULT_HTTP_LISTEN_ADDR.to_string(), - checkpoint_distance: DEFAULT_CHECKPOINT_DISTANCE, - checkpoint_period: humantime::parse_duration(DEFAULT_CHECKPOINT_PERIOD)?, - gc_horizon: DEFAULT_GC_HORIZON, - gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)?, - wait_lsn_timeout: humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)?, - wal_redo_timeout: humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)?, - page_cache_size: DEFAULT_PAGE_CACHE_SIZE, - max_file_descriptors: DEFAULT_MAX_FILE_DESCRIPTORS, - - pg_distrib_dir: PathBuf::new(), - auth_validation_public_key_path: None, - auth_type: AuthType::Trust, - - remote_storage_config: None, - - superuser: DEFAULT_SUPERUSER.to_string(), - }; + let mut builder = PageServerConfigBuilder::default(); + builder.workdir(workdir.to_owned()); for (key, item) in toml.iter() { match key { - "listen_pg_addr" => conf.listen_pg_addr = parse_toml_string(key, item)?, - "listen_http_addr" => conf.listen_http_addr = parse_toml_string(key, item)?, - "checkpoint_distance" => conf.checkpoint_distance = parse_toml_u64(key, item)?, - "checkpoint_period" => conf.checkpoint_period = parse_toml_duration(key, item)?, - "gc_horizon" => conf.gc_horizon = parse_toml_u64(key, item)?, - "gc_period" => conf.gc_period = parse_toml_duration(key, item)?, - "wait_lsn_timeout" => conf.wait_lsn_timeout = parse_toml_duration(key, item)?, - "wal_redo_timeout" => conf.wal_redo_timeout = parse_toml_duration(key, item)?, - "initial_superuser_name" => conf.superuser = parse_toml_string(key, item)?, - "page_cache_size" => conf.page_cache_size = parse_toml_u64(key, item)? 
as usize, + "listen_pg_addr" => builder.listen_pg_addr(parse_toml_string(key, item)?), + "listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?), + "checkpoint_distance" => builder.checkpoint_distance(parse_toml_u64(key, item)?), + "checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?), + "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?), + "gc_period" => builder.gc_period(parse_toml_duration(key, item)?), + "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?), + "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?), + "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?), + "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize), "max_file_descriptors" => { - conf.max_file_descriptors = parse_toml_u64(key, item)? as usize + builder.max_file_descriptors(parse_toml_u64(key, item)? as usize) } "pg_distrib_dir" => { - conf.pg_distrib_dir = PathBuf::from(parse_toml_string(key, item)?) + builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?)) } - "auth_validation_public_key_path" => { - conf.auth_validation_public_key_path = - Some(PathBuf::from(parse_toml_string(key, item)?)) - } - "auth_type" => conf.auth_type = parse_toml_auth_type(key, item)?, + "auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some( + PathBuf::from(parse_toml_string(key, item)?), + )), + "auth_type" => builder.auth_type(parse_toml_auth_type(key, item)?), "remote_storage" => { - conf.remote_storage_config = Some(Self::parse_remote_storage_config(item)?) 
+ builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?)) } + "id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)), _ => bail!("unrecognized pageserver option '{}'", key), } } + let mut conf = builder.build().context("invalid config")?; + if conf.auth_type == AuthType::ZenithJWT { let auth_validation_public_key_path = conf .auth_validation_public_key_path @@ -301,9 +485,6 @@ impl PageServerConf { ); } - if conf.pg_distrib_dir == PathBuf::new() { - conf.pg_distrib_dir = env::current_dir()?.join("tmp_install") - }; if !conf.pg_distrib_dir.join("bin/postgres").exists() { bail!( "Can't find postgres binary at {}", @@ -398,6 +579,7 @@ impl PageServerConf { #[cfg(test)] pub fn dummy_conf(repo_dir: PathBuf) -> Self { PageServerConf { + id: ZNodeId(0), checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, checkpoint_period: Duration::from_secs(10), gc_horizon: defaults::DEFAULT_GC_HORIZON, @@ -482,15 +664,16 @@ max_file_descriptors = 333 # initial superuser role name to use when creating a new tenant initial_superuser_name = 'zzzz' +id = 10 - "#; +"#; #[test] fn parse_defaults() -> anyhow::Result<()> { let tempdir = tempdir()?; let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?; // we have to create dummy pathes to overcome the validation errors - let config_string = format!("pg_distrib_dir='{}'", pg_distrib_dir.display()); + let config_string = format!("pg_distrib_dir='{}'\nid=10", pg_distrib_dir.display()); let toml = config_string.parse()?; let parsed_config = @@ -501,6 +684,7 @@ initial_superuser_name = 'zzzz' assert_eq!( parsed_config, PageServerConf { + id: ZNodeId(10), listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE, @@ -544,6 +728,7 @@ initial_superuser_name = 'zzzz' assert_eq!( parsed_config, PageServerConf { + id: ZNodeId(10), listen_pg_addr: "127.0.0.1:64000".to_string(), listen_http_addr: 
"127.0.0.1:9898".to_string(), checkpoint_distance: 111, diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 6ce377c535..5d7398ef03 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; use crate::ZTenantId; +use zenith_utils::zid::ZNodeId; #[derive(Serialize, Deserialize)] pub struct BranchCreateRequest { @@ -15,3 +16,8 @@ pub struct TenantCreateRequest { #[serde(with = "hex")] pub tenant_id: ZTenantId, } + +#[derive(Serialize)] +pub struct StatusResponse { + pub id: ZNodeId, +} diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index dcb81849e0..baf81fcf21 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -17,6 +17,11 @@ paths: application/json: schema: type: object + required: + - id + properties: + id: + type: integer /v1/timeline/{tenant_id}: parameters: - name: tenant_id diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index b13a45750e..4fc41d6e82 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use anyhow::{Context, Result}; -use hyper::header; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use serde::Serialize; @@ -23,6 +22,7 @@ use zenith_utils::lsn::Lsn; use zenith_utils::zid::{opt_display_serde, ZTimelineId}; use super::models::BranchCreateRequest; +use super::models::StatusResponse; use super::models::TenantCreateRequest; use crate::branches::BranchInfo; use crate::repository::RepositoryTimeline; @@ -64,12 +64,12 @@ fn get_config(request: &Request) -> &'static PageServerConf { } // healthcheck handler -async fn status_handler(_: Request) -> Result, ApiError> { - Ok(Response::builder() - .status(StatusCode::OK) - .header(header::CONTENT_TYPE, "application/json") - .body(Body::from("{}")) - .map_err(ApiError::from_err)?) 
+async fn status_handler(request: Request) -> Result, ApiError> { + let config = get_config(&request); + Ok(json_response( + StatusCode::OK, + StatusResponse { id: config.id }, + )?) } async fn branch_create_handler(mut request: Request) -> Result, ApiError> { diff --git a/test_runner/README.md b/test_runner/README.md index 514c5f1e3a..a56c2df2c0 100644 --- a/test_runner/README.md +++ b/test_runner/README.md @@ -89,7 +89,7 @@ def test_foobar(zenith_env_builder: ZenithEnvBuilder): # Now create the environment. This initializes the repository, and starts # up the page server and the safekeepers - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() # Run the test ... diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index 7f86986e2e..ee1a09c917 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -8,7 +8,7 @@ import pytest def test_pageserver_auth(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.pageserver_auth_enabled = True - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() ps = env.pageserver @@ -51,7 +51,7 @@ def test_compute_auth_to_pageserver(zenith_env_builder: ZenithEnvBuilder, with_w zenith_env_builder.pageserver_auth_enabled = True if with_wal_acceptors: zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() branch = f"test_compute_auth_to_pageserver{with_wal_acceptors}" env.zenith_cli.create_branch(branch, "main") diff --git a/test_runner/batch_others/test_backpressure.py b/test_runner/batch_others/test_backpressure.py index 23af5b90ed..2b064c9fa8 100644 --- a/test_runner/batch_others/test_backpressure.py +++ b/test_runner/batch_others/test_backpressure.py @@ -93,7 +93,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers 
= 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() # Create a branch for us env.zenith_cli.create_branch("test_backpressure", "main") diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index 860db51c8a..509c46975e 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -19,7 +19,7 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): # # See https://github.com/zenithdb/zenith/issues/1068 zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() # Branch at the point where only 100 rows were inserted env.zenith_cli.create_branch("test_branch_behind", "main") diff --git a/test_runner/batch_others/test_next_xid.py b/test_runner/batch_others/test_next_xid.py index 625abc39d3..fd0f761409 100644 --- a/test_runner/batch_others/test_next_xid.py +++ b/test_runner/batch_others/test_next_xid.py @@ -11,7 +11,7 @@ from fixtures.log_helper import log def test_next_xid(zenith_env_builder: ZenithEnvBuilder): # One safekeeper is enough for this test. 
zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() pg = env.postgres.create_start('main') diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index eccffc4d69..ba1f106c4b 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -1,8 +1,15 @@ -import json from uuid import uuid4, UUID -from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient -from typing import cast -import pytest, psycopg2 +import pytest +from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient, zenith_binpath + + +# test that we cannot override node id +def test_pageserver_init_node_id(zenith_env_builder: ZenithEnvBuilder): + env = zenith_env_builder.init() + with pytest.raises( + Exception, + match="node id can only be set during pageserver init and cannot be overridden"): + env.pageserver.start(overrides=['--pageserver-config-override=id=10']) def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID): @@ -41,7 +48,7 @@ def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv): def test_pageserver_http_api_client_auth_enabled(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.pageserver_auth_enabled = True - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() management_token = env.auth_keys.generate_management_token() diff --git a/test_runner/batch_others/test_pageserver_catchup.py b/test_runner/batch_others/test_pageserver_catchup.py index 97dc0f3260..985d1a3af0 100644 --- a/test_runner/batch_others/test_pageserver_catchup.py +++ b/test_runner/batch_others/test_pageserver_catchup.py @@ -14,7 +14,7 @@ from fixtures.log_helper import log # and new compute node contains all data. 
def test_pageserver_catchup_while_compute_down(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_pageserver_catchup_while_compute_down", "main") pg = env.postgres.create_start('test_pageserver_catchup_while_compute_down') diff --git a/test_runner/batch_others/test_pageserver_restart.py b/test_runner/batch_others/test_pageserver_restart.py index 0cfc50f0ff..ec93c2cf5b 100644 --- a/test_runner/batch_others/test_pageserver_restart.py +++ b/test_runner/batch_others/test_pageserver_restart.py @@ -13,7 +13,7 @@ from fixtures.log_helper import log def test_pageserver_restart(zenith_env_builder: ZenithEnvBuilder): # One safekeeper is enough for this test. zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_pageserver_restart", "main") pg = env.postgres.create_start('test_pageserver_restart') diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index fa6feaf412..61feb1a5bd 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -42,7 +42,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, data_secret = 'very secret secret' ##### First start, insert secret data and upload it to the remote storage - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() pg = env.postgres.create_start() tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0] diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index f7810be555..d4dd3fb9e2 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -13,7 +13,7 @@ def test_restart_compute(zenith_env_builder: ZenithEnvBuilder, 
with_wal_acceptor zenith_env_builder.pageserver_auth_enabled = True if with_wal_acceptors: zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_restart_compute", "main") diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index 5c6d78e730..acff3ef62c 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -122,7 +122,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, zenith_env_builder.num_safekeepers = 1 zenith_env_builder.enable_local_fs_remote_storage() - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() # create folder for remote storage mock remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage' diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py index 232c724870..b665ae9022 100644 --- a/test_runner/batch_others/test_tenants.py +++ b/test_runner/batch_others/test_tenants.py @@ -10,7 +10,7 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_wal_acce if with_wal_acceptors: zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() """Tests tenants with and without wal acceptors""" tenant_1 = env.create_tenant() tenant_2 = env.create_tenant() diff --git a/test_runner/batch_others/test_timeline_size.py b/test_runner/batch_others/test_timeline_size.py index b48f830528..2c31267922 100644 --- a/test_runner/batch_others/test_timeline_size.py +++ b/test_runner/batch_others/test_timeline_size.py @@ -67,7 +67,7 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60 def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() 
env.zenith_cli.create_branch("test_timeline_size_quota", "main") client = env.pageserver.http_client() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 4d9e18bb58..c375c9626a 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -22,7 +22,7 @@ from typing import List, Optional, Any # succeed and data is written def test_normal_work(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_wal_acceptors_normal_work", "main") @@ -51,7 +51,7 @@ class BranchMetrics: # against different timelines. def test_many_timelines(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() n_timelines = 3 @@ -181,7 +181,7 @@ def test_restarts(zenith_env_builder: ZenithEnvBuilder): n_acceptors = 3 zenith_env_builder.num_safekeepers = n_acceptors - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_wal_acceptors_restarts", "main") pg = env.postgres.create_start('test_wal_acceptors_restarts') @@ -218,7 +218,7 @@ def delayed_wal_acceptor_start(wa): # When majority of acceptors is offline, commits are expected to be frozen def test_unavailability(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 2 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_wal_acceptors_unavailability", "main") pg = env.postgres.create_start('test_wal_acceptors_unavailability') @@ -289,7 +289,7 @@ def stop_value(): def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() 
env.zenith_cli.create_branch("test_wal_acceptors_race_conditions", "main") pg = env.postgres.create_start('test_wal_acceptors_race_conditions') @@ -404,7 +404,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, # We don't really need the full environment for this test, just the # safekeepers would be enough. zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() timeline_id = uuid.uuid4() tenant_id = uuid.uuid4() @@ -454,7 +454,7 @@ def test_sync_safekeepers(zenith_env_builder: ZenithEnvBuilder, def test_timeline_status(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_timeline_status", "main") pg = env.postgres.create_start('test_timeline_status') @@ -521,12 +521,7 @@ class SafekeeperEnv: http=self.port_distributor.get_port(), ) - if self.num_safekeepers == 1: - name = "single" - else: - name = f"sk{i}" - - safekeeper_dir = os.path.join(self.repo_dir, name) + safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}") mkdir_if_needed(safekeeper_dir) args = [ @@ -537,6 +532,8 @@ class SafekeeperEnv: f"127.0.0.1:{port.http}", "-D", safekeeper_dir, + "--id", + str(i), "--daemonize" ] @@ -604,9 +601,8 @@ def test_safekeeper_without_pageserver(test_output_dir: str, def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): - def safekeepers_guc(env: ZenithEnv, sk_names: List[str]) -> str: - return ','.join( - [f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.name in sk_names]) + def safekeepers_guc(env: ZenithEnv, sk_names: List[int]) -> str: + return ','.join([f'localhost:{sk.port.pg}' for sk in env.safekeepers if sk.id in sk_names]) def execute_payload(pg: Postgres): with closing(pg.connect()) as conn: @@ -628,17 +624,17 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): http_cli = sk.http_client() try: status = 
http_cli.timeline_status(tenant_id, timeline_id) - log.info(f"Safekeeper {sk.name} status: {status}") + log.info(f"Safekeeper {sk.id} status: {status}") except Exception as e: - log.info(f"Safekeeper {sk.name} status error: {e}") + log.info(f"Safekeeper {sk.id} status error: {e}") zenith_env_builder.num_safekeepers = 4 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_replace_safekeeper", "main") log.info("Use only first 3 safekeepers") env.safekeepers[3].stop() - active_safekeepers = ['sk1', 'sk2', 'sk3'] + active_safekeepers = [1, 2, 3] pg = env.postgres.create('test_replace_safekeeper') pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) pg.start() @@ -678,7 +674,7 @@ def test_replace_safekeeper(zenith_env_builder: ZenithEnvBuilder): log.info("Recreate postgres to replace failed sk1 with new sk4") pg.stop_and_destroy().create('test_replace_safekeeper') - active_safekeepers = ['sk2', 'sk3', 'sk4'] + active_safekeepers = [2, 3, 4] env.safekeepers[3].start() pg.adjust_for_wal_acceptors(safekeepers_guc(env, active_safekeepers)) pg.start() diff --git a/test_runner/batch_others/test_wal_acceptor_async.py b/test_runner/batch_others/test_wal_acceptor_async.py index 1d2a186eb7..4b6a27f73d 100644 --- a/test_runner/batch_others/test_wal_acceptor_async.py +++ b/test_runner/batch_others/test_wal_acceptor_async.py @@ -200,7 +200,7 @@ async def run_restarts_under_load(pg: Postgres, acceptors: List[Safekeeper], n_w # restart acceptors one by one, while executing and validating bank transactions def test_restarts_under_load(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() env.zenith_cli.create_branch("test_wal_acceptors_restarts_under_load", "main") pg = env.postgres.create_start('test_wal_acceptors_restarts_under_load') diff --git a/test_runner/batch_others/test_zenith_cli.py 
b/test_runner/batch_others/test_zenith_cli.py index ce051dfd6e..f1897e4b6f 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -97,7 +97,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv): def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder): # Start with single sk zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() # Connect to sk port on v4 loopback res = requests.get(f'http://127.0.0.1:{env.safekeepers[0].port.http}/v1/status') @@ -114,7 +114,7 @@ def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder): def test_cli_start_stop(zenith_env_builder: ZenithEnvBuilder): # Start with single sk zenith_env_builder.num_safekeepers = 1 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() # Stop default ps/sk env.zenith_cli.pageserver_stop() diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index b4b3de1db3..252ca9b3c1 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -27,7 +27,7 @@ from dataclasses import dataclass # Type-related stuff from psycopg2.extensions import connection as PgConnection -from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple +from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, TypeVar, cast, Union, Tuple from typing_extensions import Literal import pytest @@ -434,6 +434,14 @@ class ZenithEnvBuilder: self.env = ZenithEnv(self) return self.env + def start(self): + self.env.start() + + def init_start(self) -> ZenithEnv: + env = self.init() + self.start() + return env + """ Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path. Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. 
@@ -549,6 +557,7 @@ class ZenithEnv: toml += textwrap.dedent(f""" [pageserver] + id=1 listen_pg_addr = 'localhost:{pageserver_port.pg}' listen_http_addr = 'localhost:{pageserver_port.http}' auth_type = '{pageserver_auth_type}' @@ -566,25 +575,22 @@ class ZenithEnv: pg=self.port_distributor.get_port(), http=self.port_distributor.get_port(), ) - - if config.num_safekeepers == 1: - name = "single" - else: - name = f"sk{i}" - toml += f""" -[[safekeepers]] -name = '{name}' -pg_port = {port.pg} -http_port = {port.http} -sync = false # Disable fsyncs to make the tests go faster - """ - safekeeper = Safekeeper(env=self, name=name, port=port) + id = i # assign ids sequentially + toml += textwrap.dedent(f""" + [[safekeepers]] + id = {id} + pg_port = {port.pg} + http_port = {port.http} + sync = false # Disable fsyncs to make the tests go faster + """) + safekeeper = Safekeeper(env=self, id=id, port=port) self.safekeepers.append(safekeeper) log.info(f"Config: {toml}") self.zenith_cli.init(toml) + def start(self): # Start up the page server and all the safekeepers self.pageserver.start() @@ -625,7 +631,7 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]: with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder: - env = builder.init() + env = builder.init_start() # For convenience in tests, create a branch from the freshly-initialized cluster. env.zenith_cli.create_branch("empty", "main") @@ -659,7 +665,7 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB To use, define 'zenith_env_builder' fixture in your test to get access to the builder object. Set properties on it to describe the environment. Finally, initialize and start up the environment by calling - zenith_env_builder.init(). + zenith_env_builder.init_start(). 
After the initialization, you can launch compute nodes by calling the functions in the 'env.postgres' factory object, stop/start the @@ -847,8 +853,8 @@ class ZenithCli: return self.raw_cli(cmd) - def pageserver_start(self) -> 'subprocess.CompletedProcess[str]': - start_args = ['pageserver', 'start'] + def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]': + start_args = ['pageserver', 'start', *overrides] append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage, self.env.pageserver.config_override) @@ -862,17 +868,17 @@ log.info(f"Stopping pageserver with {cmd}") return self.raw_cli(cmd) - def safekeeper_start(self, name: str) -> 'subprocess.CompletedProcess[str]': - return self.raw_cli(['safekeeper', 'start', name]) + def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]': + return self.raw_cli(['safekeeper', 'start', str(id)]) def safekeeper_stop(self, - name: Optional[str] = None, + id: Optional[int] = None, immediate=False) -> 'subprocess.CompletedProcess[str]': args = ['safekeeper', 'stop'] + if id is not None: + args.append(str(id)) if immediate: args.extend(['-m', 'immediate']) - if name is not None: - args.append(name) return self.raw_cli(args) def pg_create( @@ -1005,14 +1011,15 @@ class ZenithPageserver(PgProtocol): self.remote_storage = remote_storage self.config_override = config_override - def start(self) -> 'ZenithPageserver': + def start(self, overrides=()) -> 'ZenithPageserver': """ Start the page server. + `overrides` allows to add some config to this pageserver start. Returns self. """ assert self.running == False - self.env.zenith_cli.pageserver_start() + self.env.zenith_cli.pageserver_start(overrides=overrides) self.running = True return self @@ -1466,12 +1473,14 @@ class Safekeeper: """ An object representing a running safekeeper daemon.
""" env: ZenithEnv port: SafekeeperPort - name: str # identifier for logging + id: int auth_token: Optional[str] = None + running: bool = False def start(self) -> 'Safekeeper': - self.env.zenith_cli.safekeeper_start(self.name) - + assert self.running == False + self.env.zenith_cli.safekeeper_start(self.id) + self.running = True # wait for wal acceptor start by checking its status started_at = time.time() while True: @@ -1489,8 +1498,9 @@ class Safekeeper: return self def stop(self, immediate=False) -> 'Safekeeper': - log.info('Stopping safekeeper {}'.format(self.name)) - self.env.zenith_cli.safekeeper_stop(self.name, immediate) + log.info('Stopping safekeeper {}'.format(self.id)) + self.env.zenith_cli.safekeeper_stop(self.id, immediate) + self.running = False return self def append_logical_message(self, diff --git a/test_runner/performance/test_bulk_tenant_create.py b/test_runner/performance/test_bulk_tenant_create.py index 6fd77f3020..0247385211 100644 --- a/test_runner/performance/test_bulk_tenant_create.py +++ b/test_runner/performance/test_bulk_tenant_create.py @@ -23,7 +23,7 @@ def test_bulk_tenant_create( """Measure tenant creation time (with and without wal acceptors)""" if use_wal_acceptors == 'with_wa': zenith_env_builder.num_safekeepers = 3 - env = zenith_env_builder.init() + env = zenith_env_builder.init_start() time_slices = [] diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index ea5d0cba14..48de1481d4 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -1,17 +1,19 @@ // // Main entry point for the safekeeper executable // -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use clap::{App, Arg}; use const_format::formatcp; use daemonize::Daemonize; use fs2::FileExt; -use std::fs::File; +use std::fs::{self, File}; +use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::thread; use tracing::*; use walkeeper::control_file::{self, CreateControlFile}; use 
zenith_utils::http::endpoint; +use zenith_utils::zid::ZNodeId; use zenith_utils::{logging, tcp_listener, GIT_VERSION}; use tokio::sync::mpsc; @@ -25,6 +27,7 @@ use zenith_utils::shutdown::exit_now; use zenith_utils::signals; const LOCK_FILE_NAME: &str = "safekeeper.lock"; +const ID_FILE_NAME: &str = "safekeeper.id"; fn main() -> Result<()> { zenith_metrics::set_common_metrics_prefix("safekeeper"); @@ -38,6 +41,12 @@ .takes_value(true) .help("Path to the safekeeper data directory"), ) + .arg( + Arg::new("init") + .long("init") + .takes_value(false) + .help("Initialize safekeeper with ID"), + ) .arg( Arg::new("listen-pg") .short('l') @@ -93,6 +102,9 @@ .takes_value(true) .help("Dump control file at path specifed by this argument and exit"), ) + .arg( + Arg::new("id").long("id").takes_value(true).help("safekeeper node id: integer") + ) .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { @@ -136,10 +148,19 @@ conf.recall_period = humantime::parse_duration(recall)?; } - start_safekeeper(conf) + let mut given_id = None; + if let Some(given_id_str) = arg_matches.value_of("id") { + given_id = Some(ZNodeId( + given_id_str + .parse() + .context("failed to parse safekeeper id")?, + )); + } + + start_safekeeper(conf, given_id, arg_matches.is_present("init")) } -fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { +fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bool) -> Result<()> { let log_file = logging::init("safekeeper.log", conf.daemonize)?; info!("version: {}", GIT_VERSION); @@ -154,6 +175,12 @@ ) })?; + // Set or read our ID.
+ set_id(&mut conf, given_id)?; + if init { + return Ok(()); + } + let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { error!("failed to bind to address {}: {}", conf.listen_http_addr, e); e @@ -260,3 +287,49 @@ std::process::exit(111); }) } + +/// Determine safekeeper id and set it in config. +fn set_id(conf: &mut SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> { + let id_file_path = conf.workdir.join(ID_FILE_NAME); + + let my_id: ZNodeId; + // If ID exists, read it in; otherwise set one passed + match fs::read(&id_file_path) { + Ok(id_serialized) => { + my_id = ZNodeId( + std::str::from_utf8(&id_serialized) + .context("failed to parse safekeeper id")? + .parse() + .context("failed to parse safekeeper id")?, + ); + if let Some(given_id) = given_id { + if given_id != my_id { + bail!( + "safekeeper already initialized with id {}, can't set {}", + my_id, + given_id + ); + } + } + info!("safekeeper ID {}", my_id); + } + Err(error) => match error.kind() { + ErrorKind::NotFound => { + my_id = if let Some(given_id) = given_id { + given_id + } else { + bail!("safekeeper id is not specified"); + }; + let mut f = File::create(&id_file_path)?; + f.write_all(my_id.to_string().as_bytes())?; + f.sync_all()?; + info!("initialized safekeeper ID {}", my_id); + } + _ => { + return Err(error.into()); + } + }, + } + conf.my_id = my_id; + Ok(()) +} diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index 11a29ac6d3..bc992c6a6f 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -5,6 +5,7 @@ use std::fmt::Display; use std::sync::Arc; use zenith_utils::http::{RequestExt, RouterBuilder}; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZNodeId; use zenith_utils::zid::ZTenantTimelineId; use crate::control_file::CreateControlFile; @@ -18,9 +19,16 @@ use zenith_utils::http::json::json_response; use zenith_utils::http::request::parse_request_param;
use zenith_utils::zid::{ZTenantId, ZTimelineId}; +#[derive(Debug, Serialize)] +struct SafekeeperStatus { + id: ZNodeId, +} + /// Healthcheck handler. -async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> { - Ok(json_response(StatusCode::OK, "")?) +async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> { + let conf = get_conf(&request); + let status = SafekeeperStatus { id: conf.my_id }; + Ok(json_response(StatusCode::OK, status)?) } fn get_conf(request: &Request<Body>) -> &SafeKeeperConf { diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 6c3e0b264e..dfd71e4de2 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use std::time::Duration; -use zenith_utils::zid::ZTenantTimelineId; +use zenith_utils::zid::{ZNodeId, ZTenantTimelineId}; pub mod callmemaybe; pub mod control_file; @@ -46,6 +46,7 @@ pub struct SafeKeeperConf { pub listen_http_addr: String, pub ttl: Option<Duration>, pub recall_period: Duration, + pub my_id: ZNodeId, } impl SafeKeeperConf { @@ -69,6 +70,7 @@ impl Default for SafeKeeperConf { listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), ttl: None, recall_period: defaults::DEFAULT_RECALL_PERIOD, + my_id: ZNodeId(0), } } } diff --git a/zenith/src/main.rs b/zenith/src/main.rs index a2a762f5be..5500d924ea 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -18,32 +18,35 @@ use walkeeper::defaults::{ }; use zenith_utils::auth::{Claims, Scope}; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId}; use zenith_utils::GIT_VERSION; use pageserver::branches::BranchInfo; -// Default name of a safekeeper node, if not specified on the command line. -const DEFAULT_SAFEKEEPER_NAME: &str = "single"; +// Default id of a safekeeper node, if not specified on the command line.
+const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1); +const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1); fn default_conf() -> String { format!( r#" # Default built-in configuration, defined in main.rs [pageserver] +id = {pageserver_id} listen_pg_addr = '{pageserver_pg_addr}' listen_http_addr = '{pageserver_http_addr}' auth_type = '{pageserver_auth_type}' [[safekeepers]] -name = '{safekeeper_name}' +id = {safekeeper_id} pg_port = {safekeeper_pg_port} http_port = {safekeeper_http_port} "#, + pageserver_id = DEFAULT_PAGESERVER_ID, pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR, pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR, pageserver_auth_type = AuthType::Trust, - safekeeper_name = DEFAULT_SAFEKEEPER_NAME, + safekeeper_id = DEFAULT_SAFEKEEPER_ID, safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT, safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT, ) @@ -74,9 +77,9 @@ fn main() -> Result<()> { .required(true); #[rustfmt::skip] - let safekeeper_node_arg = Arg::new("node") + let safekeeper_id_arg = Arg::new("id") .index(1) - .help("Node name") + .help("safekeeper id") .required(false); let timeline_arg = Arg::new("timeline") @@ -154,16 +157,16 @@ fn main() -> Result<()> { .about("Manage safekeepers") .subcommand(App::new("start") .about("Start local safekeeper") - .arg(safekeeper_node_arg.clone()) + .arg(safekeeper_id_arg.clone()) ) .subcommand(App::new("stop") .about("Stop local safekeeper") - .arg(safekeeper_node_arg.clone()) + .arg(safekeeper_id_arg.clone()) .arg(stop_mode_arg.clone()) ) .subcommand(App::new("restart") .about("Restart local safekeeper") - .arg(safekeeper_node_arg.clone()) + .arg(safekeeper_id_arg.clone()) .arg(stop_mode_arg.clone()) ) ) @@ -628,11 +631,11 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } -fn get_safekeeper(env: &local_env::LocalEnv, name: &str) -> Result { - if let Some(node) = env.safekeepers.iter().find(|node| node.name == name) { +fn get_safekeeper(env: &local_env::LocalEnv, id: 
ZNodeId) -> Result { + if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) { Ok(SafekeeperNode::from_env(env, node)) } else { - bail!("could not find safekeeper '{}'", name) + bail!("could not find safekeeper '{}'", id) } } @@ -643,8 +646,12 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul }; // All the commands take an optional safekeeper name argument - let node_name = sub_args.value_of("node").unwrap_or(DEFAULT_SAFEKEEPER_NAME); - let safekeeper = get_safekeeper(env, node_name)?; + let sk_id = if let Some(id_str) = sub_args.value_of("id") { + ZNodeId(id_str.parse().context("while parsing safekeeper id")?) + } else { + DEFAULT_SAFEKEEPER_ID + }; + let safekeeper = get_safekeeper(env, sk_id)?; match sub_name { "start" => { @@ -697,7 +704,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); if let Err(e) = safekeeper.start() { - eprintln!("safekeeper '{}' start failed: {}", safekeeper.name, e); + eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e); exit(1); } } @@ -724,7 +731,7 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result< for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); if let Err(e) = safekeeper.stop(immediate) { - eprintln!("safekeeper '{}' stop failed: {}", safekeeper.name, e); + eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e); } } Ok(()) diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs index 2e93ab596c..7dfffd96d7 100644 --- a/zenith_utils/src/zid.rs +++ b/zenith_utils/src/zid.rs @@ -221,6 +221,18 @@ impl fmt::Display for ZTenantTimelineId { } } +// Unique ID of a storage node (safekeeper or pageserver). Supposed to be issued +// by the console. 
+#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ZNodeId(pub u64); + +impl fmt::Display for ZNodeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + #[cfg(test)] mod tests { use std::fmt::Display;