diff --git a/Cargo.lock b/Cargo.lock index e1e1a0f067..a3974f6776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1772,6 +1772,7 @@ dependencies = [ "crc32c", "crossbeam-utils", "daemonize", + "etcd_broker", "fail", "futures", "git-version", diff --git a/control_plane/simple.conf b/control_plane/simple.conf index 2243a0a5f8..925e2f14ee 100644 --- a/control_plane/simple.conf +++ b/control_plane/simple.conf @@ -9,3 +9,6 @@ auth_type = 'Trust' id = 1 pg_port = 5454 http_port = 7676 + +[etcd_broker] +broker_endpoints = ['http://127.0.0.1:2379'] diff --git a/control_plane/src/etcd.rs b/control_plane/src/etcd.rs new file mode 100644 index 0000000000..df657dd1be --- /dev/null +++ b/control_plane/src/etcd.rs @@ -0,0 +1,93 @@ +use std::{ + fs, + path::PathBuf, + process::{Command, Stdio}, +}; + +use anyhow::Context; +use nix::{ + sys::signal::{kill, Signal}, + unistd::Pid, +}; + +use crate::{local_env, read_pidfile}; + +pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { + let etcd_broker = &env.etcd_broker; + println!( + "Starting etcd broker using {}", + etcd_broker.etcd_binary_path.display() + ); + + let etcd_data_dir = env.base_data_dir.join("etcd"); + fs::create_dir_all(&etcd_data_dir).with_context(|| { + format!( + "Failed to create etcd data dir: {}", + etcd_data_dir.display() + ) + })?; + + let etcd_stdout_file = + fs::File::create(etcd_data_dir.join("etcd.stdout.log")).with_context(|| { + format!( + "Failed to create ectd stout file in directory {}", + etcd_data_dir.display() + ) + })?; + let etcd_stderr_file = + fs::File::create(etcd_data_dir.join("etcd.stderr.log")).with_context(|| { + format!( + "Failed to create ectd stderr file in directory {}", + etcd_data_dir.display() + ) + })?; + let client_urls = etcd_broker.comma_separated_endpoints(); + + let etcd_process = Command::new(&etcd_broker.etcd_binary_path) + .args(&[ + format!("--data-dir={}", etcd_data_dir.display()), + format!("--listen-client-urls={client_urls}"), + format!("--advertise-client-urls={client_urls}"), + ]) + .stdout(Stdio::from(etcd_stdout_file)) + .stderr(Stdio::from(etcd_stderr_file)) + .spawn() + .context("Failed to spawn etcd subprocess")?; + let pid = etcd_process.id(); + + let etcd_pid_file_path = etcd_pid_file_path(env); + fs::write(&etcd_pid_file_path, pid.to_string()).with_context(|| { + format!( + "Failed to create etcd pid file at {}", + etcd_pid_file_path.display() + ) + })?; + + Ok(()) +} + +pub fn stop_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { + let etcd_path = &env.etcd_broker.etcd_binary_path; + println!("Stopping etcd broker at {}", etcd_path.display()); + + let etcd_pid_file_path = etcd_pid_file_path(env); + let pid = Pid::from_raw(read_pidfile(&etcd_pid_file_path).with_context(|| { + format!( + "Failed to read etcd pid filea at {}", + etcd_pid_file_path.display() + ) + })?); + + kill(pid, Signal::SIGTERM).with_context(|| { + format!( + "Failed to stop etcd with pid {pid} at {}", + etcd_pid_file_path.display() + ) + })?; + + Ok(()) +} + +fn etcd_pid_file_path(env: &local_env::LocalEnv) -> PathBuf { + env.base_data_dir.join("etcd.pid") +} diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index a2ecdd3d64..c3469c3350 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -12,6 +12,7 @@ use std::path::Path; use std::process::Command; pub mod compute; +pub mod etcd; pub mod local_env; pub mod postgresql_conf; pub mod safekeeper; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index a8636f9073..c73af7d338 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -60,14 +60,7 @@ pub struct LocalEnv { #[serde(default)] pub private_key_path: PathBuf, - // Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'. - #[serde(default)] - #[serde_as(as = "Vec")] - pub broker_endpoints: Vec, - - /// A prefix to all to any key when pushing/polling etcd from a node. - #[serde(default)] - pub broker_etcd_prefix: Option, + pub etcd_broker: EtcdBroker, pub pageserver: PageServerConf, @@ -83,6 +76,62 @@ pub struct LocalEnv { branch_name_mappings: HashMap>, } +/// Etcd broker config for cluster internal communication. +#[serde_as] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +pub struct EtcdBroker { + /// A prefix to all to any key when pushing/polling etcd from a node. + #[serde(default)] + pub broker_etcd_prefix: Option, + + /// Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'. + #[serde(default)] + #[serde_as(as = "Vec")] + pub broker_endpoints: Vec, + + /// Etcd binary path to use. + #[serde(default)] + pub etcd_binary_path: PathBuf, +} + +impl EtcdBroker { + pub fn locate_etcd() -> anyhow::Result { + let which_output = Command::new("which") + .arg("etcd") + .output() + .context("Failed to run 'which etcd' command")?; + let stdout = String::from_utf8_lossy(&which_output.stdout); + ensure!( + which_output.status.success(), + "'which etcd' invocation failed. Status: {}, stdout: {stdout}, stderr: {}", + which_output.status, + String::from_utf8_lossy(&which_output.stderr) + ); + + let etcd_path = PathBuf::from(stdout.trim()); + ensure!( + etcd_path.is_file(), + "'which etcd' invocation was successful, but the path it returned is not a file or does not exist: {}", + etcd_path.display() + ); + + Ok(etcd_path) + } + + pub fn comma_separated_endpoints(&self) -> String { + self.broker_endpoints.iter().map(Url::as_str).fold( + String::new(), + |mut comma_separated_urls, url| { + if !comma_separated_urls.is_empty() { + comma_separated_urls.push(','); + } + comma_separated_urls.push_str(url); + comma_separated_urls + }, + ) + } +} + #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default)] pub struct PageServerConf { @@ -97,7 +146,6 @@ pub struct PageServerConf { // jwt auth token used for communication with pageserver pub auth_token: String, - pub broker_endpoints: Vec, } impl Default for PageServerConf { @@ -108,7 +156,6 @@ impl Default for PageServerConf { listen_http_addr: String::new(), auth_type: AuthType::Trust, auth_token: String::new(), - broker_endpoints: Vec::new(), } } } @@ -240,17 +287,7 @@ impl LocalEnv { // Find zenith binaries. if env.zenith_distrib_dir == Path::new("") { - let current_exec_path = - env::current_exe().context("Failed to find current excecutable's path")?; - env.zenith_distrib_dir = current_exec_path - .parent() - .with_context(|| { - format!( - "Failed to find a parent directory for executable {}", - current_exec_path.display(), - ) - })? - .to_owned(); + env.zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); } // If no initial tenant ID was given, generate it. @@ -345,6 +382,22 @@ impl LocalEnv { "directory '{}' already exists. Perhaps already initialized?", base_path.display() ); + if !self.pg_distrib_dir.join("bin/postgres").exists() { + bail!( + "Can't find postgres binary at {}", + self.pg_distrib_dir.display() + ); + } + for binary in ["pageserver", "safekeeper"] { + if !self.zenith_distrib_dir.join(binary).exists() { + bail!( + "Can't find binary '{}' in zenith distrib dir '{}'", + binary, + self.zenith_distrib_dir.display() + ); + } + } + for binary in ["pageserver", "safekeeper"] { if !self.zenith_distrib_dir.join(binary).exists() { bail!( @@ -403,7 +456,6 @@ impl LocalEnv { self.pageserver.auth_token = self.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?; - self.pageserver.broker_endpoints = self.broker_endpoints.clone(); fs::create_dir_all(self.pg_data_dirs_path())?; @@ -435,26 +487,12 @@ mod tests { "failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}" ); - let regular_url_string = "broker_endpoints = ['localhost:1111']"; - let regular_url_toml = simple_conf_toml.replace( - "[pageserver]", - &format!("\n{regular_url_string}\n[pageserver]"), - ); - match LocalEnv::parse_config(®ular_url_toml) { - Ok(regular_url_parsed) => { - assert_eq!( - regular_url_parsed.broker_endpoints, - vec!["localhost:1111".parse().unwrap()], - "Unexpectedly parsed broker endpoint url" - ); - } - Err(e) => panic!("failed to parse simple config {regular_url_toml}, reason: {e}"), - } - - let spoiled_url_string = "broker_endpoints = ['!@$XOXO%^&']"; - let spoiled_url_toml = simple_conf_toml.replace( - "[pageserver]", - &format!("\n{spoiled_url_string}\n[pageserver]"), + let string_to_replace = "broker_endpoints = ['http://127.0.0.1:2379']"; + let spoiled_url_str = "broker_endpoints = ['!@$XOXO%^&']"; + let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str); + assert!( + spoiled_url_toml.contains(spoiled_url_str), + "Failed to replace string {string_to_replace} in the toml file {simple_conf_toml}" ); let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml); assert!( diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index c5b7f830bf..407cd05c73 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -12,7 +12,7 @@ use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use postgres::Config; use reqwest::blocking::{Client, RequestBuilder, Response}; -use reqwest::{IntoUrl, Method, Url}; +use reqwest::{IntoUrl, Method}; use safekeeper::http::models::TimelineCreateRequest; use thiserror::Error; use utils::{ @@ -75,9 +75,6 @@ pub struct SafekeeperNode { pub http_base_url: String, pub pageserver: Arc, - - broker_endpoints: Vec, - broker_etcd_prefix: Option, } impl SafekeeperNode { @@ -94,8 +91,6 @@ impl SafekeeperNode { http_client: Client::new(), http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port), pageserver, - broker_endpoints: env.broker_endpoints.clone(), - broker_etcd_prefix: env.broker_etcd_prefix.clone(), } } @@ -137,29 +132,21 @@ impl SafekeeperNode { .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--recall", "1 second"]) - .args(&["--broker-endpoints", &self.broker_endpoints.join(",")]) + .args(&[ + "--broker-endpoints", + &self.env.etcd_broker.comma_separated_endpoints(), + ]) .arg("--daemonize"), ); if !self.conf.sync { cmd.arg("--no-sync"); } - if !self.broker_endpoints.is_empty() { - cmd.args(&[ - "--broker-endpoints", - &self.broker_endpoints.iter().map(Url::as_str).fold( - String::new(), - |mut comma_separated_urls, url| { - if !comma_separated_urls.is_empty() { - comma_separated_urls.push(','); - } - comma_separated_urls.push_str(url); - comma_separated_urls - }, - ), - ]); + let comma_separated_endpoints = self.env.etcd_broker.comma_separated_endpoints(); + if !comma_separated_endpoints.is_empty() { + cmd.args(&["--broker-endpoints", &comma_separated_endpoints]); } - if let Some(prefix) = self.broker_etcd_prefix.as_deref() { + if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() { cmd.args(&["--broker-etcd-prefix", prefix]); } diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 0b9fddd64a..7dbc19e145 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -124,7 +124,7 @@ impl PageServerNode { let broker_endpoints_param = format!( "broker_endpoints=[{}]", self.env - .pageserver + .etcd_broker .broker_endpoints .iter() .map(|url| format!("'{url}'")) @@ -142,6 +142,16 @@ impl PageServerNode { args.extend(["-c", &broker_endpoints_param]); args.extend(["-c", &id]); + let broker_etcd_prefix_param = self + .env + .etcd_broker + .broker_etcd_prefix + .as_ref() + .map(|prefix| format!("broker_etcd_prefix='{prefix}'")); + if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() { + args.extend(["-c", broker_etcd_prefix_param]); + } + for config_override in config_overrides { args.extend(["-c", config_override]); } diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh index 0e4cf45f29..6bcbc76551 100755 --- a/docker-entrypoint.sh +++ b/docker-entrypoint.sh @@ -1,17 +1,20 @@ #!/bin/sh set -eux +broker_endpoints_param="${BROKER_ENDPOINT:-absent}" +if [ "$broker_endpoints_param" != "absent" ]; then + broker_endpoints_param="-c broker_endpoints=['$broker_endpoints_param']" +else + broker_endpoints_param='' +fi + if [ "$1" = 'pageserver' ]; then if [ ! -d "/data/tenants" ]; then echo "Initializing pageserver data directory" - pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10" + pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10" $broker_endpoints_param fi echo "Staring pageserver at 0.0.0.0:6400" - if [ -z '${BROKER_ENDPOINTS}' ]; then - pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data - else - pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['${BROKER_ENDPOINTS}']" -D /data - fi + pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" $broker_endpoints_param -D /data else "$@" fi diff --git a/docs/settings.md b/docs/settings.md index 017d349bb6..9564ef626f 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -25,10 +25,14 @@ max_file_descriptors = '100' # initial superuser role name to use when creating a new tenant initial_superuser_name = 'zenith_admin' +broker_etcd_prefix = 'neon' +broker_endpoints = ['some://etcd'] + # [remote_storage] ``` -The config above shows default values for all basic pageserver settings. +The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user, +see the corresponding section below. Pageserver uses default values for all files that are missing in the config, so it's not a hard error to leave the config blank. Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start. @@ -46,6 +50,17 @@ Example: `${PAGESERVER_BIN} -c "checkpoint_period = '100 s'" -c "remote_storage= Note that TOML distinguishes between strings and integers, the former require single or double quotes around them. +#### broker_endpoints + +A list of endpoints (etcd currently) to connect and pull the information from. +Mandatory, does not have a default, since requires etcd to be started as a separate process, +and its connection url should be specified separately. + +#### broker_etcd_prefix + +A prefix to add for every etcd key used, to separate one group of related instances from another, in the same cluster. +Default is `neon`. + #### checkpoint_distance `checkpoint_distance` is the amount of incoming WAL that is held in diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 1b27f99ccf..76181f9ba1 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -19,6 +19,10 @@ use utils::{ zid::{ZNodeId, ZTenantId, ZTenantTimelineId}, }; +/// Default value to use for prefixing to all etcd keys with. +/// This way allows isolating safekeeper/pageserver groups in the same etcd cluster. +pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon"; + #[derive(Debug, Deserialize, Serialize)] struct SafekeeperTimeline { safekeeper_id: ZNodeId, @@ -104,28 +108,28 @@ impl SkTimelineSubscription { /// The subscription kind to the timeline updates from safekeeper. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct SkTimelineSubscriptionKind { - broker_prefix: String, + broker_etcd_prefix: String, kind: SubscriptionKind, } impl SkTimelineSubscriptionKind { - pub fn all(broker_prefix: String) -> Self { + pub fn all(broker_etcd_prefix: String) -> Self { Self { - broker_prefix, + broker_etcd_prefix, kind: SubscriptionKind::All, } } - pub fn tenant(broker_prefix: String, tenant: ZTenantId) -> Self { + pub fn tenant(broker_etcd_prefix: String, tenant: ZTenantId) -> Self { Self { - broker_prefix, + broker_etcd_prefix, kind: SubscriptionKind::Tenant(tenant), } } - pub fn timeline(broker_prefix: String, timeline: ZTenantTimelineId) -> Self { + pub fn timeline(broker_etcd_prefix: String, timeline: ZTenantTimelineId) -> Self { Self { - broker_prefix, + broker_etcd_prefix, kind: SubscriptionKind::Timeline(timeline), } } @@ -134,12 +138,12 @@ impl SkTimelineSubscriptionKind { match self.kind { SubscriptionKind::All => Regex::new(&format!( r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$", - self.broker_prefix + self.broker_etcd_prefix )) .expect("wrong regex for 'everything' subscription"), SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!( r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]])$", - self.broker_prefix + self.broker_etcd_prefix )) .expect("wrong regex for 'tenant' subscription"), SubscriptionKind::Timeline(ZTenantTimelineId { @@ -147,7 +151,7 @@ impl SkTimelineSubscriptionKind { timeline_id, }) => Regex::new(&format!( r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]])$", - self.broker_prefix + self.broker_etcd_prefix )) .expect("wrong regex for 'timeline' subscription"), } @@ -156,16 +160,16 @@ impl SkTimelineSubscriptionKind { /// Etcd key to use for watching a certain timeline updates from safekeepers. pub fn watch_key(&self) -> String { match self.kind { - SubscriptionKind::All => self.broker_prefix.to_string(), + SubscriptionKind::All => self.broker_etcd_prefix.to_string(), SubscriptionKind::Tenant(tenant_id) => { - format!("{}/{tenant_id}/safekeeper", self.broker_prefix) + format!("{}/{tenant_id}/safekeeper", self.broker_etcd_prefix) } SubscriptionKind::Timeline(ZTenantTimelineId { tenant_id, timeline_id, }) => format!( "{}/{tenant_id}/{timeline_id}/safekeeper", - self.broker_prefix + self.broker_etcd_prefix ), } } diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index e5ac46d3b1..f04af9cfdd 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -1,10 +1,10 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::{App, AppSettings, Arg, ArgMatches}; use control_plane::compute::ComputeControlPlane; -use control_plane::local_env; -use control_plane::local_env::LocalEnv; +use control_plane::local_env::{EtcdBroker, LocalEnv}; use control_plane::safekeeper::SafekeeperNode; use control_plane::storage::PageServerNode; +use control_plane::{etcd, local_env}; use pageserver::config::defaults::{ DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR, DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR, @@ -14,6 +14,7 @@ use safekeeper::defaults::{ DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, }; use std::collections::{BTreeSet, HashMap}; +use std::path::Path; use std::process::exit; use std::str::FromStr; use utils::{ @@ -32,28 +33,27 @@ const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1); const DEFAULT_BRANCH_NAME: &str = "main"; project_git_version!(GIT_VERSION); -fn default_conf() -> String { +fn default_conf(etcd_binary_path: &Path) -> String { format!( r#" # Default built-in configuration, defined in main.rs +[etcd_broker] +broker_endpoints = ['http://localhost:2379'] +etcd_binary_path = '{etcd_binary_path}' + [pageserver] -id = {pageserver_id} -listen_pg_addr = '{pageserver_pg_addr}' -listen_http_addr = '{pageserver_http_addr}' +id = {DEFAULT_PAGESERVER_ID} +listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}' +listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}' auth_type = '{pageserver_auth_type}' [[safekeepers]] -id = {safekeeper_id} -pg_port = {safekeeper_pg_port} -http_port = {safekeeper_http_port} +id = {DEFAULT_SAFEKEEPER_ID} +pg_port = {DEFAULT_SAFEKEEPER_PG_PORT} +http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT} "#, - pageserver_id = DEFAULT_PAGESERVER_ID, - pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR, - pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR, + etcd_binary_path = etcd_binary_path.display(), pageserver_auth_type = AuthType::Trust, - safekeeper_id = DEFAULT_SAFEKEEPER_ID, - safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT, - safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT, ) } @@ -167,12 +167,12 @@ fn main() -> Result<()> { .subcommand(App::new("create") .arg(tenant_id_arg.clone()) .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) - .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) - ) + .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) + ) .subcommand(App::new("config") .arg(tenant_id_arg.clone()) - .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) - ) + .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) + ) ) .subcommand( App::new("pageserver") @@ -468,17 +468,17 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result Result { +fn handle_init(init_match: &ArgMatches) -> anyhow::Result { let initial_timeline_id_arg = parse_timeline_id(init_match)?; // Create config file let toml_file: String = if let Some(config_path) = init_match.value_of("config") { // load and parse the file std::fs::read_to_string(std::path::Path::new(config_path)) - .with_context(|| format!("Could not read configuration file \"{}\"", config_path))? + .with_context(|| format!("Could not read configuration file '{config_path}'"))? } else { // Built-in default config - default_conf() + default_conf(&EtcdBroker::locate_etcd()?) }; let mut env = @@ -497,7 +497,7 @@ fn handle_init(init_match: &ArgMatches) -> Result { &pageserver_config_overrides(init_match), ) .unwrap_or_else(|e| { - eprintln!("pageserver init failed: {}", e); + eprintln!("pageserver init failed: {e}"); exit(1); }); @@ -920,20 +920,23 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } -fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { +fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> { + etcd::start_etcd_process(env)?; let pageserver = PageServerNode::from_env(env); // Postgres nodes are not started automatically if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { - eprintln!("pageserver start failed: {}", e); + eprintln!("pageserver start failed: {e}"); + try_stop_etcd_process(env); exit(1); } for node in env.safekeepers.iter() { let safekeeper = SafekeeperNode::from_env(env, node); if let Err(e) = safekeeper.start() { - eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e); + eprintln!("safekeeper '{}' start failed: {e}", safekeeper.id); + try_stop_etcd_process(env); exit(1); } } @@ -963,5 +966,14 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result< eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e); } } + + try_stop_etcd_process(env); + Ok(()) } + +fn try_stop_etcd_process(env: &local_env::LocalEnv) { + if let Err(e) = etcd::stop_etcd_process(env) { + eprintln!("etcd stop failed: {e}"); + } +} diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 9cc8444531..290f52e0b2 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -55,6 +55,7 @@ fail = "0.5.0" git-version = "0.3.5" postgres_ffi = { path = "../libs/postgres_ffi" } +etcd_broker = { path = "../libs/etcd_broker" } metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } remote_storage = { path = "../libs/remote_storage" } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 8748683f32..a9215c0701 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -113,6 +113,10 @@ pub struct PageServerConf { pub profiling: ProfilingConfig, pub default_tenant_conf: TenantConf, + /// A prefix to add in etcd brokers before every key. + /// Can be used for isolating different pageserver groups withing the same etcd cluster. + pub broker_etcd_prefix: String, + /// Etcd broker endpoints to connect to. pub broker_endpoints: Vec, } @@ -179,6 +183,7 @@ struct PageServerConfigBuilder { id: BuilderValue, profiling: BuilderValue, + broker_etcd_prefix: BuilderValue, broker_endpoints: BuilderValue>, } @@ -205,7 +210,8 @@ impl Default for PageServerConfigBuilder { remote_storage_config: Set(None), id: NotSet, profiling: Set(ProfilingConfig::Disabled), - broker_endpoints: NotSet, + broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()), + broker_endpoints: Set(Vec::new()), } } } @@ -266,6 +272,10 @@ impl PageServerConfigBuilder { self.broker_endpoints = BuilderValue::Set(broker_endpoints) } + pub fn broker_etcd_prefix(&mut self, broker_etcd_prefix: String) { + self.broker_etcd_prefix = BuilderValue::Set(broker_etcd_prefix) + } + pub fn id(&mut self, node_id: ZNodeId) { self.id = BuilderValue::Set(node_id) } @@ -278,10 +288,6 @@ impl PageServerConfigBuilder { let broker_endpoints = self .broker_endpoints .ok_or(anyhow!("No broker endpoints provided"))?; - ensure!( - !broker_endpoints.is_empty(), - "Empty broker endpoints collection provided" - ); Ok(PageServerConf { listen_pg_addr: self @@ -319,6 +325,9 @@ impl PageServerConfigBuilder { // TenantConf is handled separately default_tenant_conf: TenantConf::default(), broker_endpoints, + broker_etcd_prefix: self + .broker_etcd_prefix + .ok_or(anyhow!("missing broker_etcd_prefix"))?, }) } } @@ -392,6 +401,7 @@ impl PageServerConf { } "id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)), "profiling" => builder.profiling(parse_toml_from_str(key, item)?), + "broker_etcd_prefix" => builder.broker_etcd_prefix(parse_toml_string(key, item)?), "broker_endpoints" => builder.broker_endpoints( parse_toml_array(key, item)? .into_iter() @@ -556,6 +566,7 @@ impl PageServerConf { profiling: ProfilingConfig::Disabled, default_tenant_conf: TenantConf::dummy_conf(), broker_endpoints: Vec::new(), + broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), } } } @@ -700,6 +711,7 @@ id = 10 broker_endpoints: vec![broker_endpoint .parse() .expect("Failed to parse a valid broker endpoint URL")], + broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), }, "Correct defaults should be used when no config values are provided" ); @@ -743,6 +755,7 @@ id = 10 broker_endpoints: vec![broker_endpoint .parse() .expect("Failed to parse a valid broker endpoint URL")], + broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), }, "Should be able to parse all basic config values correctly" ); @@ -795,7 +808,7 @@ broker_endpoints = ['{broker_endpoint}'] max_concurrent_syncs: NonZeroUsize::new( remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS ) - .unwrap(), + .unwrap(), max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS) .unwrap(), storage: RemoteStorageKind::LocalFs(local_storage_path.clone()), diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index d7875a9069..2d47710a88 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -1,7 +1,7 @@ // // Main entry point for the safekeeper executable // -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{bail, Context, Result}; use clap::{App, Arg}; use const_format::formatcp; use daemonize::Daemonize; @@ -179,10 +179,6 @@ fn main() -> anyhow::Result<()> { let collected_ep: Result, ParseError> = addr.split(',').map(Url::parse).collect(); conf.broker_endpoints = collected_ep.context("Failed to parse broker endpoint urls")?; } - ensure!( - !conf.broker_endpoints.is_empty(), - "No broker endpoints provided" - ); if let Some(prefix) = arg_matches.value_of("broker-etcd-prefix") { conf.broker_etcd_prefix = prefix.to_string(); } @@ -313,14 +309,18 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b .unwrap(); threads.push(callmemaybe_thread); - let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("broker thread".into()) - .spawn(|| { - broker::thread_main(conf_); - })?, - ); + if !conf.broker_endpoints.is_empty() { + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("broker thread".into()) + .spawn(|| { + broker::thread_main(conf_); + })?, + ); + } else { + warn!("No broker endpoints providing, starting without node sync") + } let conf_ = conf.clone(); threads.push( diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index c906bc1e74..d7217be20a 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -34,13 +34,13 @@ pub fn thread_main(conf: SafeKeeperConf) { /// Key to per timeline per safekeeper data. fn timeline_safekeeper_path( - broker_prefix: String, + broker_etcd_prefix: String, zttid: ZTenantTimelineId, sk_id: ZNodeId, ) -> String { format!( "{}/{sk_id}", - SkTimelineSubscriptionKind::timeline(broker_prefix, zttid).watch_key() + SkTimelineSubscriptionKind::timeline(broker_etcd_prefix, zttid).watch_key() ) } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 131076fab6..a87e5da686 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -27,7 +27,6 @@ pub mod defaults { pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); - pub const DEFAULT_NEON_BROKER_PREFIX: &str = "neon"; pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676; pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); @@ -82,7 +81,7 @@ impl Default for SafeKeeperConf { recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: ZNodeId(0), broker_endpoints: Vec::new(), - broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(), + broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), s3_offload_enabled: true, } } diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 85798156a7..e1b7bd91ee 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -13,7 +13,7 @@ from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol -from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex +from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any @@ -327,7 +327,6 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value): # Test that safekeepers push their info to the broker and learn peer status from it -@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH") def test_broker(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 3 zenith_env_builder.enable_local_fs_remote_storage() @@ -369,7 +368,6 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder): # Test that old WAL consumed by peers and pageserver is removed from safekeepers. -@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH") def test_wal_removal(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 2 # to advance remote_consistent_llsn diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 7b95e729d9..ba9bc6e113 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -1,8 +1,9 @@ import os import shutil import subprocess +from pathlib import Path -from typing import Any, List +from typing import Any, List, Optional from fixtures.log_helper import log @@ -80,9 +81,12 @@ def print_gc_result(row): .format_map(row)) -# path to etcd binary or None if not present. -def etcd_path(): - return shutil.which("etcd") +def etcd_path() -> Path: + path_output = shutil.which("etcd") + if path_output is None: + raise RuntimeError('etcd not found in PATH') + else: + return Path(path_output) # Traverse directory to get total size. diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 09f7f26588..78de78144c 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -555,7 +555,9 @@ class ZenithEnv: self.broker = config.broker toml += textwrap.dedent(f""" + [etcd_broker] broker_endpoints = ['{self.broker.client_url()}'] + etcd_binary_path = '{self.broker.binary_path}' """) # Create config for pageserver @@ -1846,6 +1848,7 @@ class Etcd: datadir: str port: int peer_port: int + binary_path: Path = etcd_path() handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon def client_url(self): @@ -1858,15 +1861,15 @@ class Etcd: def start(self): pathlib.Path(self.datadir).mkdir(exist_ok=True) - etcd_full_path = etcd_path() - if etcd_full_path is None: - raise Exception('etcd binary not found locally') + + if not self.binary_path.is_file(): + raise RuntimeError(f"etcd broker binary '{self.binary_path}' is not a file") client_url = self.client_url() log.info(f'Starting etcd to listen incoming connections at "{client_url}"') with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file: args = [ - etcd_full_path, + self.binary_path, f"--data-dir={self.datadir}", f"--listen-client-urls={client_url}", f"--advertise-client-urls={client_url}", @@ -1927,8 +1930,7 @@ SKIP_DIRS = frozenset(('pg_wal', 'pg_stat_tmp', 'pg_subtrans', 'pg_logical', - 'pg_replslot/wal_proposer_slot', - 'pg_xact')) + 'pg_replslot/wal_proposer_slot')) SKIP_FILES = frozenset(('pg_internal.init', 'pg.log',