From 135e7e4306978d79a133086abb81707d8d9fb8a9 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 18 Sep 2024 09:10:27 +0200 Subject: [PATCH] add `neon_local` subcommand for the broker & use that from regression tests (#8948) There's currently no way to just start/stop broker from `neon_local`. This PR * adds a sub-command * uses that sub-command from the test suite instead of the pre-existing Python `subprocess` based approach. Found this useful during investigation https://github.com/neondatabase/cloud/issues/16886. --- control_plane/src/bin/neon_local.rs | 40 ++++++++++ test_runner/fixtures/broker.py | 63 --------------- test_runner/fixtures/neon_fixtures.py | 76 +++++++++++++------ test_runner/regress/test_neon_cli.py | 2 + .../regress/test_pageserver_generations.py | 2 +- .../regress/test_storage_controller.py | 6 +- test_runner/regress/test_wal_acceptor.py | 9 +-- 7 files changed, 99 insertions(+), 99 deletions(-) delete mode 100644 test_runner/fixtures/broker.py diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 144cd647c9..1c94c42865 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -106,6 +106,7 @@ fn main() -> Result<()> { "stop" => rt.block_on(handle_stop_all(sub_args, &env)), "pageserver" => rt.block_on(handle_pageserver(sub_args, &env)), "storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)), + "storage_broker" => rt.block_on(handle_storage_broker(sub_args, &env)), "safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)), "endpoint" => rt.block_on(handle_endpoint(sub_args, &env)), "mappings" => handle_mappings(sub_args, &mut env), @@ -1245,6 +1246,32 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Ok(()) } +async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let (sub_name, sub_args) = match sub_match.subcommand() { + Some(broker_command_data) => broker_command_data, + None => bail!("no broker subcommand provided"), + }; + + match sub_name { + "start" => { + if let Err(e) = broker::start_broker_process(env, get_start_timeout(sub_args)).await { + eprintln!("broker start failed: {e}"); + exit(1); + } + } + + "stop" => { + if let Err(e) = broker::stop_broker_process(env) { + eprintln!("broker stop failed: {e}"); + exit(1); + } + } + + _ => bail!("Unexpected broker subcommand '{}'", sub_name), + } + Ok(()) +} + async fn handle_start_all( env: &local_env::LocalEnv, retry_timeout: &Duration, @@ -1672,6 +1699,19 @@ fn cli() -> Command { .arg(stop_mode_arg.clone()) .arg(instance_id)) ) + .subcommand( + Command::new("storage_broker") + .arg_required_else_help(true) + .about("Manage broker") + .subcommand(Command::new("start") + .about("Start broker") + .arg(timeout_arg.clone()) + ) + .subcommand(Command::new("stop") + .about("Stop broker") + .arg(stop_mode_arg.clone()) + ) + ) .subcommand( Command::new("safekeeper") .arg_required_else_help(true) diff --git a/test_runner/fixtures/broker.py b/test_runner/fixtures/broker.py deleted file mode 100644 index 8aca90a097..0000000000 --- a/test_runner/fixtures/broker.py +++ /dev/null @@ -1,63 +0,0 @@ -import subprocess -import time -from dataclasses import dataclass -from pathlib import Path -from typing import Any, Optional - -from fixtures.log_helper import log - - -@dataclass -class NeonBroker: - """An object managing storage_broker instance""" - - logfile: Path - port: int - neon_binpath: Path - handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon - - def listen_addr(self): - return f"127.0.0.1:{self.port}" - - def client_url(self): - return f"http://{self.listen_addr()}" - - def check_status(self): - return True # TODO - - def try_start(self): - if self.handle is not None: - log.debug(f"storage_broker is already running on port {self.port}") - return - - listen_addr = self.listen_addr() - log.info(f'starting storage_broker to listen incoming connections at "{listen_addr}"') - with open(self.logfile, "wb") as logfile: - args = [ - str(self.neon_binpath / "storage_broker"), - f"--listen-addr={listen_addr}", - ] - self.handle = subprocess.Popen(args, stdout=logfile, stderr=logfile) - - # wait for start - started_at = time.time() - while True: - try: - self.check_status() - except Exception as e: - elapsed = time.time() - started_at - if elapsed > 5: - raise RuntimeError( - f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}" - ) from e - time.sleep(0.5) - else: - break # success - - def stop(self, immediate: bool = False): - if self.handle is not None: - if immediate: - self.handle.kill() - else: - self.handle.terminate() - self.handle.wait() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 69284bfdfc..cbbb162cc6 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -60,7 +60,6 @@ from urllib3.util.retry import Retry from fixtures import overlayfs from fixtures.auth_tokens import AuthKeys, TokenScope -from fixtures.broker import NeonBroker from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId from fixtures.endpoint.http import EndpointHttpClient from fixtures.log_helper import log @@ -230,21 +229,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib return PortDistributor(base_port=worker_base_port, port_number=worker_port_num) -@pytest.fixture(scope="function") -def default_broker( - port_distributor: PortDistributor, - test_output_dir: Path, - neon_binpath: Path, -) -> Iterator[NeonBroker]: - # multiple pytest sessions could get launched in parallel, get them different ports/datadirs - client_port = port_distributor.get_port() - broker_logfile = test_output_dir / "repo" / "storage_broker.log" - - broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath) - yield broker - broker.stop() - - @pytest.fixture(scope="session") def run_id() -> Iterator[uuid.UUID]: yield uuid.uuid4() @@ -387,7 +371,6 @@ class NeonEnvBuilder: self, repo_dir: Path, port_distributor: PortDistributor, - broker: NeonBroker, run_id: uuid.UUID, mock_s3_server: MockS3Server, neon_binpath: Path, @@ -428,7 +411,6 @@ class NeonEnvBuilder: # Safekeepers remote storage self.safekeepers_remote_storage: Optional[RemoteStorage] = None - self.broker = broker self.run_id = run_id self.mock_s3_server: MockS3Server = mock_s3_server self.pageserver_config_override = pageserver_config_override @@ -940,6 +922,8 @@ class NeonEnvBuilder: self.env.storage_controller.assert_no_errors() + self.env.broker.assert_no_errors() + try: self.overlay_cleanup_teardown() except Exception as e: @@ -993,7 +977,7 @@ class NeonEnv: self.endpoints = EndpointFactory(self) self.safekeepers: List[Safekeeper] = [] self.pageservers: List[NeonPageserver] = [] - self.broker = config.broker + self.broker = NeonBroker(self) self.pageserver_remote_storage = config.pageserver_remote_storage self.safekeepers_remote_storage = config.safekeepers_remote_storage self.pg_version = config.pg_version @@ -1168,7 +1152,7 @@ class NeonEnv: max_workers=2 + len(self.pageservers) + len(self.safekeepers) ) as executor: futs.append( - executor.submit(lambda: self.broker.try_start() or None) + executor.submit(lambda: self.broker.start() or None) ) # The `or None` is for the linter for pageserver in self.pageservers: @@ -1225,7 +1209,7 @@ class NeonEnv: pageserver.stop(immediate=immediate) except RuntimeError: stop_later.append(pageserver) - self.broker.stop(immediate=immediate) + self.broker.stop() # TODO: for nice logging we need python 3.11 ExceptionGroup for ps in stop_later: @@ -1339,7 +1323,6 @@ def neon_simple_env( pytestconfig: Config, port_distributor: PortDistributor, mock_s3_server: MockS3Server, - default_broker: NeonBroker, run_id: uuid.UUID, top_output_dir: Path, test_output_dir: Path, @@ -1364,7 +1347,6 @@ def neon_simple_env( top_output_dir=top_output_dir, repo_dir=repo_dir, port_distributor=port_distributor, - broker=default_broker, mock_s3_server=mock_s3_server, neon_binpath=neon_binpath, pg_distrib_dir=pg_distrib_dir, @@ -1392,7 +1374,6 @@ def neon_env_builder( neon_binpath: Path, pg_distrib_dir: Path, pg_version: PgVersion, - default_broker: NeonBroker, run_id: uuid.UUID, request: FixtureRequest, test_overlay_dir: Path, @@ -1428,7 +1409,6 @@ def neon_env_builder( neon_binpath=neon_binpath, pg_distrib_dir=pg_distrib_dir, pg_version=pg_version, - broker=default_broker, run_id=run_id, preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")), pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, @@ -1850,6 +1830,18 @@ class NeonCli(AbstractNeonCli): args.extend(["-m", "immediate"]) return self.raw_cli(args) + def broker_start( + self, timeout_in_seconds: Optional[int] = None + ) -> "subprocess.CompletedProcess[str]": + cmd = ["storage_broker", "start"] + if timeout_in_seconds is not None: + cmd.append(f"--start-timeout={timeout_in_seconds}s") + return self.raw_cli(cmd) + + def broker_stop(self) -> "subprocess.CompletedProcess[str]": + cmd = ["storage_broker", "stop"] + return self.raw_cli(cmd) + def endpoint_create( self, branch_name: str, @@ -4573,6 +4565,40 @@ class Safekeeper(LogUtils): wait_until(20, 0.5, paused) +class NeonBroker(LogUtils): + """An object managing storage_broker instance""" + + def __init__(self, env: NeonEnv): + super().__init__(logfile=env.repo_dir / "storage_broker.log") + self.env = env + self.port: int = self.env.port_distributor.get_port() + self.running = False + + def start( + self, + timeout_in_seconds: Optional[int] = None, + ): + assert not self.running + self.env.neon_cli.broker_start(timeout_in_seconds) + self.running = True + return self + + def stop(self): + if self.running: + self.env.neon_cli.broker_stop() + self.running = False + return self + + def listen_addr(self): + return f"127.0.0.1:{self.port}" + + def client_url(self): + return f"http://{self.listen_addr()}" + + def assert_no_errors(self): + assert_no_errors(self.logfile, "storage_controller", []) + + # TODO: Replace with `StrEnum` when we upgrade to python 3.11 class NodeKind(str, Enum): PAGESERVER = "pageserver" diff --git a/test_runner/regress/test_neon_cli.py b/test_runner/regress/test_neon_cli.py index ba170cfb4c..b65430ff49 100644 --- a/test_runner/regress/test_neon_cli.py +++ b/test_runner/regress/test_neon_cli.py @@ -134,6 +134,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder): env.neon_cli.pageserver_stop(env.pageserver.id) env.neon_cli.safekeeper_stop() env.neon_cli.storage_controller_stop(False) + env.neon_cli.broker_stop() # Keep NeonEnv state up to date, it usually owns starting/stopping services env.pageserver.running = False @@ -176,6 +177,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder): # Stop this to get out of the way of the following `start` env.neon_cli.storage_controller_stop(False) + env.neon_cli.broker_stop() # Default start res = env.neon_cli.raw_cli(["start"]) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index ebf58d2bd1..c923713432 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -134,7 +134,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder): ) env = neon_env_builder.init_configs() - env.broker.try_start() + env.broker.start() for sk in env.safekeepers: sk.start() env.storage_controller.start() diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 2d72dbb2df..dc90a6e9a0 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -69,7 +69,7 @@ def test_storage_controller_smoke( env = neon_env_builder.init_configs() # Start services by hand so that we can skip a pageserver (this will start + register later) - env.broker.try_start() + env.broker.start() env.storage_controller.start() env.pageservers[0].start() env.pageservers[1].start() @@ -292,7 +292,7 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up # Start services by hand so that we can skip registration on one of the pageservers env = neon_env_builder.init_configs() - env.broker.try_start() + env.broker.start() env.storage_controller.start() # This is the pageserver where we'll initially create the tenant. Run it in emergency @@ -2126,7 +2126,7 @@ def start_env(env: NeonEnv, storage_controller_port: int): max_workers=2 + len(env.pageservers) + len(env.safekeepers) ) as executor: futs.append( - executor.submit(lambda: env.broker.try_start() or None) + executor.submit(lambda: env.broker.start() or None) ) # The `or None` is for the linter for pageserver in env.pageservers: diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 50fac441c0..4bf8cfe88f 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -19,7 +19,6 @@ import psycopg2.errors import psycopg2.extras import pytest import requests -from fixtures.broker import NeonBroker from fixtures.common_types import Lsn, TenantId, TimelineId from fixtures.log_helper import log from fixtures.metrics import parse_metrics @@ -1439,11 +1438,7 @@ class SafekeeperEnv: ): self.repo_dir = repo_dir self.port_distributor = port_distributor - self.broker = NeonBroker( - logfile=Path(self.repo_dir) / "storage_broker.log", - port=self.port_distributor.get_port(), - neon_binpath=neon_binpath, - ) + self.fake_broker_endpoint = f"http://127.0.0.1:{port_distributor.get_port()}" self.pg_bin = pg_bin self.num_safekeepers = num_safekeepers self.bin_safekeeper = str(neon_binpath / "safekeeper") @@ -1492,7 +1487,7 @@ class SafekeeperEnv: "--id", str(i), "--broker-endpoint", - self.broker.client_url(), + self.fake_broker_endpoint, ] log.info(f'Running command "{" ".join(cmd)}"')