//! This module is responsible for locating and loading paths in a local setup. //! //! Now it also provides init method which acts like a stub for proper installation //! script which will use local paths. use anyhow::{bail, ensure, Context}; use clap::ValueEnum; use postgres_backend::AuthType; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env; use std::fs; use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::Duration; use utils::{ auth::{encode_from_key_file, Claims}, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, }; use crate::safekeeper::SafekeeperNode; pub const DEFAULT_PG_VERSION: u32 = 15; // // This data structures represents neon_local CLI config // // It is deserialized from the .neon/config file, or the config file passed // to 'neon_local init --config=' option. See control_plane/simple.conf for // an example. // #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] pub struct LocalEnv { // Base directory for all the nodes (the pageserver, safekeepers and // compute endpoints). // // This is not stored in the config file. Rather, this is the path where the // config file itself is. It is read from the NEON_REPO_DIR env variable or // '.neon' if not given. #[serde(skip)] pub base_data_dir: PathBuf, // Path to postgres distribution. It's expected that "bin", "include", // "lib", "share" from postgres distribution are there. If at some point // in time we will be able to run against vanilla postgres we may split that // to four separate paths and match OS-specific installation layout. #[serde(default)] pub pg_distrib_dir: PathBuf, // Path to pageserver binary. #[serde(default)] pub neon_distrib_dir: PathBuf, // Default tenant ID to use with the 'neon_local' command line utility, when // --tenant_id is not explicitly specified. #[serde(default)] pub default_tenant_id: Option, // used to issue tokens during e.g pg start #[serde(default)] pub private_key_path: PathBuf, pub broker: NeonBroker, // Configuration for the storage controller (1 per neon_local environment) #[serde(default)] pub storage_controller: NeonStorageControllerConf, /// This Vec must always contain at least one pageserver pub pageservers: Vec, #[serde(default)] pub safekeepers: Vec, // Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will // be propagated into each pageserver's configuration. #[serde(default)] pub control_plane_api: Option, // Control plane upcall API for storage controller. If set, this will be propagated into the // storage controller's configuration. #[serde(default)] pub control_plane_compute_hook_api: Option, /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user. #[serde(default)] // A `HashMap>` would be more appropriate here, // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error. // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table". branch_name_mappings: HashMap>, } /// Broker config for cluster internal communication. #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default)] pub struct NeonBroker { /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'. pub listen_addr: SocketAddr, } /// Broker config for cluster internal communication. #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default)] pub struct NeonStorageControllerConf { /// Heartbeat timeout before marking a node offline #[serde(with = "humantime_serde")] pub max_unavailable: Duration, } impl NeonStorageControllerConf { // Use a shorter pageserver unavailability interval than the default to speed up tests. const DEFAULT_MAX_UNAVAILABLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10); } impl Default for NeonStorageControllerConf { fn default() -> Self { Self { max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL, } } } // Dummy Default impl to satisfy Deserialize derive. impl Default for NeonBroker { fn default() -> Self { NeonBroker { listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), } } } impl NeonBroker { pub fn client_url(&self) -> Url { Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url") } } #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default, deny_unknown_fields)] pub struct PageServerConf { // node id pub id: NodeId, // Pageserver connection settings pub listen_pg_addr: String, pub listen_http_addr: String, // auth type used for the PG and HTTP ports pub pg_auth_type: AuthType, pub http_auth_type: AuthType, pub(crate) virtual_file_io_engine: Option, pub(crate) get_vectored_impl: Option, pub(crate) get_impl: Option, pub(crate) validate_vectored_get: Option, } impl Default for PageServerConf { fn default() -> Self { Self { id: NodeId(0), listen_pg_addr: String::new(), listen_http_addr: String::new(), pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, virtual_file_io_engine: None, get_vectored_impl: None, get_impl: None, validate_vectored_get: None, } } } #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] #[serde(default)] pub struct SafekeeperConf { pub id: NodeId, pub pg_port: u16, pub pg_tenant_only_port: Option, pub http_port: u16, pub sync: bool, pub remote_storage: Option, pub backup_threads: Option, pub auth_enabled: bool, pub listen_addr: Option, } impl Default for SafekeeperConf { fn default() -> Self { Self { id: NodeId(0), pg_port: 0, pg_tenant_only_port: None, http_port: 0, sync: true, remote_storage: None, backup_threads: None, auth_enabled: false, listen_addr: None, } } } #[derive(Clone, Copy)] pub enum InitForceMode { MustNotExist, EmptyDirOk, RemoveAllContents, } impl ValueEnum for InitForceMode { fn value_variants<'a>() -> &'a [Self] { &[ Self::MustNotExist, Self::EmptyDirOk, Self::RemoveAllContents, ] } fn to_possible_value(&self) -> Option { Some(clap::builder::PossibleValue::new(match self { InitForceMode::MustNotExist => "must-not-exist", InitForceMode::EmptyDirOk => "empty-dir-ok", InitForceMode::RemoveAllContents => "remove-all-contents", })) } } impl SafekeeperConf { /// Compute is served by port on which only tenant scoped tokens allowed, if /// it is configured. pub fn get_compute_port(&self) -> u16 { self.pg_tenant_only_port.unwrap_or(self.pg_port) } } impl LocalEnv { pub fn pg_distrib_dir_raw(&self) -> PathBuf { self.pg_distrib_dir.clone() } pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result { let path = self.pg_distrib_dir.clone(); #[allow(clippy::manual_range_patterns)] match pg_version { 14 | 15 | 16 => Ok(path.join(format!("v{pg_version}"))), _ => bail!("Unsupported postgres version: {}", pg_version), } } pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result { Ok(self.pg_distrib_dir(pg_version)?.join("bin")) } pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result { Ok(self.pg_distrib_dir(pg_version)?.join("lib")) } pub fn pageserver_bin(&self) -> PathBuf { self.neon_distrib_dir.join("pageserver") } pub fn storage_controller_bin(&self) -> PathBuf { // Irrespective of configuration, storage controller binary is always // run from the same location as neon_local. This means that for compatibility // tests that run old pageserver/safekeeper, they still run latest storage controller. let neon_local_bin_dir = env::current_exe().unwrap().parent().unwrap().to_owned(); neon_local_bin_dir.join("storage_controller") } pub fn safekeeper_bin(&self) -> PathBuf { self.neon_distrib_dir.join("safekeeper") } pub fn storage_broker_bin(&self) -> PathBuf { self.neon_distrib_dir.join("storage_broker") } pub fn endpoints_path(&self) -> PathBuf { self.base_data_dir.join("endpoints") } pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf { self.base_data_dir .join(format!("pageserver_{pageserver_id}")) } pub fn safekeeper_data_dir(&self, data_dir_name: &str) -> PathBuf { self.base_data_dir.join("safekeepers").join(data_dir_name) } pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> { if let Some(conf) = self.pageservers.iter().find(|node| node.id == id) { Ok(conf) } else { let have_ids = self .pageservers .iter() .map(|node| format!("{}:{}", node.id, node.listen_http_addr)) .collect::>(); let joined = have_ids.join(","); bail!("could not find pageserver {id}, have ids {joined}") } } pub fn register_branch_mapping( &mut self, branch_name: String, tenant_id: TenantId, timeline_id: TimelineId, ) -> anyhow::Result<()> { let existing_values = self .branch_name_mappings .entry(branch_name.clone()) .or_default(); let existing_ids = existing_values .iter() .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id); if let Some((_, old_timeline_id)) = existing_ids { if old_timeline_id == &timeline_id { Ok(()) } else { bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"); } } else { existing_values.push((tenant_id, timeline_id)); Ok(()) } } pub fn get_branch_timeline_id( &self, branch_name: &str, tenant_id: TenantId, ) -> Option { self.branch_name_mappings .get(branch_name)? .iter() .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id) .map(|&(_, timeline_id)| timeline_id) .map(TimelineId::from) } pub fn timeline_name_mappings(&self) -> HashMap { self.branch_name_mappings .iter() .flat_map(|(name, tenant_timelines)| { tenant_timelines.iter().map(|&(tenant_id, timeline_id)| { (TenantTimelineId::new(tenant_id, timeline_id), name.clone()) }) }) .collect() } /// Create a LocalEnv from a config file. /// /// Unlike 'load_config', this function fills in any defaults that are missing /// from the config file. pub fn parse_config(toml: &str) -> anyhow::Result { let mut env: LocalEnv = toml::from_str(toml)?; // Find postgres binaries. // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install". // Note that later in the code we assume, that distrib dirs follow the same pattern // for all postgres versions. if env.pg_distrib_dir == Path::new("") { if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { env.pg_distrib_dir = postgres_bin.into(); } else { let cwd = env::current_dir()?; env.pg_distrib_dir = cwd.join("pg_install") } } // Find neon binaries. if env.neon_distrib_dir == Path::new("") { env.neon_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); } if env.pageservers.is_empty() { anyhow::bail!("Configuration must contain at least one pageserver"); } env.base_data_dir = base_path(); Ok(env) } /// Locate and load config pub fn load_config() -> anyhow::Result { let repopath = base_path(); if !repopath.exists() { bail!( "Neon config is not found in {}. You need to run 'neon_local init' first", repopath.to_str().unwrap() ); } // TODO: check that it looks like a neon repository // load and parse file let config = fs::read_to_string(repopath.join("config"))?; let mut env: LocalEnv = toml::from_str(config.as_str())?; env.base_data_dir = repopath; Ok(env) } pub fn persist_config(&self, base_path: &Path) -> anyhow::Result<()> { // Currently, the user first passes a config file with 'neon_local init --config=' // We read that in, in `create_config`, and fill any missing defaults. Then it's saved // to .neon/config. TODO: We lose any formatting and comments along the way, which is // a bit sad. let mut conf_content = r#"# This file describes a local deployment of the page server # and safekeeeper node. It is read by the 'neon_local' command-line # utility. "# .to_string(); // Convert the LocalEnv to a toml file. // // This could be as simple as this: // // conf_content += &toml::to_string_pretty(env)?; // // But it results in a "values must be emitted before tables". I'm not sure // why, AFAICS the table, i.e. 'safekeepers: Vec' is last. // Maybe rust reorders the fields to squeeze avoid padding or something? // In any case, converting to toml::Value first, and serializing that, works. // See https://github.com/alexcrichton/toml-rs/issues/142 conf_content += &toml::to_string_pretty(&toml::Value::try_from(self)?)?; let target_config_path = base_path.join("config"); fs::write(&target_config_path, conf_content).with_context(|| { format!( "Failed to write config file into path '{}'", target_config_path.display() ) }) } // this function is used only for testing purposes in CLI e g generate tokens during init pub fn generate_auth_token(&self, claims: &Claims) -> anyhow::Result { let private_key_path = self.get_private_key_path(); let key_data = fs::read(private_key_path)?; encode_from_key_file(claims, &key_data) } pub fn get_private_key_path(&self) -> PathBuf { if self.private_key_path.is_absolute() { self.private_key_path.to_path_buf() } else { self.base_data_dir.join(&self.private_key_path) } } // // Initialize a new Neon repository // pub fn init(&mut self, pg_version: u32, force: &InitForceMode) -> anyhow::Result<()> { // check if config already exists let base_path = &self.base_data_dir; ensure!( base_path != Path::new(""), "repository base path is missing" ); if base_path.exists() { match force { InitForceMode::MustNotExist => { bail!( "directory '{}' already exists. Perhaps already initialized?", base_path.display() ); } InitForceMode::EmptyDirOk => { if let Some(res) = std::fs::read_dir(base_path)?.next() { res.context("check if directory is empty")?; anyhow::bail!("directory not empty: {base_path:?}"); } } InitForceMode::RemoveAllContents => { println!("removing all contents of '{}'", base_path.display()); // instead of directly calling `remove_dir_all`, we keep the original dir but removing // all contents inside. This helps if the developer symbol links another directory (i.e., // S3 local SSD) to the `.neon` base directory. for entry in std::fs::read_dir(base_path)? { let entry = entry?; let path = entry.path(); if path.is_dir() { fs::remove_dir_all(&path)?; } else { fs::remove_file(&path)?; } } } } } if !self.pg_bin_dir(pg_version)?.join("postgres").exists() { bail!( "Can't find postgres binary at {}", self.pg_bin_dir(pg_version)?.display() ); } for binary in ["pageserver", "safekeeper"] { if !self.neon_distrib_dir.join(binary).exists() { bail!( "Can't find binary '{binary}' in neon distrib dir '{}'", self.neon_distrib_dir.display() ); } } if !base_path.exists() { fs::create_dir(base_path)?; } // Generate keypair for JWT. // // The keypair is only needed if authentication is enabled in any of the // components. For convenience, we generate the keypair even if authentication // is not enabled, so that you can easily enable it after the initialization // step. However, if the key generation fails, we treat it as non-fatal if // authentication was not enabled. if self.private_key_path == PathBuf::new() { match generate_auth_keys( base_path.join("auth_private_key.pem").as_path(), base_path.join("auth_public_key.pem").as_path(), ) { Ok(()) => { self.private_key_path = PathBuf::from("auth_private_key.pem"); } Err(e) => { if !self.auth_keys_needed() { eprintln!("Could not generate keypair for JWT authentication: {e}"); eprintln!("Continuing anyway because authentication was not enabled"); self.private_key_path = PathBuf::from("auth_private_key.pem"); } else { return Err(e); } } } } fs::create_dir_all(self.endpoints_path())?; for safekeeper in &self.safekeepers { fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?; } self.persist_config(base_path) } fn auth_keys_needed(&self) -> bool { self.pageservers.iter().any(|ps| { ps.pg_auth_type == AuthType::NeonJWT || ps.http_auth_type == AuthType::NeonJWT }) || self.safekeepers.iter().any(|sk| sk.auth_enabled) } } fn base_path() -> PathBuf { match std::env::var_os("NEON_REPO_DIR") { Some(val) => PathBuf::from(val), None => PathBuf::from(".neon"), } } /// Generate a public/private key pair for JWT authentication fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow::Result<()> { // Generate the key pair // // openssl genpkey -algorithm ed25519 -out auth_private_key.pem let keygen_output = Command::new("openssl") .arg("genpkey") .args(["-algorithm", "ed25519"]) .args(["-out", private_key_path.to_str().unwrap()]) .stdout(Stdio::null()) .output() .context("failed to generate auth private key")?; if !keygen_output.status.success() { bail!( "openssl failed: '{}'", String::from_utf8_lossy(&keygen_output.stderr) ); } // Extract the public key from the private key file // // openssl pkey -in auth_private_key.pem -pubout -out auth_public_key.pem let keygen_output = Command::new("openssl") .arg("pkey") .args(["-in", private_key_path.to_str().unwrap()]) .arg("-pubout") .args(["-out", public_key_path.to_str().unwrap()]) .output() .context("failed to extract public key from private key")?; if !keygen_output.status.success() { bail!( "openssl failed: '{}'", String::from_utf8_lossy(&keygen_output.stderr) ); } Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn simple_conf_parsing() { let simple_conf_toml = include_str!("../simple.conf"); let simple_conf_parse_result = LocalEnv::parse_config(simple_conf_toml); assert!( simple_conf_parse_result.is_ok(), "failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}" ); let string_to_replace = "listen_addr = '127.0.0.1:50051'"; let spoiled_url_str = "listen_addr = '!@$XOXO%^&'"; let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str); assert!( spoiled_url_toml.contains(spoiled_url_str), "Failed to replace string {string_to_replace} in the toml file {simple_conf_toml}" ); let spoiled_url_parse_result = LocalEnv::parse_config(&spoiled_url_toml); assert!( spoiled_url_parse_result.is_err(), "expected toml with invalid Url {spoiled_url_toml} to fail the parsing, but got {spoiled_url_parse_result:?}" ); } }