mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
neon_local: fix endpoint api to prevent two primary endpoints (#5520)
`neon_local endpoint` subcommand currently allows creating two primary endpoints for the same branch which leads to shutdown of both endpoints `neon_local endpoint start` new behavior: 1. Fail if endpoint doesn't exist 2. Fail if two primary conflict detected Fixes #4959 Closes #5426 Signed-off-by: Rahul Modpur <rmodpur2@gmail.com> Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
@@ -149,6 +149,9 @@ tenant 9ef87a5bf0d92544f6fafeeb3239695c successfully created on the pageserver
|
||||
Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50 for tenant: 9ef87a5bf0d92544f6fafeeb3239695c
|
||||
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
|
||||
|
||||
# create postgres compute node
|
||||
> cargo neon endpoint create main
|
||||
|
||||
# start postgres compute node
|
||||
> cargo neon endpoint start main
|
||||
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
|
||||
@@ -185,8 +188,11 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
|
||||
(L) main [de200bd42b49cc1814412c7e592dd6e9]
|
||||
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
|
||||
|
||||
# create postgres on that branch
|
||||
> cargo neon endpoint create migration_check --branch-name migration_check
|
||||
|
||||
# start postgres on that branch
|
||||
> cargo neon endpoint start migration_check --branch-name migration_check
|
||||
> cargo neon endpoint start migration_check
|
||||
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
|
||||
Starting postgres at 'postgresql://cloud_admin@127.0.0.1:55434/postgres'
|
||||
|
||||
|
||||
@@ -608,11 +608,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
};
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
|
||||
// All subcommands take an optional --tenant-id option
|
||||
let tenant_id = get_tenant_id(sub_args, env)?;
|
||||
|
||||
match sub_name {
|
||||
"list" => {
|
||||
let tenant_id = get_tenant_id(sub_args, env)?;
|
||||
let timeline_infos = get_timeline_infos(env, &tenant_id).unwrap_or_else(|e| {
|
||||
eprintln!("Failed to load timeline info: {}", e);
|
||||
HashMap::new()
|
||||
@@ -672,6 +670,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
println!("{table}");
|
||||
}
|
||||
"create" => {
|
||||
let tenant_id = get_tenant_id(sub_args, env)?;
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
.map(|s| s.as_str())
|
||||
@@ -716,6 +715,18 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
match (mode, hot_standby) {
|
||||
(ComputeMode::Static(_), true) => {
|
||||
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||
}
|
||||
(ComputeMode::Primary, true) => {
|
||||
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
|
||||
|
||||
cplane.new_endpoint(
|
||||
&endpoint_id,
|
||||
tenant_id,
|
||||
@@ -728,8 +739,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
)?;
|
||||
}
|
||||
"start" => {
|
||||
let pg_port: Option<u16> = sub_args.get_one::<u16>("pg-port").copied();
|
||||
let http_port: Option<u16> = sub_args.get_one::<u16>("http-port").copied();
|
||||
let endpoint_id = sub_args
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
@@ -758,80 +767,28 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
env.safekeepers.iter().map(|sk| sk.id).collect()
|
||||
};
|
||||
|
||||
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
|
||||
let endpoint = cplane
|
||||
.endpoints
|
||||
.get(endpoint_id.as_str())
|
||||
.ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?;
|
||||
|
||||
cplane.check_conflicting_endpoints(
|
||||
endpoint.mode,
|
||||
endpoint.tenant_id,
|
||||
endpoint.timeline_id,
|
||||
)?;
|
||||
|
||||
let ps_conf = env.get_pageserver_conf(pageserver_id)?;
|
||||
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
|
||||
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
|
||||
let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant);
|
||||
|
||||
Some(env.generate_auth_token(&claims)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let hot_standby = sub_args
|
||||
.get_one::<bool>("hot-standby")
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
if let Some(endpoint) = endpoint {
|
||||
match (&endpoint.mode, hot_standby) {
|
||||
(ComputeMode::Static(_), true) => {
|
||||
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||
}
|
||||
(ComputeMode::Primary, true) => {
|
||||
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
} else {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
.map(|s| s.as_str())
|
||||
.unwrap_or(DEFAULT_BRANCH_NAME);
|
||||
let timeline_id = env
|
||||
.get_branch_timeline_id(branch_name, tenant_id)
|
||||
.ok_or_else(|| {
|
||||
anyhow!("Found no timeline id for branch name '{branch_name}'")
|
||||
})?;
|
||||
let lsn = sub_args
|
||||
.get_one::<String>("lsn")
|
||||
.map(|lsn_str| Lsn::from_str(lsn_str))
|
||||
.transpose()
|
||||
.context("Failed to parse Lsn from the request")?;
|
||||
let pg_version = sub_args
|
||||
.get_one::<u32>("pg-version")
|
||||
.copied()
|
||||
.context("Failed to `pg-version` from the argument string")?;
|
||||
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
(None, false) => ComputeMode::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
};
|
||||
|
||||
// when used with custom port this results in non obvious behaviour
|
||||
// port is remembered from first start command, i e
|
||||
// start --port X
|
||||
// stop
|
||||
// start <-- will also use port X even without explicit port argument
|
||||
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
|
||||
|
||||
let ep = cplane.new_endpoint(
|
||||
endpoint_id,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
pg_port,
|
||||
http_port,
|
||||
pg_version,
|
||||
mode,
|
||||
pageserver_id,
|
||||
)?;
|
||||
ep.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
}
|
||||
"reconfigure" => {
|
||||
let endpoint_id = sub_args
|
||||
@@ -1437,15 +1394,7 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
.arg(endpoint_id_arg.clone())
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(lsn_arg)
|
||||
.arg(pg_port_arg)
|
||||
.arg(http_port_arg)
|
||||
.arg(endpoint_pageserver_id_arg.clone())
|
||||
.arg(pg_version_arg)
|
||||
.arg(hot_standby_arg)
|
||||
.arg(safekeepers_arg)
|
||||
.arg(remote_ext_config_args)
|
||||
)
|
||||
@@ -1458,7 +1407,6 @@ fn cli() -> Command {
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
.arg(endpoint_id_arg)
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(
|
||||
Arg::new("destroy")
|
||||
.help("Also delete data directory (now optional, should be default in future)")
|
||||
|
||||
@@ -125,6 +125,7 @@ impl ComputeControlPlane {
|
||||
let http_port = http_port.unwrap_or_else(|| self.get_port() + 1);
|
||||
let pageserver =
|
||||
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?);
|
||||
|
||||
let ep = Arc::new(Endpoint {
|
||||
endpoint_id: endpoint_id.to_owned(),
|
||||
pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port),
|
||||
@@ -169,6 +170,30 @@ impl ComputeControlPlane {
|
||||
|
||||
Ok(ep)
|
||||
}
|
||||
|
||||
pub fn check_conflicting_endpoints(
|
||||
&self,
|
||||
mode: ComputeMode,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<()> {
|
||||
if matches!(mode, ComputeMode::Primary) {
|
||||
// this check is not complete, as you could have a concurrent attempt at
|
||||
// creating another primary, both reading the state before checking it here,
|
||||
// but it's better than nothing.
|
||||
let mut duplicates = self.endpoints.iter().filter(|(_k, v)| {
|
||||
v.tenant_id == tenant_id
|
||||
&& v.timeline_id == timeline_id
|
||||
&& v.mode == mode
|
||||
&& v.status() != "stopped"
|
||||
});
|
||||
|
||||
if let Some((key, _)) = duplicates.next() {
|
||||
bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -1414,34 +1414,19 @@ class NeonCli(AbstractNeonCli):
|
||||
def endpoint_start(
|
||||
self,
|
||||
endpoint_id: str,
|
||||
pg_port: int,
|
||||
http_port: int,
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
branch_name: Optional[str] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
pageserver_id: Optional[int] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
"start",
|
||||
"--tenant-id",
|
||||
str(tenant_id or self.env.initial_tenant),
|
||||
"--pg-version",
|
||||
self.env.pg_version,
|
||||
]
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
if lsn is not None:
|
||||
args.append(f"--lsn={lsn}")
|
||||
args.extend(["--pg-port", str(pg_port)])
|
||||
args.extend(["--http-port", str(http_port)])
|
||||
|
||||
if safekeepers is not None:
|
||||
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
|
||||
if branch_name is not None:
|
||||
args.extend(["--branch-name", branch_name])
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
if pageserver_id is not None:
|
||||
@@ -1468,15 +1453,12 @@ class NeonCli(AbstractNeonCli):
|
||||
def endpoint_stop(
|
||||
self,
|
||||
endpoint_id: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
destroy=False,
|
||||
check_return_code=True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
"stop",
|
||||
"--tenant-id",
|
||||
str(tenant_id or self.env.initial_tenant),
|
||||
]
|
||||
if destroy:
|
||||
args.append("--destroy")
|
||||
@@ -2507,9 +2489,6 @@ class Endpoint(PgProtocol):
|
||||
|
||||
self.env.neon_cli.endpoint_start(
|
||||
self.endpoint_id,
|
||||
pg_port=self.pg_port,
|
||||
http_port=self.http_port,
|
||||
tenant_id=self.tenant_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
pageserver_id=pageserver_id,
|
||||
@@ -2589,7 +2568,7 @@ class Endpoint(PgProtocol):
|
||||
if self.running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, self.tenant_id, check_return_code=self.check_stop_result
|
||||
self.endpoint_id, check_return_code=self.check_stop_result
|
||||
)
|
||||
self.running = False
|
||||
|
||||
@@ -2603,7 +2582,7 @@ class Endpoint(PgProtocol):
|
||||
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, self.tenant_id, True, check_return_code=self.check_stop_result
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result
|
||||
)
|
||||
self.endpoint_id = None
|
||||
self.running = False
|
||||
|
||||
@@ -434,8 +434,11 @@ def check_neon_works(
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port)
|
||||
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
|
||||
cli_current.endpoint_create(
|
||||
branch_name="main", pg_port=pg_port, http_port=http_port, endpoint_id="ep-main"
|
||||
)
|
||||
cli_current.endpoint_start("ep-main")
|
||||
request.addfinalizer(lambda: cli_current.endpoint_stop("ep-main"))
|
||||
|
||||
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
|
||||
pg_bin.run_capture(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
|
||||
@@ -11,19 +12,50 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
|
||||
env.neon_cli.start()
|
||||
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
|
||||
|
||||
main_branch_name = "main"
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
env.neon_cli.endpoint_start(
|
||||
endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port
|
||||
env.neon_cli.endpoint_create(
|
||||
main_branch_name, pg_port, http_port, endpoint_id="ep-basic-main"
|
||||
)
|
||||
env.neon_cli.endpoint_start("ep-basic-main")
|
||||
|
||||
branch_name = "migration-check"
|
||||
|
||||
env.neon_cli.create_branch(new_branch_name=branch_name)
|
||||
env.neon_cli.create_branch(branch_name)
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
env.neon_cli.endpoint_start(
|
||||
f"ep-{branch_name}", pg_port, http_port, branch_name=branch_name
|
||||
env.neon_cli.endpoint_create(
|
||||
branch_name, pg_port, http_port, endpoint_id=f"ep-{branch_name}"
|
||||
)
|
||||
env.neon_cli.endpoint_start(f"ep-{branch_name}")
|
||||
finally:
|
||||
env.neon_cli.stop()
|
||||
|
||||
|
||||
def test_neon_two_primary_endpoints_fail(
|
||||
neon_env_builder: NeonEnvBuilder, port_distributor: PortDistributor
|
||||
):
|
||||
"""
|
||||
Two primary endpoints with same tenant and timeline will not run together
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
branch_name = "main"
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
env.neon_cli.endpoint_create(branch_name, pg_port, http_port, "ep1")
|
||||
|
||||
pg_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
# ep1 is not running so create will succeed
|
||||
env.neon_cli.endpoint_create(branch_name, pg_port, http_port, "ep2")
|
||||
|
||||
env.neon_cli.endpoint_start("ep1")
|
||||
|
||||
expected_message = f'attempting to create a duplicate primary endpoint on tenant {env.initial_tenant}, timeline {env.initial_timeline}: endpoint "ep1" exists already. please don\'t do this, it is not supported.'
|
||||
with pytest.raises(RuntimeError):
|
||||
assert expected_message in env.neon_cli.endpoint_start("ep2").stderr
|
||||
|
||||
env.neon_cli.endpoint_stop("ep1")
|
||||
# ep1 is stopped so create ep2 will succeed
|
||||
env.neon_cli.endpoint_start("ep2")
|
||||
|
||||
Reference in New Issue
Block a user