mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
neon_local: add a flock to protect against concurrent execution (#10185)
## Problem `neon_local` has always been unsafe to run concurrently with itself: it uses simple text files for persistent state, and concurrent runs will step on each other. In some test environments we intentionally handle this with mutexes in python land, but it's fragile to try and always remember to do that. ## Summary of changes - Add a `flock` based mutex around the `main` function of neon_local, using the repo directory as the file to lock - Clean up an Option<> around control_plane_api, this is a drive-by change because it was one of the fields that had a weird effect when previous concurrent stuff stamped on it.
This commit is contained in:
@@ -19,6 +19,7 @@ use control_plane::storage_controller::{
|
||||
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
|
||||
};
|
||||
use control_plane::{broker, local_env};
|
||||
use nix::fcntl::{flock, FlockArg};
|
||||
use pageserver_api::config::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
@@ -36,6 +37,8 @@ use safekeeper_api::{
|
||||
};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::fs::File;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
@@ -689,6 +692,21 @@ struct TimelineTreeEl {
|
||||
pub children: BTreeSet<TimelineId>,
|
||||
}
|
||||
|
||||
/// A flock-based guard over the neon_local repository directory
|
||||
struct RepoLock {
|
||||
_file: File,
|
||||
}
|
||||
|
||||
impl RepoLock {
|
||||
fn new() -> Result<Self> {
|
||||
let repo_dir = File::open(local_env::base_path())?;
|
||||
let repo_dir_fd = repo_dir.as_raw_fd();
|
||||
flock(repo_dir_fd, FlockArg::LockExclusive)?;
|
||||
|
||||
Ok(Self { _file: repo_dir })
|
||||
}
|
||||
}
|
||||
|
||||
// Main entry point for the 'neon_local' CLI utility
|
||||
//
|
||||
// This utility helps to manage neon installation. That includes following:
|
||||
@@ -700,9 +718,14 @@ fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
// Check for 'neon init' command first.
|
||||
let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
|
||||
handle_init(&args).map(|env| Some(Cow::Owned(env)))
|
||||
let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
|
||||
(handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
|
||||
} else {
|
||||
// This tool uses a collection of simple files to store its state, and consequently
|
||||
// it is not generally safe to run multiple commands concurrently. Rather than expect
|
||||
// all callers to know this, use a lock file to protect against concurrent execution.
|
||||
let _repo_lock = RepoLock::new().unwrap();
|
||||
|
||||
// all other commands need an existing config
|
||||
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
|
||||
let original_env = env.clone();
|
||||
@@ -728,11 +751,12 @@ fn main() -> Result<()> {
|
||||
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
|
||||
};
|
||||
|
||||
if &original_env != env {
|
||||
let subcommand_result = if &original_env != env {
|
||||
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
|
||||
} else {
|
||||
subcommand_result.map(|()| None)
|
||||
}
|
||||
};
|
||||
(subcommand_result, Some(_repo_lock))
|
||||
};
|
||||
|
||||
match subcommand_result {
|
||||
@@ -922,7 +946,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
} else {
|
||||
// User (likely interactive) did not provide a description of the environment, give them the default
|
||||
NeonLocalInitConf {
|
||||
control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
|
||||
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
|
||||
broker: NeonBroker {
|
||||
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
|
||||
},
|
||||
@@ -1718,18 +1742,15 @@ async fn handle_start_all_impl(
|
||||
broker::start_broker_process(env, &retry_timeout).await
|
||||
});
|
||||
|
||||
// Only start the storage controller if the pageserver is configured to need it
|
||||
if env.control_plane_api.is_some() {
|
||||
js.spawn(async move {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.start(NeonStorageControllerStartArgs::with_default_instance_id(
|
||||
retry_timeout,
|
||||
))
|
||||
.await
|
||||
.map_err(|e| e.context("start storage_controller"))
|
||||
});
|
||||
}
|
||||
js.spawn(async move {
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
storage_controller
|
||||
.start(NeonStorageControllerStartArgs::with_default_instance_id(
|
||||
retry_timeout,
|
||||
))
|
||||
.await
|
||||
.map_err(|e| e.context("start storage_controller"))
|
||||
});
|
||||
|
||||
for ps_conf in &env.pageservers {
|
||||
js.spawn(async move {
|
||||
@@ -1774,10 +1795,6 @@ async fn neon_start_status_check(
|
||||
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
|
||||
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
|
||||
|
||||
if env.control_plane_api.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let storcon = StorageController::from_env(env);
|
||||
|
||||
let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
|
||||
|
||||
Reference in New Issue
Block a user