Add etcd to neon_local

This commit is contained in:
Kirill Bulatov
2022-05-08 00:32:57 +03:00
committed by Kirill Bulatov
parent 9a0fed0880
commit a884f4cf6b
19 changed files with 331 additions and 147 deletions

1
Cargo.lock generated
View File

@@ -1772,6 +1772,7 @@ dependencies = [
"crc32c",
"crossbeam-utils",
"daemonize",
"etcd_broker",
"fail",
"futures",
"git-version",

View File

@@ -9,3 +9,6 @@ auth_type = 'Trust'
id = 1
pg_port = 5454
http_port = 7676
[etcd_broker]
broker_endpoints = ['http://127.0.0.1:2379']

93
control_plane/src/etcd.rs Normal file
View File

@@ -0,0 +1,93 @@
use std::{
fs,
path::PathBuf,
process::{Command, Stdio},
};
use anyhow::Context;
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};
use crate::{local_env, read_pidfile};
pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
let etcd_broker = &env.etcd_broker;
println!(
"Starting etcd broker using {}",
etcd_broker.etcd_binary_path.display()
);
let etcd_data_dir = env.base_data_dir.join("etcd");
fs::create_dir_all(&etcd_data_dir).with_context(|| {
format!(
"Failed to create etcd data dir: {}",
etcd_data_dir.display()
)
})?;
let etcd_stdout_file =
fs::File::create(etcd_data_dir.join("etcd.stdout.log")).with_context(|| {
format!(
"Failed to create ectd stout file in directory {}",
etcd_data_dir.display()
)
})?;
let etcd_stderr_file =
fs::File::create(etcd_data_dir.join("etcd.stderr.log")).with_context(|| {
format!(
"Failed to create ectd stderr file in directory {}",
etcd_data_dir.display()
)
})?;
let client_urls = etcd_broker.comma_separated_endpoints();
let etcd_process = Command::new(&etcd_broker.etcd_binary_path)
.args(&[
format!("--data-dir={}", etcd_data_dir.display()),
format!("--listen-client-urls={client_urls}"),
format!("--advertise-client-urls={client_urls}"),
])
.stdout(Stdio::from(etcd_stdout_file))
.stderr(Stdio::from(etcd_stderr_file))
.spawn()
.context("Failed to spawn etcd subprocess")?;
let pid = etcd_process.id();
let etcd_pid_file_path = etcd_pid_file_path(env);
fs::write(&etcd_pid_file_path, pid.to_string()).with_context(|| {
format!(
"Failed to create etcd pid file at {}",
etcd_pid_file_path.display()
)
})?;
Ok(())
}
pub fn stop_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
let etcd_path = &env.etcd_broker.etcd_binary_path;
println!("Stopping etcd broker at {}", etcd_path.display());
let etcd_pid_file_path = etcd_pid_file_path(env);
let pid = Pid::from_raw(read_pidfile(&etcd_pid_file_path).with_context(|| {
format!(
"Failed to read etcd pid filea at {}",
etcd_pid_file_path.display()
)
})?);
kill(pid, Signal::SIGTERM).with_context(|| {
format!(
"Failed to stop etcd with pid {pid} at {}",
etcd_pid_file_path.display()
)
})?;
Ok(())
}
fn etcd_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
env.base_data_dir.join("etcd.pid")
}

View File

@@ -12,6 +12,7 @@ use std::path::Path;
use std::process::Command;
pub mod compute;
pub mod etcd;
pub mod local_env;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -60,14 +60,7 @@ pub struct LocalEnv {
#[serde(default)]
pub private_key_path: PathBuf,
// Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'.
#[serde(default)]
#[serde_as(as = "Vec<DisplayFromStr>")]
pub broker_endpoints: Vec<Url>,
/// A prefix to all to any key when pushing/polling etcd from a node.
#[serde(default)]
pub broker_etcd_prefix: Option<String>,
pub etcd_broker: EtcdBroker,
pub pageserver: PageServerConf,
@@ -83,6 +76,62 @@ pub struct LocalEnv {
branch_name_mappings: HashMap<String, Vec<(ZTenantId, ZTimelineId)>>,
}
/// Etcd broker config for cluster internal communication.
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EtcdBroker {
/// A prefix to all to any key when pushing/polling etcd from a node.
#[serde(default)]
pub broker_etcd_prefix: Option<String>,
/// Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'.
#[serde(default)]
#[serde_as(as = "Vec<DisplayFromStr>")]
pub broker_endpoints: Vec<Url>,
/// Etcd binary path to use.
#[serde(default)]
pub etcd_binary_path: PathBuf,
}
impl EtcdBroker {
pub fn locate_etcd() -> anyhow::Result<PathBuf> {
let which_output = Command::new("which")
.arg("etcd")
.output()
.context("Failed to run 'which etcd' command")?;
let stdout = String::from_utf8_lossy(&which_output.stdout);
ensure!(
which_output.status.success(),
"'which etcd' invocation failed. Status: {}, stdout: {stdout}, stderr: {}",
which_output.status,
String::from_utf8_lossy(&which_output.stderr)
);
let etcd_path = PathBuf::from(stdout.trim());
ensure!(
etcd_path.is_file(),
"'which etcd' invocation was successful, but the path it returned is not a file or does not exist: {}",
etcd_path.display()
);
Ok(etcd_path)
}
pub fn comma_separated_endpoints(&self) -> String {
self.broker_endpoints.iter().map(Url::as_str).fold(
String::new(),
|mut comma_separated_urls, url| {
if !comma_separated_urls.is_empty() {
comma_separated_urls.push(',');
}
comma_separated_urls.push_str(url);
comma_separated_urls
},
)
}
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct PageServerConf {
@@ -97,7 +146,6 @@ pub struct PageServerConf {
// jwt auth token used for communication with pageserver
pub auth_token: String,
pub broker_endpoints: Vec<String>,
}
impl Default for PageServerConf {
@@ -108,7 +156,6 @@ impl Default for PageServerConf {
listen_http_addr: String::new(),
auth_type: AuthType::Trust,
auth_token: String::new(),
broker_endpoints: Vec::new(),
}
}
}
@@ -240,17 +287,7 @@ impl LocalEnv {
// Find zenith binaries.
if env.zenith_distrib_dir == Path::new("") {
let current_exec_path =
env::current_exe().context("Failed to find current excecutable's path")?;
env.zenith_distrib_dir = current_exec_path
.parent()
.with_context(|| {
format!(
"Failed to find a parent directory for executable {}",
current_exec_path.display(),
)
})?
.to_owned();
env.zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned();
}
// If no initial tenant ID was given, generate it.
@@ -345,6 +382,22 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if !self.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_distrib_dir.display()
);
}
for binary in ["pageserver", "safekeeper"] {
if !self.zenith_distrib_dir.join(binary).exists() {
bail!(
"Can't find binary '{}' in zenith distrib dir '{}'",
binary,
self.zenith_distrib_dir.display()
);
}
}
for binary in ["pageserver", "safekeeper"] {
if !self.zenith_distrib_dir.join(binary).exists() {
bail!(
@@ -403,7 +456,6 @@ impl LocalEnv {
self.pageserver.auth_token =
self.generate_auth_token(&Claims::new(None, Scope::PageServerApi))?;
self.pageserver.broker_endpoints = self.broker_endpoints.clone();
fs::create_dir_all(self.pg_data_dirs_path())?;
@@ -435,26 +487,12 @@ mod tests {
"failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}"
);
let regular_url_string = "broker_endpoints = ['localhost:1111']";
let regular_url_toml = simple_conf_toml.replace(
"[pageserver]",
&format!("\n{regular_url_string}\n[pageserver]"),
);
match LocalEnv::parse_config(&regular_url_toml) {
Ok(regular_url_parsed) => {
assert_eq!(
regular_url_parsed.broker_endpoints,
vec!["localhost:1111".parse().unwrap()],
"Unexpectedly parsed broker endpoint url"
);
}
Err(e) => panic!("failed to parse simple config {regular_url_toml}, reason: {e}"),
}
let spoiled_url_string = "broker_endpoints = ['!@$XOXO%^&']";
let spoiled_url_toml = simple_conf_toml.replace(
"[pageserver]",
&format!("\n{spoiled_url_string}\n[pageserver]"),
let string_to_replace = "broker_endpoints = ['http://127.0.0.1:2379']";
let spoiled_url_str = "broker_endpoints = ['!@$XOXO%^&']";
let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str);
assert!(
spoiled_url_toml.contains(spoiled_url_str),
"Failed to replace string {string_to_replace} in the toml file {simple_conf_toml}"
);
let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml);
assert!(

View File

@@ -12,7 +12,7 @@ use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use postgres::Config;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method, Url};
use reqwest::{IntoUrl, Method};
use safekeeper::http::models::TimelineCreateRequest;
use thiserror::Error;
use utils::{
@@ -75,9 +75,6 @@ pub struct SafekeeperNode {
pub http_base_url: String,
pub pageserver: Arc<PageServerNode>,
broker_endpoints: Vec<Url>,
broker_etcd_prefix: Option<String>,
}
impl SafekeeperNode {
@@ -94,8 +91,6 @@ impl SafekeeperNode {
http_client: Client::new(),
http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port),
pageserver,
broker_endpoints: env.broker_endpoints.clone(),
broker_etcd_prefix: env.broker_etcd_prefix.clone(),
}
}
@@ -137,29 +132,21 @@ impl SafekeeperNode {
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.args(&["--broker-endpoints", &self.broker_endpoints.join(",")])
.args(&[
"--broker-endpoints",
&self.env.etcd_broker.comma_separated_endpoints(),
])
.arg("--daemonize"),
);
if !self.conf.sync {
cmd.arg("--no-sync");
}
if !self.broker_endpoints.is_empty() {
cmd.args(&[
"--broker-endpoints",
&self.broker_endpoints.iter().map(Url::as_str).fold(
String::new(),
|mut comma_separated_urls, url| {
if !comma_separated_urls.is_empty() {
comma_separated_urls.push(',');
}
comma_separated_urls.push_str(url);
comma_separated_urls
},
),
]);
let comma_separated_endpoints = self.env.etcd_broker.comma_separated_endpoints();
if !comma_separated_endpoints.is_empty() {
cmd.args(&["--broker-endpoints", &comma_separated_endpoints]);
}
if let Some(prefix) = self.broker_etcd_prefix.as_deref() {
if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() {
cmd.args(&["--broker-etcd-prefix", prefix]);
}

View File

@@ -124,7 +124,7 @@ impl PageServerNode {
let broker_endpoints_param = format!(
"broker_endpoints=[{}]",
self.env
.pageserver
.etcd_broker
.broker_endpoints
.iter()
.map(|url| format!("'{url}'"))
@@ -142,6 +142,16 @@ impl PageServerNode {
args.extend(["-c", &broker_endpoints_param]);
args.extend(["-c", &id]);
let broker_etcd_prefix_param = self
.env
.etcd_broker
.broker_etcd_prefix
.as_ref()
.map(|prefix| format!("broker_etcd_prefix='{prefix}'"));
if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() {
args.extend(["-c", broker_etcd_prefix_param]);
}
for config_override in config_overrides {
args.extend(["-c", config_override]);
}

View File

@@ -1,17 +1,20 @@
#!/bin/sh
set -eux
broker_endpoints_param="${BROKER_ENDPOINT:-absent}"
if [ "$broker_endpoints_param" != "absent" ]; then
broker_endpoints_param="-c broker_endpoints=['$broker_endpoints_param']"
else
broker_endpoints_param=''
fi
if [ "$1" = 'pageserver' ]; then
if [ ! -d "/data/tenants" ]; then
echo "Initializing pageserver data directory"
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10"
pageserver --init -D /data -c "pg_distrib_dir='/usr/local'" -c "id=10" $broker_endpoints_param
fi
echo "Staring pageserver at 0.0.0.0:6400"
if [ -z '${BROKER_ENDPOINTS}' ]; then
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /data
else
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['${BROKER_ENDPOINTS}']" -D /data
fi
pageserver -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" $broker_endpoints_param -D /data
else
"$@"
fi

View File

@@ -25,10 +25,14 @@ max_file_descriptors = '100'
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zenith_admin'
broker_etcd_prefix = 'neon'
broker_endpoints = ['some://etcd']
# [remote_storage]
```
The config above shows default values for all basic pageserver settings.
The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user,
see the corresponding section below.
Pageserver uses default values for all files that are missing in the config, so it's not a hard error to leave the config blank.
Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start.
@@ -46,6 +50,17 @@ Example: `${PAGESERVER_BIN} -c "checkpoint_period = '100 s'" -c "remote_storage=
Note that TOML distinguishes between strings and integers, the former require single or double quotes around them.
#### broker_endpoints
A list of endpoints (etcd currently) to connect and pull the information from.
Mandatory, does not have a default, since requires etcd to be started as a separate process,
and its connection url should be specified separately.
#### broker_etcd_prefix
A prefix to add for every etcd key used, to separate one group of related instances from another, in the same cluster.
Default is `neon`.
#### checkpoint_distance
`checkpoint_distance` is the amount of incoming WAL that is held in

View File

@@ -19,6 +19,10 @@ use utils::{
zid::{ZNodeId, ZTenantId, ZTenantTimelineId},
};
/// Default value to use for prefixing to all etcd keys with.
/// This way allows isolating safekeeper/pageserver groups in the same etcd cluster.
pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
#[derive(Debug, Deserialize, Serialize)]
struct SafekeeperTimeline {
safekeeper_id: ZNodeId,
@@ -104,28 +108,28 @@ impl SkTimelineSubscription {
/// The subscription kind to the timeline updates from safekeeper.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SkTimelineSubscriptionKind {
broker_prefix: String,
broker_etcd_prefix: String,
kind: SubscriptionKind,
}
impl SkTimelineSubscriptionKind {
pub fn all(broker_prefix: String) -> Self {
pub fn all(broker_etcd_prefix: String) -> Self {
Self {
broker_prefix,
broker_etcd_prefix,
kind: SubscriptionKind::All,
}
}
pub fn tenant(broker_prefix: String, tenant: ZTenantId) -> Self {
pub fn tenant(broker_etcd_prefix: String, tenant: ZTenantId) -> Self {
Self {
broker_prefix,
broker_etcd_prefix,
kind: SubscriptionKind::Tenant(tenant),
}
}
pub fn timeline(broker_prefix: String, timeline: ZTenantTimelineId) -> Self {
pub fn timeline(broker_etcd_prefix: String, timeline: ZTenantTimelineId) -> Self {
Self {
broker_prefix,
broker_etcd_prefix,
kind: SubscriptionKind::Timeline(timeline),
}
}
@@ -134,12 +138,12 @@ impl SkTimelineSubscriptionKind {
match self.kind {
SubscriptionKind::All => Regex::new(&format!(
r"^{}/([[:xdigit:]]+)/([[:xdigit:]]+)/safekeeper/([[:digit:]])$",
self.broker_prefix
self.broker_etcd_prefix
))
.expect("wrong regex for 'everything' subscription"),
SubscriptionKind::Tenant(tenant_id) => Regex::new(&format!(
r"^{}/{tenant_id}/([[:xdigit:]]+)/safekeeper/([[:digit:]])$",
self.broker_prefix
self.broker_etcd_prefix
))
.expect("wrong regex for 'tenant' subscription"),
SubscriptionKind::Timeline(ZTenantTimelineId {
@@ -147,7 +151,7 @@ impl SkTimelineSubscriptionKind {
timeline_id,
}) => Regex::new(&format!(
r"^{}/{tenant_id}/{timeline_id}/safekeeper/([[:digit:]])$",
self.broker_prefix
self.broker_etcd_prefix
))
.expect("wrong regex for 'timeline' subscription"),
}
@@ -156,16 +160,16 @@ impl SkTimelineSubscriptionKind {
/// Etcd key to use for watching a certain timeline updates from safekeepers.
pub fn watch_key(&self) -> String {
match self.kind {
SubscriptionKind::All => self.broker_prefix.to_string(),
SubscriptionKind::All => self.broker_etcd_prefix.to_string(),
SubscriptionKind::Tenant(tenant_id) => {
format!("{}/{tenant_id}/safekeeper", self.broker_prefix)
format!("{}/{tenant_id}/safekeeper", self.broker_etcd_prefix)
}
SubscriptionKind::Timeline(ZTenantTimelineId {
tenant_id,
timeline_id,
}) => format!(
"{}/{tenant_id}/{timeline_id}/safekeeper",
self.broker_prefix
self.broker_etcd_prefix
),
}
}

View File

@@ -1,10 +1,10 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{App, AppSettings, Arg, ArgMatches};
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env;
use control_plane::local_env::LocalEnv;
use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage::PageServerNode;
use control_plane::{etcd, local_env};
use pageserver::config::defaults::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
@@ -14,6 +14,7 @@ use safekeeper::defaults::{
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::collections::{BTreeSet, HashMap};
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use utils::{
@@ -32,28 +33,27 @@ const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
fn default_conf() -> String {
fn default_conf(etcd_binary_path: &Path) -> String {
format!(
r#"
# Default built-in configuration, defined in main.rs
[etcd_broker]
broker_endpoints = ['http://localhost:2379']
etcd_binary_path = '{etcd_binary_path}'
[pageserver]
id = {pageserver_id}
listen_pg_addr = '{pageserver_pg_addr}'
listen_http_addr = '{pageserver_http_addr}'
id = {DEFAULT_PAGESERVER_ID}
listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
auth_type = '{pageserver_auth_type}'
[[safekeepers]]
id = {safekeeper_id}
pg_port = {safekeeper_pg_port}
http_port = {safekeeper_http_port}
id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
"#,
pageserver_id = DEFAULT_PAGESERVER_ID,
pageserver_pg_addr = DEFAULT_PAGESERVER_PG_ADDR,
pageserver_http_addr = DEFAULT_PAGESERVER_HTTP_ADDR,
etcd_binary_path = etcd_binary_path.display(),
pageserver_auth_type = AuthType::Trust,
safekeeper_id = DEFAULT_SAFEKEEPER_ID,
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
)
}
@@ -167,12 +167,12 @@ fn main() -> Result<()> {
.subcommand(App::new("create")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
.subcommand(App::new("config")
.arg(tenant_id_arg.clone())
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
)
)
.subcommand(
App::new("pageserver")
@@ -468,17 +468,17 @@ fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result<Option<ZTimelineI
.context("Failed to parse timeline id from the argument string")
}
fn handle_init(init_match: &ArgMatches) -> Result<LocalEnv> {
fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let initial_timeline_id_arg = parse_timeline_id(init_match)?;
// Create config file
let toml_file: String = if let Some(config_path) = init_match.value_of("config") {
// load and parse the file
std::fs::read_to_string(std::path::Path::new(config_path))
.with_context(|| format!("Could not read configuration file \"{}\"", config_path))?
.with_context(|| format!("Could not read configuration file '{config_path}'"))?
} else {
// Built-in default config
default_conf()
default_conf(&EtcdBroker::locate_etcd()?)
};
let mut env =
@@ -497,7 +497,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<LocalEnv> {
&pageserver_config_overrides(init_match),
)
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {}", e);
eprintln!("pageserver init failed: {e}");
exit(1);
});
@@ -920,20 +920,23 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
Ok(())
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
etcd::start_etcd_process(env)?;
let pageserver = PageServerNode::from_env(env);
// Postgres nodes are not started automatically
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
try_stop_etcd_process(env);
exit(1);
}
for node in env.safekeepers.iter() {
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start() {
eprintln!("safekeeper '{}' start failed: {}", safekeeper.id, e);
eprintln!("safekeeper '{}' start failed: {e}", safekeeper.id);
try_stop_etcd_process(env);
exit(1);
}
}
@@ -963,5 +966,14 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
eprintln!("safekeeper '{}' stop failed: {}", safekeeper.id, e);
}
}
try_stop_etcd_process(env);
Ok(())
}
fn try_stop_etcd_process(env: &local_env::LocalEnv) {
if let Err(e) = etcd::stop_etcd_process(env) {
eprintln!("etcd stop failed: {e}");
}
}

View File

@@ -55,6 +55,7 @@ fail = "0.5.0"
git-version = "0.3.5"
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
remote_storage = { path = "../libs/remote_storage" }

View File

@@ -113,6 +113,10 @@ pub struct PageServerConf {
pub profiling: ProfilingConfig,
pub default_tenant_conf: TenantConf,
/// A prefix to add in etcd brokers before every key.
/// Can be used for isolating different pageserver groups withing the same etcd cluster.
pub broker_etcd_prefix: String,
/// Etcd broker endpoints to connect to.
pub broker_endpoints: Vec<Url>,
}
@@ -179,6 +183,7 @@ struct PageServerConfigBuilder {
id: BuilderValue<ZNodeId>,
profiling: BuilderValue<ProfilingConfig>,
broker_etcd_prefix: BuilderValue<String>,
broker_endpoints: BuilderValue<Vec<Url>>,
}
@@ -205,7 +210,8 @@ impl Default for PageServerConfigBuilder {
remote_storage_config: Set(None),
id: NotSet,
profiling: Set(ProfilingConfig::Disabled),
broker_endpoints: NotSet,
broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()),
broker_endpoints: Set(Vec::new()),
}
}
}
@@ -266,6 +272,10 @@ impl PageServerConfigBuilder {
self.broker_endpoints = BuilderValue::Set(broker_endpoints)
}
pub fn broker_etcd_prefix(&mut self, broker_etcd_prefix: String) {
self.broker_etcd_prefix = BuilderValue::Set(broker_etcd_prefix)
}
pub fn id(&mut self, node_id: ZNodeId) {
self.id = BuilderValue::Set(node_id)
}
@@ -278,10 +288,6 @@ impl PageServerConfigBuilder {
let broker_endpoints = self
.broker_endpoints
.ok_or(anyhow!("No broker endpoints provided"))?;
ensure!(
!broker_endpoints.is_empty(),
"Empty broker endpoints collection provided"
);
Ok(PageServerConf {
listen_pg_addr: self
@@ -319,6 +325,9 @@ impl PageServerConfigBuilder {
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
broker_endpoints,
broker_etcd_prefix: self
.broker_etcd_prefix
.ok_or(anyhow!("missing broker_etcd_prefix"))?,
})
}
}
@@ -392,6 +401,7 @@ impl PageServerConf {
}
"id" => builder.id(ZNodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
"broker_etcd_prefix" => builder.broker_etcd_prefix(parse_toml_string(key, item)?),
"broker_endpoints" => builder.broker_endpoints(
parse_toml_array(key, item)?
.into_iter()
@@ -556,6 +566,7 @@ impl PageServerConf {
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::dummy_conf(),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
}
}
}
@@ -700,6 +711,7 @@ id = 10
broker_endpoints: vec![broker_endpoint
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -743,6 +755,7 @@ id = 10
broker_endpoints: vec![broker_endpoint
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
},
"Should be able to parse all basic config values correctly"
);
@@ -795,7 +808,7 @@ broker_endpoints = ['{broker_endpoint}']
max_concurrent_syncs: NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS
)
.unwrap(),
.unwrap(),
max_sync_errors: NonZeroU32::new(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS)
.unwrap(),
storage: RemoteStorageKind::LocalFs(local_storage_path.clone()),

View File

@@ -1,7 +1,7 @@
//
// Main entry point for the safekeeper executable
//
use anyhow::{bail, ensure, Context, Result};
use anyhow::{bail, Context, Result};
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
@@ -179,10 +179,6 @@ fn main() -> anyhow::Result<()> {
let collected_ep: Result<Vec<Url>, ParseError> = addr.split(',').map(Url::parse).collect();
conf.broker_endpoints = collected_ep.context("Failed to parse broker endpoint urls")?;
}
ensure!(
!conf.broker_endpoints.is_empty(),
"No broker endpoints provided"
);
if let Some(prefix) = arg_matches.value_of("broker-etcd-prefix") {
conf.broker_etcd_prefix = prefix.to_string();
}
@@ -313,14 +309,18 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
.unwrap();
threads.push(callmemaybe_thread);
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
broker::thread_main(conf_);
})?,
);
if !conf.broker_endpoints.is_empty() {
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
broker::thread_main(conf_);
})?,
);
} else {
warn!("No broker endpoints providing, starting without node sync")
}
let conf_ = conf.clone();
threads.push(

View File

@@ -34,13 +34,13 @@ pub fn thread_main(conf: SafeKeeperConf) {
/// Key to per timeline per safekeeper data.
fn timeline_safekeeper_path(
broker_prefix: String,
broker_etcd_prefix: String,
zttid: ZTenantTimelineId,
sk_id: ZNodeId,
) -> String {
format!(
"{}/{sk_id}",
SkTimelineSubscriptionKind::timeline(broker_prefix, zttid).watch_key()
SkTimelineSubscriptionKind::timeline(broker_etcd_prefix, zttid).watch_key()
)
}

View File

@@ -27,7 +27,6 @@ pub mod defaults {
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_NEON_BROKER_PREFIX: &str = "neon";
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
@@ -82,7 +81,7 @@ impl Default for SafeKeeperConf {
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: ZNodeId(0),
broker_endpoints: Vec::new(),
broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
s3_offload_enabled: true,
}
}

View File

@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import etcd_path, get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -327,7 +327,6 @@ def test_race_conditions(zenith_env_builder: ZenithEnvBuilder, stop_value):
# Test that safekeepers push their info to the broker and learn peer status from it
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_broker(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 3
zenith_env_builder.enable_local_fs_remote_storage()
@@ -369,7 +368,6 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder):
# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 2
# to advance remote_consistent_llsn

View File

@@ -1,8 +1,9 @@
import os
import shutil
import subprocess
from pathlib import Path
from typing import Any, List
from typing import Any, List, Optional
from fixtures.log_helper import log
@@ -80,9 +81,12 @@ def print_gc_result(row):
.format_map(row))
# path to etcd binary or None if not present.
def etcd_path():
return shutil.which("etcd")
def etcd_path() -> Path:
path_output = shutil.which("etcd")
if path_output is None:
raise RuntimeError('etcd not found in PATH')
else:
return Path(path_output)
# Traverse directory to get total size.

View File

@@ -555,7 +555,9 @@ class ZenithEnv:
self.broker = config.broker
toml += textwrap.dedent(f"""
[etcd_broker]
broker_endpoints = ['{self.broker.client_url()}']
etcd_binary_path = '{self.broker.binary_path}'
""")
# Create config for pageserver
@@ -1846,6 +1848,7 @@ class Etcd:
datadir: str
port: int
peer_port: int
binary_path: Path = etcd_path()
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def client_url(self):
@@ -1858,15 +1861,15 @@ class Etcd:
def start(self):
pathlib.Path(self.datadir).mkdir(exist_ok=True)
etcd_full_path = etcd_path()
if etcd_full_path is None:
raise Exception('etcd binary not found locally')
if not self.binary_path.is_file():
raise RuntimeError(f"etcd broker binary '{self.binary_path}' is not a file")
client_url = self.client_url()
log.info(f'Starting etcd to listen incoming connections at "{client_url}"')
with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file:
args = [
etcd_full_path,
self.binary_path,
f"--data-dir={self.datadir}",
f"--listen-client-urls={client_url}",
f"--advertise-client-urls={client_url}",
@@ -1927,8 +1930,7 @@ SKIP_DIRS = frozenset(('pg_wal',
'pg_stat_tmp',
'pg_subtrans',
'pg_logical',
'pg_replslot/wal_proposer_slot',
'pg_xact'))
'pg_replslot/wal_proposer_slot'))
SKIP_FILES = frozenset(('pg_internal.init',
'pg.log',