add neon_local subcommand for the broker & use that from regression tests (#8948)

There's currently no way to just start/stop broker from `neon_local`.

This PR
* adds a sub-command
* uses that sub-command from the test suite instead of the pre-existing
Python `subprocess` based approach.

Found this useful during investigation
https://github.com/neondatabase/cloud/issues/16886.
This commit is contained in:
Christian Schwarz
2024-09-18 09:10:27 +02:00
committed by GitHub
parent 3cd2a3f931
commit 135e7e4306
7 changed files with 99 additions and 99 deletions

View File

@@ -106,6 +106,7 @@ fn main() -> Result<()> {
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)),
"storage_broker" => rt.block_on(handle_storage_broker(sub_args, &env)),
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
"endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
"mappings" => handle_mappings(sub_args, &mut env),
@@ -1245,6 +1246,32 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_storage_broker(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(broker_command_data) => broker_command_data,
None => bail!("no broker subcommand provided"),
};
match sub_name {
"start" => {
if let Err(e) = broker::start_broker_process(env, get_start_timeout(sub_args)).await {
eprintln!("broker start failed: {e}");
exit(1);
}
}
"stop" => {
if let Err(e) = broker::stop_broker_process(env) {
eprintln!("broker stop failed: {e}");
exit(1);
}
}
_ => bail!("Unexpected broker subcommand '{}'", sub_name),
}
Ok(())
}
async fn handle_start_all(
env: &local_env::LocalEnv,
retry_timeout: &Duration,
@@ -1672,6 +1699,19 @@ fn cli() -> Command {
.arg(stop_mode_arg.clone())
.arg(instance_id))
)
.subcommand(
Command::new("storage_broker")
.arg_required_else_help(true)
.about("Manage broker")
.subcommand(Command::new("start")
.about("Start broker")
.arg(timeout_arg.clone())
)
.subcommand(Command::new("stop")
.about("Stop broker")
.arg(stop_mode_arg.clone())
)
)
.subcommand(
Command::new("safekeeper")
.arg_required_else_help(true)

View File

@@ -1,63 +0,0 @@
import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from fixtures.log_helper import log
@dataclass
class NeonBroker:
"""An object managing storage_broker instance"""
logfile: Path
port: int
neon_binpath: Path
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def listen_addr(self):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"http://{self.listen_addr()}"
def check_status(self):
return True # TODO
def try_start(self):
if self.handle is not None:
log.debug(f"storage_broker is already running on port {self.port}")
return
listen_addr = self.listen_addr()
log.info(f'starting storage_broker to listen incoming connections at "{listen_addr}"')
with open(self.logfile, "wb") as logfile:
args = [
str(self.neon_binpath / "storage_broker"),
f"--listen-addr={listen_addr}",
]
self.handle = subprocess.Popen(args, stdout=logfile, stderr=logfile)
# wait for start
started_at = time.time()
while True:
try:
self.check_status()
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 5:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
) from e
time.sleep(0.5)
else:
break # success
def stop(self, immediate: bool = False):
if self.handle is not None:
if immediate:
self.handle.kill()
else:
self.handle.terminate()
self.handle.wait()

View File

@@ -60,7 +60,6 @@ from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.auth_tokens import AuthKeys, TokenScope
from fixtures.broker import NeonBroker
from fixtures.common_types import Lsn, NodeId, TenantId, TenantShardId, TimelineId
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.log_helper import log
@@ -230,21 +229,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
test_output_dir: Path,
neon_binpath: Path,
) -> Iterator[NeonBroker]:
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
client_port = port_distributor.get_port()
broker_logfile = test_output_dir / "repo" / "storage_broker.log"
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
yield broker
broker.stop()
@pytest.fixture(scope="session")
def run_id() -> Iterator[uuid.UUID]:
yield uuid.uuid4()
@@ -387,7 +371,6 @@ class NeonEnvBuilder:
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: NeonBroker,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
neon_binpath: Path,
@@ -428,7 +411,6 @@ class NeonEnvBuilder:
# Safekeepers remote storage
self.safekeepers_remote_storage: Optional[RemoteStorage] = None
self.broker = broker
self.run_id = run_id
self.mock_s3_server: MockS3Server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
@@ -940,6 +922,8 @@ class NeonEnvBuilder:
self.env.storage_controller.assert_no_errors()
self.env.broker.assert_no_errors()
try:
self.overlay_cleanup_teardown()
except Exception as e:
@@ -993,7 +977,7 @@ class NeonEnv:
self.endpoints = EndpointFactory(self)
self.safekeepers: List[Safekeeper] = []
self.pageservers: List[NeonPageserver] = []
self.broker = config.broker
self.broker = NeonBroker(self)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
self.pg_version = config.pg_version
@@ -1168,7 +1152,7 @@ class NeonEnv:
max_workers=2 + len(self.pageservers) + len(self.safekeepers)
) as executor:
futs.append(
executor.submit(lambda: self.broker.try_start() or None)
executor.submit(lambda: self.broker.start() or None)
) # The `or None` is for the linter
for pageserver in self.pageservers:
@@ -1225,7 +1209,7 @@ class NeonEnv:
pageserver.stop(immediate=immediate)
except RuntimeError:
stop_later.append(pageserver)
self.broker.stop(immediate=immediate)
self.broker.stop()
# TODO: for nice logging we need python 3.11 ExceptionGroup
for ps in stop_later:
@@ -1339,7 +1323,6 @@ def neon_simple_env(
pytestconfig: Config,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: NeonBroker,
run_id: uuid.UUID,
top_output_dir: Path,
test_output_dir: Path,
@@ -1364,7 +1347,6 @@ def neon_simple_env(
top_output_dir=top_output_dir,
repo_dir=repo_dir,
port_distributor=port_distributor,
broker=default_broker,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
@@ -1392,7 +1374,6 @@ def neon_env_builder(
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: PgVersion,
default_broker: NeonBroker,
run_id: uuid.UUID,
request: FixtureRequest,
test_overlay_dir: Path,
@@ -1428,7 +1409,6 @@ def neon_env_builder(
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
@@ -1850,6 +1830,18 @@ class NeonCli(AbstractNeonCli):
args.extend(["-m", "immediate"])
return self.raw_cli(args)
def broker_start(
self, timeout_in_seconds: Optional[int] = None
) -> "subprocess.CompletedProcess[str]":
cmd = ["storage_broker", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
return self.raw_cli(cmd)
def broker_stop(self) -> "subprocess.CompletedProcess[str]":
cmd = ["storage_broker", "stop"]
return self.raw_cli(cmd)
def endpoint_create(
self,
branch_name: str,
@@ -4573,6 +4565,40 @@ class Safekeeper(LogUtils):
wait_until(20, 0.5, paused)
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""
def __init__(self, env: NeonEnv):
super().__init__(logfile=env.repo_dir / "storage_broker.log")
self.env = env
self.port: int = self.env.port_distributor.get_port()
self.running = False
def start(
self,
timeout_in_seconds: Optional[int] = None,
):
assert not self.running
self.env.neon_cli.broker_start(timeout_in_seconds)
self.running = True
return self
def stop(self):
if self.running:
self.env.neon_cli.broker_stop()
self.running = False
return self
def listen_addr(self):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"http://{self.listen_addr()}"
def assert_no_errors(self):
assert_no_errors(self.logfile, "storage_controller", [])
# TODO: Replace with `StrEnum` when we upgrade to python 3.11
class NodeKind(str, Enum):
PAGESERVER = "pageserver"

View File

@@ -134,6 +134,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
env.neon_cli.storage_controller_stop(False)
env.neon_cli.broker_stop()
# Keep NeonEnv state up to date, it usually owns starting/stopping services
env.pageserver.running = False
@@ -176,6 +177,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
# Stop this to get out of the way of the following `start`
env.neon_cli.storage_controller_stop(False)
env.neon_cli.broker_stop()
# Default start
res = env.neon_cli.raw_cli(["start"])

View File

@@ -134,7 +134,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
)
env = neon_env_builder.init_configs()
env.broker.try_start()
env.broker.start()
for sk in env.safekeepers:
sk.start()
env.storage_controller.start()

View File

@@ -69,7 +69,7 @@ def test_storage_controller_smoke(
env = neon_env_builder.init_configs()
# Start services by hand so that we can skip a pageserver (this will start + register later)
env.broker.try_start()
env.broker.start()
env.storage_controller.start()
env.pageservers[0].start()
env.pageservers[1].start()
@@ -292,7 +292,7 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
# Start services by hand so that we can skip registration on one of the pageservers
env = neon_env_builder.init_configs()
env.broker.try_start()
env.broker.start()
env.storage_controller.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
@@ -2126,7 +2126,7 @@ def start_env(env: NeonEnv, storage_controller_port: int):
max_workers=2 + len(env.pageservers) + len(env.safekeepers)
) as executor:
futs.append(
executor.submit(lambda: env.broker.try_start() or None)
executor.submit(lambda: env.broker.start() or None)
) # The `or None` is for the linter
for pageserver in env.pageservers:

View File

@@ -19,7 +19,6 @@ import psycopg2.errors
import psycopg2.extras
import pytest
import requests
from fixtures.broker import NeonBroker
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
@@ -1439,11 +1438,7 @@ class SafekeeperEnv:
):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
self.broker = NeonBroker(
logfile=Path(self.repo_dir) / "storage_broker.log",
port=self.port_distributor.get_port(),
neon_binpath=neon_binpath,
)
self.fake_broker_endpoint = f"http://127.0.0.1:{port_distributor.get_port()}"
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = str(neon_binpath / "safekeeper")
@@ -1492,7 +1487,7 @@ class SafekeeperEnv:
"--id",
str(i),
"--broker-endpoint",
self.broker.client_url(),
self.fake_broker_endpoint,
]
log.info(f'Running command "{" ".join(cmd)}"')