diff --git a/Cargo.lock b/Cargo.lock index ce24bbcee8..ebe1d31e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1105,6 +1105,7 @@ dependencies = [ "anyhow", "clap 4.2.2", "comfy-table", + "file-lock", "git-version", "nix", "once_cell", @@ -1550,6 +1551,16 @@ dependencies = [ "instant", ] +[[package]] +name = "file-lock" +version = "2.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f59be9010c5418713a48aac4c1b897d85dafd958055683dc31bdae553536647b" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "filetime" version = "0.2.21" diff --git a/Cargo.toml b/Cargo.toml index 0b545e6190..eabe14f400 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ either = "1.8" enum-map = "2.4.2" enumset = "1.0.12" fail = "0.5.0" +file-lock = "2.1.9" fs2 = "0.4.3" futures = "0.3" futures-core = "0.3" diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index ba39747e03..9e34e852b8 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true anyhow.workspace = true clap.workspace = true comfy-table.workspace = true +file-lock.workspace = true git-version.workspace = true nix.workspace = true once_cell.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 665cad8783..3bff4d3416 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -365,11 +365,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an let new_timeline_id = timeline_info.timeline_id; let last_record_lsn = timeline_info.last_record_lsn; - env.register_branch_mapping( - DEFAULT_BRANCH_NAME.to_string(), - new_tenant_id, - new_timeline_id, - )?; + env.register_branch_mapping(DEFAULT_BRANCH_NAME, new_tenant_id, new_timeline_id)?; println!( "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}", @@ -411,7 +407,7 @@ fn handle_timeline(timeline_match: 
&ArgMatches, env: &mut local_env::LocalEnv) - Some(("list", list_match)) => { let tenant_id = get_tenant_id(list_match, env)?; let timelines = pageserver.timeline_list(&tenant_id)?; - print_timelines_tree(timelines, env.timeline_name_mappings())?; + print_timelines_tree(timelines, env.timeline_name_mappings()?)?; } Some(("create", create_match)) => { let tenant_id = get_tenant_id(create_match, env)?; @@ -429,7 +425,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - let new_timeline_id = timeline_info.timeline_id; let last_record_lsn = timeline_info.last_record_lsn; - env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?; + env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?; println!( "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}", @@ -468,10 +464,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - .copied() .context("Failed to parse postgres version from the argument string")?; - let mut cplane = ComputeControlPlane::load(env.clone())?; + let mut cplane = ComputeControlPlane::new(env.clone()); println!("Importing timeline into pageserver ..."); pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?; - env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; + env.register_branch_mapping(name, tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?; @@ -487,7 +483,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - .map(|s| s.as_str()) .unwrap_or(DEFAULT_BRANCH_NAME); let ancestor_timeline_id = env - .get_branch_timeline_id(ancestor_branch_name, tenant_id) + .get_branch_timeline_id(ancestor_branch_name, tenant_id)? 
.ok_or_else(|| { anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'") })?; @@ -508,7 +504,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - let last_record_lsn = timeline_info.last_record_lsn; - env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?; + env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?; println!( "Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'", @@ -528,7 +524,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( None => bail!("no endpoint subcommand provided"), }; - let mut cplane = ComputeControlPlane::load(env.clone())?; + let mut cplane = ComputeControlPlane::new(env.clone()); // All subcommands take an optional --tenant-id option let tenant_id = get_tenant_id(sub_args, env)?; @@ -540,7 +536,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( HashMap::new() }); - let timeline_name_mappings = env.timeline_name_mappings(); + let timeline_name_mappings = env.timeline_name_mappings()?; let mut table = comfy_table::Table::new(); @@ -555,8 +551,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( "STATUS", ]); - for (endpoint_id, endpoint) in cplane - .endpoints + for (endpoint_id, endpoint) in ComputeControlPlane::load_endpoints(env)? .iter() .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id) { @@ -609,7 +604,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .transpose() .context("Failed to parse Lsn from the request")?; let timeline_id = env - .get_branch_timeline_id(branch_name, tenant_id) + .get_branch_timeline_id(branch_name, tenant_id)? 
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?; let port: Option = sub_args.get_one::("port").copied(); @@ -627,7 +622,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .get_one::("endpoint_id") .ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?; - let endpoint = cplane.endpoints.get(endpoint_id.as_str()); + let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?; let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) { let claims = Claims::new(Some(tenant_id), Scope::Tenant); @@ -646,7 +641,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .map(|s| s.as_str()) .unwrap_or(DEFAULT_BRANCH_NAME); let timeline_id = env - .get_branch_timeline_id(branch_name, tenant_id) + .get_branch_timeline_id(branch_name, tenant_id)? .ok_or_else(|| { anyhow!("Found no timeline id for branch name '{branch_name}'") })?; @@ -683,9 +678,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?; let destroy = sub_args.get_flag("destroy"); - let endpoint = cplane - .endpoints - .get(endpoint_id.as_str()) + let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)? 
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; endpoint.stop(destroy)?; } @@ -844,9 +837,9 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { let pageserver = PageServerNode::from_env(env); // Stop all endpoints - match ComputeControlPlane::load(env.clone()) { - Ok(cplane) => { - for (_k, node) in cplane.endpoints { + match ComputeControlPlane::load_endpoints(env) { + Ok(endpoints) => { + for (_k, node) in endpoints { if let Err(e) = node.stop(false) { eprintln!("postgres stop failed: {e:#}"); } diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 59b306382d..534dd5fdbf 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -43,41 +43,72 @@ pub struct EndpointConf { pub struct ComputeControlPlane { base_port: u16, - // endpoint ID is the key - pub endpoints: BTreeMap>, - env: LocalEnv, pageserver: Arc, } impl ComputeControlPlane { - // Load current endpoints from the endpoints/ subdirectories - pub fn load(env: LocalEnv) -> Result { + pub fn new(env: LocalEnv) -> Self { let pageserver = Arc::new(PageServerNode::from_env(&env)); + ComputeControlPlane { + base_port: 55431, + env, + pageserver, + } + } + + // Load current endpoints from the endpoints/ subdirectories + // + // endpoint ID is the key in the returned BTreeMap. + // + // NOTE: This is not concurrency-safe, and can fail if another 'neon_local' + // invocation is creating or deleting an endpoint at the same time. + pub fn load_endpoints(env: &LocalEnv) -> Result>> { + let pageserver = Arc::new(PageServerNode::from_env(env)); let mut endpoints = BTreeMap::default(); for endpoint_dir in fs::read_dir(env.endpoints_path()) .with_context(|| format!("failed to list {}", env.endpoints_path().display()))? 
{ - let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?; + let ep = Endpoint::from_dir_entry(endpoint_dir?, env, &pageserver)?; endpoints.insert(ep.name.clone(), Arc::new(ep)); } - Ok(ComputeControlPlane { - base_port: 55431, - endpoints, - env, - pageserver, - }) + Ok(endpoints) } - fn get_port(&mut self) -> u16 { - 1 + self - .endpoints + // Load an endpoint from the endpoints/ subdirectories + pub fn load_endpoint(name: &str, env: &LocalEnv) -> Result> { + let endpoint_json_path = env.endpoints_path().join(name).join("endpoint.json"); + if !endpoint_json_path.exists() { + return Ok(None); + } + + // Read the endpoint.json file + let conf: EndpointConf = serde_json::from_slice(&std::fs::read(endpoint_json_path)?)?; + + // Build the Endpoint from the parsed configuration + let pageserver = Arc::new(PageServerNode::from_env(env)); + Ok(Some(Endpoint { + address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port), + name: name.to_string(), + env: env.clone(), + pageserver, + timeline_id: conf.timeline_id, + lsn: conf.lsn, + tenant_id: conf.tenant_id, + pg_version: conf.pg_version, + })) + } + + fn get_port(&self) -> anyhow::Result { + let endpoints = ComputeControlPlane::load_endpoints(&self.env)?; + let next_port = 1 + endpoints .values() .map(|ep| ep.address.port()) .max() - .unwrap_or(self.base_port) + .unwrap_or(self.base_port); + Ok(next_port) } pub fn new_endpoint( @@ -89,7 +120,13 @@ impl ComputeControlPlane { port: Option, pg_version: u32, ) -> Result> { - let port = port.unwrap_or_else(|| self.get_port()); + // NOTE: Unlike most of neon_local, 'new_endpoint' is safe to run from + // two 'neon_local' invocations at the same time, IF the port is specified + // explicitly. 
(get_port() is racy) + let port = match port { + Some(port) => port, + None => self.get_port()?, + }; let ep = Arc::new(Endpoint { name: name.to_owned(), address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), @@ -114,8 +151,6 @@ impl ComputeControlPlane { )?; ep.setup_pg_conf()?; - self.endpoints.insert(ep.name.clone(), Arc::clone(&ep)); - Ok(ep) } } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 2b1eec7c4b..b60f32fc23 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -5,6 +5,7 @@ use anyhow::{bail, ensure, Context}; +use file_lock::{FileLock, FileOptions}; use postgres_backend::AuthType; use reqwest::Url; use serde::{Deserialize, Serialize}; @@ -12,11 +13,14 @@ use serde_with::{serde_as, DisplayFromStr}; use std::collections::HashMap; use std::env; use std::fs; +use std::io::Seek; use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::SocketAddr; +use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +use std::str::FromStr; use utils::{ auth::{encode_from_key_file, Claims}, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -72,14 +76,84 @@ pub struct LocalEnv { #[serde(default)] pub safekeepers: Vec, +} - /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user. - #[serde(default)] - // A `HashMap>` would be more appropriate here, - // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error. - // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table". - #[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")] - branch_name_mappings: HashMap>, +// Keep human-readable aliases in memory (and persist them to +// 'branch_name_mappings.json'), to hide ZId hex strings from the user. +// +// BranchNameMappingsSerialized corresponds to the actual JSON format of +// 'branch_name_mappings.json' file. 
It's a bit more awkward to work with, so we convert +// it to/from BranchNameMappings when reading/writing the file. +type BranchNameMappings = HashMap<(TenantId, String), TimelineId>; + +type BranchNameMappingsSerialized = HashMap>; + +pub struct BranchNameMappingsLock { + mappings: BranchNameMappings, + lock: FileLock, +} + +impl Deref for BranchNameMappingsLock { + type Target = HashMap<(TenantId, String), TimelineId>; + + fn deref(&self) -> &Self::Target { + &self.mappings + } +} +impl DerefMut for BranchNameMappingsLock { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.mappings + } +} + +impl BranchNameMappingsLock { + /// Write the modified branch-name mappings back to 'branch_name_mappings.json', + /// and release the lock. + fn write_to_file(mut self) -> anyhow::Result<()> { + let mut serialized_mappings: BranchNameMappingsSerialized = HashMap::new(); + for ((tenant_id, branch_name), timeline_id) in self.iter() { + serialized_mappings + .entry(tenant_id.to_string()) + .or_default() + .insert(branch_name.clone(), timeline_id.to_string()); + } + + self.lock.file.set_len(0)?; + self.lock.file.rewind()?; + serde_json::to_writer_pretty(&self.lock.file, &serialized_mappings)?; + Ok(()) + } +} + +/// Get the branch-name mappings. +/// +/// This returns a guard object that holds a lock on the branch_name_mappings.json +/// file. That makes it safe for two 'neon_local' invocations to read/manipulate +/// branch name mappings at the same time. 
+pub fn load_branch_name_mappings() -> anyhow::Result { + let path = base_path().join("branch_name_mappings.json"); + let lock = FileLock::lock( + path, + true, + FileOptions::new().create(true).read(true).write(true), + )?; + + let mut mappings = BranchNameMappings::new(); + if lock.file.metadata()?.len() > 0 { + let serialized_mappings: BranchNameMappingsSerialized = serde_json::from_reader(&lock.file) + .context("Failed to read branch_name_mappings.json")?; + + for (tenant_str, map) in serialized_mappings.iter() { + for (branch_name, timeline_str) in map.iter() { + mappings.insert( + (TenantId::from_str(tenant_str)?, branch_name.to_string()), + TimelineId::from_str(timeline_str)?, + ); + } + } + } + + Ok(BranchNameMappingsLock { mappings, lock }) } /// Broker config for cluster internal communication. @@ -215,27 +289,21 @@ impl LocalEnv { pub fn register_branch_mapping( &mut self, - branch_name: String, + branch_name: &str, tenant_id: TenantId, timeline_id: TimelineId, ) -> anyhow::Result<()> { - let existing_values = self - .branch_name_mappings - .entry(branch_name.clone()) - .or_default(); + let mut mappings = load_branch_name_mappings()?; - let existing_ids = existing_values - .iter() - .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id); - - if let Some((_, old_timeline_id)) = existing_ids { + if let Some(old_timeline_id) = mappings.get(&(tenant_id, branch_name.to_string())) { if old_timeline_id == &timeline_id { Ok(()) } else { bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"); } } else { - existing_values.push((tenant_id, timeline_id)); + mappings.insert((tenant_id, branch_name.to_string()), timeline_id); + mappings.write_to_file()?; Ok(()) } } @@ -244,24 +312,22 @@ impl LocalEnv { &self, branch_name: &str, tenant_id: TenantId, - ) -> Option { - self.branch_name_mappings - .get(branch_name)? 
- .iter() - .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id) - .map(|&(_, timeline_id)| timeline_id) - .map(TimelineId::from) + ) -> anyhow::Result> { + let mappings = load_branch_name_mappings()?; + Ok(mappings.get(&(tenant_id, branch_name.to_string())).copied()) } - pub fn timeline_name_mappings(&self) -> HashMap { - self.branch_name_mappings + pub fn timeline_name_mappings(&self) -> anyhow::Result> { + let mappings = load_branch_name_mappings()?; + Ok(mappings .iter() - .flat_map(|(name, tenant_timelines)| { - tenant_timelines.iter().map(|&(tenant_id, timeline_id)| { - (TenantTimelineId::new(tenant_id, timeline_id), name.clone()) - }) + .map(|((tenant_id, branch_name), timeline_id)| { + ( + TenantTimelineId::new(*tenant_id, *timeline_id), + branch_name.clone(), + ) }) - .collect() + .collect()) } /// Create a LocalEnv from a config file. diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index e262202a73..3bb007ed27 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -16,7 +16,8 @@ from fixtures.neon_fixtures import ( ) from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload -from fixtures.types import Lsn +from fixtures.types import Lsn, TenantId, TimelineId +from fixtures.utils import query_scalar from pytest import FixtureRequest # @@ -58,6 +59,10 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") + with endpoint.cursor() as cur: + tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id")) + timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id")) + # FIXME: Is this expected? 
env.pageserver.allowed_errors.append( ".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*" @@ -69,10 +74,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o ["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"] ) - snapshot_config = toml.load(test_output_dir / "repo" / "config") - tenant_id = snapshot_config["default_tenant_id"] - timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id] - pageserver_http = env.pageserver.http_client() lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])