Test simple.conf and handle broker_endpoints better

This commit is contained in:
Kirill Bulatov
2022-05-13 17:04:51 +03:00
committed by Kirill Bulatov
parent 51ea9c3053
commit 33cac863d7
5 changed files with 99 additions and 37 deletions

View File

@@ -4,6 +4,7 @@
//! script which will use local paths.
use anyhow::{bail, ensure, Context};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
@@ -59,9 +60,10 @@ pub struct LocalEnv {
#[serde(default)]
pub private_key_path: PathBuf,
// A comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'.
// Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'.
#[serde(default)]
pub broker_endpoints: Option<String>,
#[serde_as(as = "Vec<DisplayFromStr>")]
pub broker_endpoints: Vec<Url>,
/// A prefix to all to any key when pushing/polling etcd from a node.
#[serde(default)]
@@ -184,12 +186,7 @@ impl LocalEnv {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!(
"branch '{}' is already mapped to timeline {}, cannot map to another timeline {}",
branch_name,
old_timeline_id,
timeline_id
);
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
existing_values.push((tenant_id, timeline_id));
@@ -225,7 +222,7 @@ impl LocalEnv {
///
/// Unlike 'load_config', this function fills in any defaults that are missing
/// from the config file.
pub fn create_config(toml: &str) -> anyhow::Result<Self> {
pub fn parse_config(toml: &str) -> anyhow::Result<Self> {
let mut env: LocalEnv = toml::from_str(toml)?;
// Find postgres binaries.
@@ -238,25 +235,20 @@ impl LocalEnv {
env.pg_distrib_dir = cwd.join("tmp_install")
}
}
if !env.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
env.pg_distrib_dir.display()
);
}
// Find zenith binaries.
if env.zenith_distrib_dir == Path::new("") {
env.zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
}
for binary in ["pageserver", "safekeeper"] {
if !env.zenith_distrib_dir.join(binary).exists() {
bail!(
"Can't find binary '{}' in zenith distrib dir '{}'",
binary,
env.zenith_distrib_dir.display()
);
}
let current_exec_path =
env::current_exe().context("Failed to find current excecutable's path")?;
env.zenith_distrib_dir = current_exec_path
.parent()
.with_context(|| {
format!(
"Failed to find a parent directory for executable {}",
current_exec_path.display(),
)
})?
.to_owned();
}
// If no initial tenant ID was given, generate it.
@@ -351,6 +343,20 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
for binary in ["pageserver", "safekeeper"] {
if !self.zenith_distrib_dir.join(binary).exists() {
bail!(
"Can't find binary '{binary}' in zenith distrib dir '{}'",
self.zenith_distrib_dir.display()
);
}
}
if !self.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_distrib_dir.display()
);
}
fs::create_dir(&base_path)?;
@@ -408,7 +414,49 @@ impl LocalEnv {
fn base_path() -> PathBuf {
match std::env::var_os("ZENITH_REPO_DIR") {
Some(val) => PathBuf::from(val.to_str().unwrap()),
None => ".zenith".into(),
Some(val) => PathBuf::from(val),
None => PathBuf::from(".zenith"),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn simple_conf_parsing() {
let simple_conf_toml = include_str!("../simple.conf");
let simple_conf_parse_result = LocalEnv::parse_config(simple_conf_toml);
assert!(
simple_conf_parse_result.is_ok(),
"failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}"
);
let regular_url_string = "broker_endpoints = ['localhost:1111']";
let regular_url_toml = simple_conf_toml.replace(
"[pageserver]",
&format!("\n{regular_url_string}\n[pageserver]"),
);
match LocalEnv::parse_config(&regular_url_toml) {
Ok(regular_url_parsed) => {
assert_eq!(
regular_url_parsed.broker_endpoints,
vec!["localhost:1111".parse().unwrap()],
"Unexpectedly parsed broker endpoint url"
);
}
Err(e) => panic!("failed to parse simple config {regular_url_toml}, reason: {e}"),
}
let spoiled_url_string = "broker_endpoints = ['!@$XOXO%^&']";
let spoiled_url_toml = simple_conf_toml.replace(
"[pageserver]",
&format!("\n{spoiled_url_string}\n[pageserver]"),
);
let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml);
assert!(
spoiled_url_parse_result.is_err(),
"expected toml with invalid Url {spoiled_url_toml} to fail the parsing, but got {spoiled_url_parse_result:?}"
);
}
}

View File

@@ -12,7 +12,7 @@ use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use reqwest::{IntoUrl, Method, Url};
use safekeeper::http::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
@@ -52,7 +52,7 @@ impl ResponseErrorMessageExt for Response {
Err(SafekeeperHttpError::Response(
match self.json::<HttpErrorBody>() {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
Err(_) => format!("Http error ({}) at {url}.", status.as_u16()),
},
))
}
@@ -76,7 +76,7 @@ pub struct SafekeeperNode {
pub pageserver: Arc<PageServerNode>,
broker_endpoints: Option<String>,
broker_endpoints: Vec<Url>,
broker_etcd_prefix: Option<String>,
}
@@ -142,8 +142,21 @@ impl SafekeeperNode {
if !self.conf.sync {
cmd.arg("--no-sync");
}
if let Some(ref ep) = self.broker_endpoints {
cmd.args(&["--broker-endpoints", ep]);
if !self.broker_endpoints.is_empty() {
cmd.args(&[
"--broker-endpoints",
&self.broker_endpoints.iter().map(Url::as_str).fold(
String::new(),
|mut comma_separated_urls, url| {
if !comma_separated_urls.is_empty() {
comma_separated_urls.push(',');
}
comma_separated_urls.push_str(url);
comma_separated_urls
},
),
]);
}
if let Some(prefix) = self.broker_etcd_prefix.as_deref() {
cmd.args(&["--broker-etcd-prefix", prefix]);

View File

@@ -87,7 +87,8 @@ pub trait RemoteStorage: Send + Sync {
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
}
/// TODO kb
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {
Local(LocalFs),
S3(S3Bucket),

View File

@@ -275,7 +275,7 @@ fn main() -> Result<()> {
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
_ => bail!("unexpected subcommand {}", sub_name),
_ => bail!("unexpected subcommand {sub_name}"),
};
if original_env != env {
@@ -289,7 +289,7 @@ fn main() -> Result<()> {
Ok(Some(updated_env)) => updated_env.persist_config(&updated_env.base_data_dir)?,
Ok(None) => (),
Err(e) => {
eprintln!("command failed: {:?}", e);
eprintln!("command failed: {e:?}");
exit(1);
}
}
@@ -482,7 +482,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<LocalEnv> {
};
let mut env =
LocalEnv::create_config(&toml_file).context("Failed to create neon configuration")?;
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
env.init().context("Failed to initialize neon repository")?;
// default_tenantid was generated by the `env.init()` call above

View File

@@ -558,7 +558,7 @@ class ZenithEnv:
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
toml += textwrap.dedent(f"""
broker_endpoints = 'http://127.0.0.1:{self.broker.port}'
broker_endpoints = ['http://127.0.0.1:{self.broker.port}']
""")
# Create config for pageserver