neon_local: Fix race conditions creating multiple endpoints concurrently

The list of endpoints was loaded by scanning the endpoints directory,
whenever you started 'neon_local'. If you ran two 'neon_local endpoint
create' commands concurrently, one of them might fail because of the
other half-finished endpoint creation. There might be a directory
without the endpoint.json file, for example, or a half-written
endpoint.json file. Most commands don't need the full list of
endpoints, however, so only load that when it's needed. In particular,
it is now possible to run two "neon_local endpoint create" commands in
parallel, if you explicitly assign a port to both. (If you let
neon_local assign the port, it needs to check all the other endpoints
to decide what ports are free.)

Another race condition was with the branch-name mappings. They were
previously stored in the 'config' file, which would also fail if two
'neon_local' commands tried to change it at the same time. To fix that
race condition, use advisory file lock to coordinate access to the
branch-name mappings, and only read/modify that file in commands that
actually need the mappings. While at it, move the mappings to a
separate 'branch_name_mappings.json' file.

The motivation for these changes is that I wanted to create a python
test that creates lots of tenants concurrently. Without these changes,
the concurrent tenant creations would often hit these 'neon_local'
race conditions and fail.

'neon_local' is still not concurrency-safe in general, but this is a
good start.
This commit is contained in:
Heikki Linnakangas
2023-04-24 09:53:08 +03:00
parent 73b369531d
commit 74d3cdeaf0
7 changed files with 188 additions and 80 deletions

11
Cargo.lock generated
View File

@@ -1105,6 +1105,7 @@ dependencies = [
"anyhow",
"clap 4.2.2",
"comfy-table",
"file-lock",
"git-version",
"nix",
"once_cell",
@@ -1550,6 +1551,16 @@ dependencies = [
"instant",
]
[[package]]
name = "file-lock"
version = "2.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f59be9010c5418713a48aac4c1b897d85dafd958055683dc31bdae553536647b"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "filetime"
version = "0.2.21"

View File

@@ -42,6 +42,7 @@ either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
file-lock = "2.1.9"
fs2 = "0.4.3"
futures = "0.3"
futures-core = "0.3"

View File

@@ -8,6 +8,7 @@ license.workspace = true
anyhow.workspace = true
clap.workspace = true
comfy-table.workspace = true
file-lock.workspace = true
git-version.workspace = true
nix.workspace = true
once_cell.workspace = true

View File

@@ -365,11 +365,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
new_tenant_id,
new_timeline_id,
)?;
env.register_branch_mapping(DEFAULT_BRANCH_NAME, new_tenant_id, new_timeline_id)?;
println!(
"Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}",
@@ -411,7 +407,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Some(("list", list_match)) => {
let tenant_id = get_tenant_id(list_match, env)?;
let timelines = pageserver.timeline_list(&tenant_id)?;
print_timelines_tree(timelines, env.timeline_name_mappings())?;
print_timelines_tree(timelines, env.timeline_name_mappings()?)?;
}
Some(("create", create_match)) => {
let tenant_id = get_tenant_id(create_match, env)?;
@@ -429,7 +425,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}",
@@ -468,10 +464,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.copied()
.context("Failed to parse postgres version from the argument string")?;
let mut cplane = ComputeControlPlane::load(env.clone())?;
let mut cplane = ComputeControlPlane::new(env.clone());
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
env.register_branch_mapping(name, tenant_id, timeline_id)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
@@ -487,7 +483,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let ancestor_timeline_id = env
.get_branch_timeline_id(ancestor_branch_name, tenant_id)
.get_branch_timeline_id(ancestor_branch_name, tenant_id)?
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
@@ -508,7 +504,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
env.register_branch_mapping(new_branch_name, tenant_id, new_timeline_id)?;
println!(
"Created timeline '{}' at Lsn {last_record_lsn} for tenant: {tenant_id}. Ancestor timeline: '{ancestor_branch_name}'",
@@ -528,7 +524,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
let mut cplane = ComputeControlPlane::new(env.clone());
// All subcommands take an optional --tenant-id option
let tenant_id = get_tenant_id(sub_args, env)?;
@@ -540,7 +536,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
HashMap::new()
});
let timeline_name_mappings = env.timeline_name_mappings();
let timeline_name_mappings = env.timeline_name_mappings()?;
let mut table = comfy_table::Table::new();
@@ -555,8 +551,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
"STATUS",
]);
for (endpoint_id, endpoint) in cplane
.endpoints
for (endpoint_id, endpoint) in ComputeControlPlane::load_endpoints(env)?
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
@@ -609,7 +604,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.transpose()
.context("Failed to parse Lsn from the request")?;
let timeline_id = env
.get_branch_timeline_id(branch_name, tenant_id)
.get_branch_timeline_id(branch_name, tenant_id)?
.ok_or_else(|| anyhow!("Found no timeline id for branch name '{branch_name}'"))?;
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -627,7 +622,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?;
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -646,7 +641,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let timeline_id = env
.get_branch_timeline_id(branch_name, tenant_id)
.get_branch_timeline_id(branch_name, tenant_id)?
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{branch_name}'")
})?;
@@ -683,9 +678,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
let endpoint = ComputeControlPlane::load_endpoint(endpoint_id.as_str(), env)?
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
}
@@ -844,9 +837,9 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.endpoints {
match ComputeControlPlane::load_endpoints(env) {
Ok(endpoints) => {
for (_k, node) in endpoints {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}

View File

@@ -43,41 +43,72 @@ pub struct EndpointConf {
pub struct ComputeControlPlane {
base_port: u16,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
pub fn new(env: LocalEnv) -> Self {
let pageserver = Arc::new(PageServerNode::from_env(&env));
ComputeControlPlane {
base_port: 55431,
env,
pageserver,
}
}
// Load current endpoints from the endpoints/ subdirectories
//
// endpoint ID is the key in the returned BTreeMap.
//
// NOTE: This is not concurrency-safe, and can fail if another 'neon_local'
// invocation is creating or deleting an endpoint at the same time.
pub fn load_endpoints(env: &LocalEnv) -> Result<BTreeMap<String, Arc<Endpoint>>> {
let pageserver = Arc::new(PageServerNode::from_env(env));
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
let ep = Endpoint::from_dir_entry(endpoint_dir?, env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
}
Ok(ComputeControlPlane {
base_port: 55431,
endpoints,
env,
pageserver,
})
Ok(endpoints)
}
fn get_port(&mut self) -> u16 {
1 + self
.endpoints
// Load an endpoint from the endpoints/ subdirectories
pub fn load_endpoint(name: &str, env: &LocalEnv) -> Result<Option<Endpoint>> {
let endpoint_json_path = env.endpoints_path().join(name).join("endpoint.json");
if !endpoint_json_path.exists() {
return Ok(None);
}
// Read the endpoint.json file
let conf: EndpointConf = serde_json::from_slice(&std::fs::read(endpoint_json_path)?)?;
// ok now
let pageserver = Arc::new(PageServerNode::from_env(env));
Ok(Some(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.port),
name: name.to_string(),
env: env.clone(),
pageserver,
timeline_id: conf.timeline_id,
lsn: conf.lsn,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
}))
}
fn get_port(&self) -> anyhow::Result<u16> {
let endpoints = ComputeControlPlane::load_endpoints(&self.env)?;
let next_port = 1 + endpoints
.values()
.map(|ep| ep.address.port())
.max()
.unwrap_or(self.base_port)
.unwrap_or(self.base_port);
Ok(next_port)
}
pub fn new_endpoint(
@@ -89,7 +120,13 @@ impl ComputeControlPlane {
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
// NOTE: Unlike most of neon_local, 'new_endpoint' is safe to run from
// two 'neon_local' invocations at the same time, IF the port is specified
// explicitly. (get_port() is racy)
let port = match port {
Some(port) => port,
None => self.get_port()?,
};
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -114,8 +151,6 @@ impl ComputeControlPlane {
)?;
ep.setup_pg_conf()?;
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
Ok(ep)
}
}

View File

@@ -5,6 +5,7 @@
use anyhow::{bail, ensure, Context};
use file_lock::{FileLock, FileOptions};
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -12,11 +13,14 @@ use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::Seek;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::str::FromStr;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -72,14 +76,84 @@ pub struct LocalEnv {
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
}
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
#[serde(default)]
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
// Keep human-readable aliases in memory (and persist them to
// 'branch_name_mappings.json'), to hide ZId hex strings from the user.
//
// BranchNameMappingsSerialized corresponds to the actual JSON format of
// 'branch_name_mappings.json' file. It's a bit more awkward to work with, so we convert
// it to/from BranchNameMappings when reading/writing the file.
type BranchNameMappings = HashMap<(TenantId, String), TimelineId>;
type BranchNameMappingsSerialized = HashMap<String, HashMap<String, String>>;
pub struct BranchNameMappingsLock {
mappings: BranchNameMappings,
lock: FileLock,
}
impl Deref for BranchNameMappingsLock {
type Target = HashMap<(TenantId, String), TimelineId>;
fn deref(&self) -> &Self::Target {
&self.mappings
}
}
impl DerefMut for BranchNameMappingsLock {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.mappings
}
}
impl BranchNameMappingsLock {
/// Write the modified branch-name mapppings back to 'branch_name_mappings.json',
/// and release the lock.
fn write_to_file(mut self) -> anyhow::Result<()> {
let mut serialized_mappings: BranchNameMappingsSerialized = HashMap::new();
for ((tenant_id, branch_name), timeline_id) in self.iter() {
serialized_mappings
.entry(tenant_id.to_string())
.or_default()
.insert(branch_name.clone(), timeline_id.to_string());
}
self.lock.file.set_len(0)?;
self.lock.file.rewind()?;
serde_json::to_writer_pretty(&self.lock.file, &serialized_mappings)?;
Ok(())
}
}
/// Get the branch-name mappings.
///
/// This returns a guard object that holds a lock on the branch_name_mappings.json
/// file. That makes it safe for two 'neon_local' invocations to read/manipulate
/// branch name mappings at the same time.
pub fn load_branch_name_mappings() -> anyhow::Result<BranchNameMappingsLock> {
let path = base_path().join("branch_name_mappings.json");
let lock = FileLock::lock(
path,
true,
FileOptions::new().create(true).read(true).write(true),
)?;
let mut mappings = BranchNameMappings::new();
if lock.file.metadata()?.len() > 0 {
let serialized_mappings: BranchNameMappingsSerialized = serde_json::from_reader(&lock.file)
.context("Failed to read branch_name_mappings.json")?;
for (tenant_str, map) in serialized_mappings.iter() {
for (branch_name, timeline_str) in map.iter() {
mappings.insert(
(TenantId::from_str(tenant_str)?, branch_name.to_string()),
TimelineId::from_str(timeline_str)?,
);
}
}
}
Ok(BranchNameMappingsLock { mappings, lock })
}
/// Broker config for cluster internal communication.
@@ -215,27 +289,21 @@ impl LocalEnv {
pub fn register_branch_mapping(
&mut self,
branch_name: String,
branch_name: &str,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
let existing_values = self
.branch_name_mappings
.entry(branch_name.clone())
.or_default();
let mut mappings = load_branch_name_mappings()?;
let existing_ids = existing_values
.iter()
.find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
if let Some((_, old_timeline_id)) = existing_ids {
if let Some(old_timeline_id) = mappings.get(&(tenant_id, branch_name.to_string())) {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
existing_values.push((tenant_id, timeline_id));
mappings.insert((tenant_id, branch_name.to_string()), timeline_id);
mappings.write_to_file()?;
Ok(())
}
}
@@ -244,24 +312,22 @@ impl LocalEnv {
&self,
branch_name: &str,
tenant_id: TenantId,
) -> Option<TimelineId> {
self.branch_name_mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
.map(|&(_, timeline_id)| timeline_id)
.map(TimelineId::from)
) -> anyhow::Result<Option<TimelineId>> {
let mappings = load_branch_name_mappings()?;
Ok(mappings.get(&(tenant_id, branch_name.to_string())).copied())
}
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
self.branch_name_mappings
pub fn timeline_name_mappings(&self) -> anyhow::Result<HashMap<TenantTimelineId, String>> {
let mappings = load_branch_name_mappings()?;
Ok(mappings
.iter()
.flat_map(|(name, tenant_timelines)| {
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
(TenantTimelineId::new(tenant_id, timeline_id), name.clone())
})
.map(|((tenant_id, branch_name), timeline_id)| {
(
TenantTimelineId::new(*tenant_id, *timeline_id),
branch_name.clone(),
)
})
.collect()
.collect())
}
/// Create a LocalEnv from a config file.

View File

@@ -16,7 +16,8 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
from pytest import FixtureRequest
#
@@ -58,6 +59,10 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
with endpoint.cursor() as cur:
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
timeline_id = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
@@ -69,10 +74,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])