diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 5aeff505b6..35167ebabf 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,6 +4,7 @@ //! script which will use local paths. use anyhow::{bail, ensure, Context}; +use reqwest::Url; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; @@ -59,9 +60,10 @@ pub struct LocalEnv { #[serde(default)] pub private_key_path: PathBuf, - // A comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'. + // Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'. #[serde(default)] - pub broker_endpoints: Option, + #[serde_as(as = "Vec")] + pub broker_endpoints: Vec, /// A prefix to all to any key when pushing/polling etcd from a node. #[serde(default)] @@ -184,12 +186,7 @@ impl LocalEnv { if old_timeline_id == &timeline_id { Ok(()) } else { - bail!( - "branch '{}' is already mapped to timeline {}, cannot map to another timeline {}", - branch_name, - old_timeline_id, - timeline_id - ); + bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"); } } else { existing_values.push((tenant_id, timeline_id)); @@ -225,7 +222,7 @@ impl LocalEnv { /// /// Unlike 'load_config', this function fills in any defaults that are missing /// from the config file. - pub fn create_config(toml: &str) -> anyhow::Result { + pub fn parse_config(toml: &str) -> anyhow::Result { let mut env: LocalEnv = toml::from_str(toml)?; // Find postgres binaries. @@ -238,25 +235,20 @@ impl LocalEnv { env.pg_distrib_dir = cwd.join("tmp_install") } } - if !env.pg_distrib_dir.join("bin/postgres").exists() { - bail!( - "Can't find postgres binary at {}", - env.pg_distrib_dir.display() - ); - } // Find zenith binaries. if env.zenith_distrib_dir == Path::new("") { - env.zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); - } - for binary in ["pageserver", "safekeeper"] { - if !env.zenith_distrib_dir.join(binary).exists() { - bail!( - "Can't find binary '{}' in zenith distrib dir '{}'", - binary, - env.zenith_distrib_dir.display() - ); - } + let current_exec_path = + env::current_exe().context("Failed to find current excecutable's path")?; + env.zenith_distrib_dir = current_exec_path + .parent() + .with_context(|| { + format!( + "Failed to find a parent directory for executable {}", + current_exec_path.display(), + ) + })? + .to_owned(); } // If no initial tenant ID was given, generate it. @@ -351,6 +343,20 @@ impl LocalEnv { "directory '{}' already exists. Perhaps already initialized?", base_path.display() ); + for binary in ["pageserver", "safekeeper"] { + if !self.zenith_distrib_dir.join(binary).exists() { + bail!( + "Can't find binary '{binary}' in zenith distrib dir '{}'", + self.zenith_distrib_dir.display() + ); + } + } + if !self.pg_distrib_dir.join("bin/postgres").exists() { + bail!( + "Can't find postgres binary at {}", + self.pg_distrib_dir.display() + ); + } fs::create_dir(&base_path)?; @@ -408,7 +414,49 @@ impl LocalEnv { fn base_path() -> PathBuf { match std::env::var_os("ZENITH_REPO_DIR") { - Some(val) => PathBuf::from(val.to_str().unwrap()), - None => ".zenith".into(), + Some(val) => PathBuf::from(val), + None => PathBuf::from(".zenith"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn simple_conf_parsing() { + let simple_conf_toml = include_str!("../simple.conf"); + let simple_conf_parse_result = LocalEnv::parse_config(simple_conf_toml); + assert!( + simple_conf_parse_result.is_ok(), + "failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}" + ); + + let regular_url_string = "broker_endpoints = ['localhost:1111']"; + let regular_url_toml = simple_conf_toml.replace( + "[pageserver]", + &format!("\n{regular_url_string}\n[pageserver]"), + ); + match LocalEnv::parse_config(®ular_url_toml) { + Ok(regular_url_parsed) => { + assert_eq!( + regular_url_parsed.broker_endpoints, + vec!["localhost:1111".parse().unwrap()], + "Unexpectedly parsed broker endpoint url" + ); + } + Err(e) => panic!("failed to parse simple config {regular_url_toml}, reason: {e}"), + } + + let spoiled_url_string = "broker_endpoints = ['!@$XOXO%^&']"; + let spoiled_url_toml = simple_conf_toml.replace( + "[pageserver]", + &format!("\n{spoiled_url_string}\n[pageserver]"), + ); + let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml); + assert!( + spoiled_url_parse_result.is_err(), + "expected toml with invalid Url {spoiled_url_toml} to fail the parsing, but got {spoiled_url_parse_result:?}" + ); } } diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 074ee72f69..aeeb4a50ec 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -12,7 +12,7 @@ use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use postgres::Config; use reqwest::blocking::{Client, RequestBuilder, Response}; -use reqwest::{IntoUrl, Method}; +use reqwest::{IntoUrl, Method, Url}; use safekeeper::http::models::TimelineCreateRequest; use thiserror::Error; use utils::{ @@ -52,7 +52,7 @@ impl ResponseErrorMessageExt for Response { Err(SafekeeperHttpError::Response( match self.json::() { Ok(err_body) => format!("Error: {}", err_body.msg), - Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + Err(_) => format!("Http error ({}) at {url}.", status.as_u16()), }, )) } @@ -76,7 +76,7 @@ pub struct SafekeeperNode { pub pageserver: Arc, - broker_endpoints: Option, + broker_endpoints: Vec, broker_etcd_prefix: Option, } @@ -142,8 +142,21 @@ impl SafekeeperNode { if !self.conf.sync { cmd.arg("--no-sync"); } - if let Some(ref ep) = self.broker_endpoints { - cmd.args(&["--broker-endpoints", ep]); + + if !self.broker_endpoints.is_empty() { + cmd.args(&[ + "--broker-endpoints", + &self.broker_endpoints.iter().map(Url::as_str).fold( + String::new(), + |mut comma_separated_urls, url| { + if !comma_separated_urls.is_empty() { + comma_separated_urls.push(','); + } + comma_separated_urls.push_str(url); + comma_separated_urls + }, + ), + ]); } if let Some(prefix) = self.broker_etcd_prefix.as_deref() { cmd.args(&["--broker-etcd-prefix", prefix]); diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 9bbb855dd5..8092e4fc49 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -87,7 +87,8 @@ pub trait RemoteStorage: Send + Sync { async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>; } -/// TODO kb +/// Every storage, currently supported. +/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics. pub enum GenericRemoteStorage { Local(LocalFs), S3(S3Bucket), diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 6538cdefc4..e5ac46d3b1 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -275,7 +275,7 @@ fn main() -> Result<()> { "pageserver" => handle_pageserver(sub_args, &env), "pg" => handle_pg(sub_args, &env), "safekeeper" => handle_safekeeper(sub_args, &env), - _ => bail!("unexpected subcommand {}", sub_name), + _ => bail!("unexpected subcommand {sub_name}"), }; if original_env != env { @@ -289,7 +289,7 @@ fn main() -> Result<()> { Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?, Ok(None) => (), Err(e) => { - eprintln!("command failed: {:?}", e); + eprintln!("command failed: {e:?}"); exit(1); } } @@ -482,7 +482,7 @@ fn handle_init(init_match: &ArgMatches) -> Result { }; let mut env = - LocalEnv::create_config(&toml_file).context("Failed to create neon configuration")?; + LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?; env.init().context("Failed to initialize neon repository")?; // default_tenantid was generated by the `env.init()` call above diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 357db4c16d..50b7ef6dbb 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -558,7 +558,7 @@ class ZenithEnv: port=self.port_distributor.get_port(), peer_port=self.port_distributor.get_port()) toml += textwrap.dedent(f""" - broker_endpoints = 'http://127.0.0.1:{self.broker.port}' + broker_endpoints = ['http://127.0.0.1:{self.broker.port}'] """) # Create config for pageserver