diff --git a/README.md b/README.md index 75fad605c5..3e3123f5ee 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,9 @@ tenant 9ef87a5bf0d92544f6fafeeb3239695c successfully created on the pageserver Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50 for tenant: 9ef87a5bf0d92544f6fafeeb3239695c Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one +# create postgres compute node +> cargo neon endpoint create main + # start postgres compute node > cargo neon endpoint start main Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ... @@ -185,8 +188,11 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant: (L) main [de200bd42b49cc1814412c7e592dd6e9] (L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601] +# create postgres on that branch +> cargo neon endpoint create migration_check --branch-name migration_check + # start postgres on that branch -> cargo neon endpoint start migration_check --branch-name migration_check +> cargo neon endpoint start migration_check Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ... 
Starting postgres at 'postgresql://cloud_admin@127.0.0.1:55434/postgres' diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 3053122f6a..f7442c02c7 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -608,11 +608,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( }; let mut cplane = ComputeControlPlane::load(env.clone())?; - // All subcommands take an optional --tenant-id option - let tenant_id = get_tenant_id(sub_args, env)?; - match sub_name { "list" => { + let tenant_id = get_tenant_id(sub_args, env)?; let timeline_infos = get_timeline_infos(env, &tenant_id).unwrap_or_else(|e| { eprintln!("Failed to load timeline info: {}", e); HashMap::new() @@ -672,6 +670,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( println!("{table}"); } "create" => { + let tenant_id = get_tenant_id(sub_args, env)?; let branch_name = sub_args .get_one::("branch-name") .map(|s| s.as_str()) @@ -716,6 +715,18 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), }; + match (mode, hot_standby) { + (ComputeMode::Static(_), true) => { + bail!("Cannot start a node in hot standby mode when it is already configured as a static replica") + } + (ComputeMode::Primary, true) => { + bail!("Cannot start a node as a hot standby replica, it is already configured as primary node") + } + _ => {} + } + + cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?; + cplane.new_endpoint( &endpoint_id, tenant_id, @@ -728,8 +739,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( )?; } "start" => { - let pg_port: Option = sub_args.get_one::("pg-port").copied(); - let http_port: Option = sub_args.get_one::("http-port").copied(); let endpoint_id = sub_args .get_one::("endpoint_id") .ok_or_else(|| anyhow!("No endpoint 
ID was provided to start"))?; @@ -758,80 +767,28 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( env.safekeepers.iter().map(|sk| sk.id).collect() }; - let endpoint = cplane.endpoints.get(endpoint_id.as_str()); + let endpoint = cplane + .endpoints + .get(endpoint_id.as_str()) + .ok_or_else(|| anyhow::anyhow!("endpoint {endpoint_id} not found"))?; + + cplane.check_conflicting_endpoints( + endpoint.mode, + endpoint.tenant_id, + endpoint.timeline_id, + )?; let ps_conf = env.get_pageserver_conf(pageserver_id)?; let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) { - let claims = Claims::new(Some(tenant_id), Scope::Tenant); + let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant); Some(env.generate_auth_token(&claims)?) } else { None }; - let hot_standby = sub_args - .get_one::("hot-standby") - .copied() - .unwrap_or(false); - - if let Some(endpoint) = endpoint { - match (&endpoint.mode, hot_standby) { - (ComputeMode::Static(_), true) => { - bail!("Cannot start a node in hot standby mode when it is already configured as a static replica") - } - (ComputeMode::Primary, true) => { - bail!("Cannot start a node as a hot standby replica, it is already configured as primary node") - } - _ => {} - } - println!("Starting existing endpoint {endpoint_id}..."); - endpoint.start(&auth_token, safekeepers, remote_ext_config)?; - } else { - let branch_name = sub_args - .get_one::("branch-name") - .map(|s| s.as_str()) - .unwrap_or(DEFAULT_BRANCH_NAME); - let timeline_id = env - .get_branch_timeline_id(branch_name, tenant_id) - .ok_or_else(|| { - anyhow!("Found no timeline id for branch name '{branch_name}'") - })?; - let lsn = sub_args - .get_one::("lsn") - .map(|lsn_str| Lsn::from_str(lsn_str)) - .transpose() - .context("Failed to parse Lsn from the request")?; - let pg_version = sub_args - .get_one::("pg-version") - .copied() - .context("Failed to `pg-version` from the argument string")?; - - let mode = match (lsn, 
hot_standby) { - (Some(lsn), false) => ComputeMode::Static(lsn), - (None, true) => ComputeMode::Replica, - (None, false) => ComputeMode::Primary, - (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), - }; - - // when used with custom port this results in non obvious behaviour - // port is remembered from first start command, i e - // start --port X - // stop - // start <-- will also use port X even without explicit port argument - println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ..."); - - let ep = cplane.new_endpoint( - endpoint_id, - tenant_id, - timeline_id, - pg_port, - http_port, - pg_version, - mode, - pageserver_id, - )?; - ep.start(&auth_token, safekeepers, remote_ext_config)?; - } + println!("Starting existing endpoint {endpoint_id}..."); + endpoint.start(&auth_token, safekeepers, remote_ext_config)?; } "reconfigure" => { let endpoint_id = sub_args @@ -1437,15 +1394,7 @@ fn cli() -> Command { .subcommand(Command::new("start") .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.") .arg(endpoint_id_arg.clone()) - .arg(tenant_id_arg.clone()) - .arg(branch_name_arg.clone()) - .arg(timeline_id_arg.clone()) - .arg(lsn_arg) - .arg(pg_port_arg) - .arg(http_port_arg) .arg(endpoint_pageserver_id_arg.clone()) - .arg(pg_version_arg) - .arg(hot_standby_arg) .arg(safekeepers_arg) .arg(remote_ext_config_args) ) @@ -1458,7 +1407,6 @@ fn cli() -> Command { .subcommand( Command::new("stop") .arg(endpoint_id_arg) - .arg(tenant_id_arg.clone()) .arg( Arg::new("destroy") .help("Also delete data directory (now optional, should be default in future)") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index ae45746925..12b1250764 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -125,6 +125,7 @@ impl ComputeControlPlane { let http_port = http_port.unwrap_or_else(|| self.get_port() + 1); let pageserver = 
PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?); + let ep = Arc::new(Endpoint { endpoint_id: endpoint_id.to_owned(), pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port), @@ -169,6 +170,30 @@ impl ComputeControlPlane { Ok(ep) } + + pub fn check_conflicting_endpoints( + &self, + mode: ComputeMode, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result<()> { + if matches!(mode, ComputeMode::Primary) { + // this check is not complete, as you could have a concurrent attempt at + // creating another primary, both reading the state before checking it here, + // but it's better than nothing. + let mut duplicates = self.endpoints.iter().filter(|(_k, v)| { + v.tenant_id == tenant_id + && v.timeline_id == timeline_id + && v.mode == mode + && v.status() != "stopped" + }); + + if let Some((key, _)) = duplicates.next() { + bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. 
please don't do this, it is not supported."); + } + } + Ok(()) + } } /////////////////////////////////////////////////////////////////////////////// diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f060a63344..214bcc3a53 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1414,34 +1414,19 @@ class NeonCli(AbstractNeonCli): def endpoint_start( self, endpoint_id: str, - pg_port: int, - http_port: int, safekeepers: Optional[List[int]] = None, - tenant_id: Optional[TenantId] = None, - lsn: Optional[Lsn] = None, - branch_name: Optional[str] = None, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", "start", - "--tenant-id", - str(tenant_id or self.env.initial_tenant), - "--pg-version", - self.env.pg_version, ] if remote_ext_config is not None: args.extend(["--remote-ext-config", remote_ext_config]) - if lsn is not None: - args.append(f"--lsn={lsn}") - args.extend(["--pg-port", str(pg_port)]) - args.extend(["--http-port", str(http_port)]) if safekeepers is not None: args.extend(["--safekeepers", (",".join(map(str, safekeepers)))]) - if branch_name is not None: - args.extend(["--branch-name", branch_name]) if endpoint_id is not None: args.append(endpoint_id) if pageserver_id is not None: @@ -1468,15 +1453,12 @@ class NeonCli(AbstractNeonCli): def endpoint_stop( self, endpoint_id: str, - tenant_id: Optional[TenantId] = None, destroy=False, check_return_code=True, ) -> "subprocess.CompletedProcess[str]": args = [ "endpoint", "stop", - "--tenant-id", - str(tenant_id or self.env.initial_tenant), ] if destroy: args.append("--destroy") @@ -2507,9 +2489,6 @@ class Endpoint(PgProtocol): self.env.neon_cli.endpoint_start( self.endpoint_id, - pg_port=self.pg_port, - http_port=self.http_port, - tenant_id=self.tenant_id, safekeepers=self.active_safekeepers, remote_ext_config=remote_ext_config, 
pageserver_id=pageserver_id, @@ -2589,7 +2568,7 @@ class Endpoint(PgProtocol): if self.running: assert self.endpoint_id is not None self.env.neon_cli.endpoint_stop( - self.endpoint_id, self.tenant_id, check_return_code=self.check_stop_result + self.endpoint_id, check_return_code=self.check_stop_result ) self.running = False @@ -2603,7 +2582,7 @@ class Endpoint(PgProtocol): assert self.endpoint_id is not None self.env.neon_cli.endpoint_stop( - self.endpoint_id, self.tenant_id, True, check_return_code=self.check_stop_result + self.endpoint_id, True, check_return_code=self.check_stop_result ) self.endpoint_id = None self.running = False diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 9a2980280c..f3c6af4427 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -434,8 +434,11 @@ def check_neon_works( pg_port = port_distributor.get_port() http_port = port_distributor.get_port() - cli_current.endpoint_start("main", pg_port=pg_port, http_port=http_port) - request.addfinalizer(lambda: cli_current.endpoint_stop("main")) + cli_current.endpoint_create( + branch_name="main", pg_port=pg_port, http_port=http_port, endpoint_id="ep-main" + ) + cli_current.endpoint_start("ep-main") + request.addfinalizer(lambda: cli_current.endpoint_stop("ep-main")) connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres" pg_bin.run_capture( diff --git a/test_runner/regress/test_neon_local_cli.py b/test_runner/regress/test_neon_local_cli.py index becdd9ff80..46b72fbca5 100644 --- a/test_runner/regress/test_neon_local_cli.py +++ b/test_runner/regress/test_neon_local_cli.py @@ -1,3 +1,4 @@ +import pytest from fixtures.neon_fixtures import NeonEnvBuilder from fixtures.port_distributor import PortDistributor @@ -11,19 +12,50 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por env.neon_cli.start() 
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True) + main_branch_name = "main" pg_port = port_distributor.get_port() http_port = port_distributor.get_port() - env.neon_cli.endpoint_start( - endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port + env.neon_cli.endpoint_create( + main_branch_name, pg_port, http_port, endpoint_id="ep-basic-main" ) + env.neon_cli.endpoint_start("ep-basic-main") branch_name = "migration-check" - - env.neon_cli.create_branch(new_branch_name=branch_name) + env.neon_cli.create_branch(branch_name) pg_port = port_distributor.get_port() http_port = port_distributor.get_port() - env.neon_cli.endpoint_start( - f"ep-{branch_name}", pg_port, http_port, branch_name=branch_name + env.neon_cli.endpoint_create( + branch_name, pg_port, http_port, endpoint_id=f"ep-{branch_name}" ) + env.neon_cli.endpoint_start(f"ep-{branch_name}") finally: env.neon_cli.stop() + + +def test_neon_two_primary_endpoints_fail( + neon_env_builder: NeonEnvBuilder, port_distributor: PortDistributor +): + """ + Two primary endpoints with same tenant and timeline will not run together + """ + env = neon_env_builder.init_start() + branch_name = "main" + + pg_port = port_distributor.get_port() + http_port = port_distributor.get_port() + env.neon_cli.endpoint_create(branch_name, pg_port, http_port, "ep1") + + pg_port = port_distributor.get_port() + http_port = port_distributor.get_port() + # ep1 is not running so create will succeed + env.neon_cli.endpoint_create(branch_name, pg_port, http_port, "ep2") + + env.neon_cli.endpoint_start("ep1") + + expected_message = f'attempting to create a duplicate primary endpoint on tenant {env.initial_tenant}, timeline {env.initial_timeline}: endpoint "ep1" exists already. please don\'t do this, it is not supported.' 
+ with pytest.raises(RuntimeError): + assert expected_message in env.neon_cli.endpoint_start("ep2").stderr + + env.neon_cli.endpoint_stop("ep1") + # ep1 is stopped so starting ep2 will succeed + env.neon_cli.endpoint_start("ep2")