From 9563336d9aed758ad78139d4ee9c2f24301b4aa8 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Tue, 14 Sep 2021 13:47:19 +0300 Subject: [PATCH] Bring back check for interferring processes, add more comments and descriptive errors --- control_plane/src/storage.rs | 1 - test_runner/batch_others/test_auth.py | 6 +-- test_runner/fixtures/zenith_fixtures.py | 59 +++++++++++++++++++------ 3 files changed, 48 insertions(+), 18 deletions(-) diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index d826836ea1..e938265b83 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -22,7 +22,6 @@ use crate::read_pidfile; use pageserver::branches::BranchInfo; use zenith_utils::connstring::connection_address; - #[derive(Error, Debug)] pub enum PageserverHttpError { #[error("Reqwest error")] diff --git a/test_runner/batch_others/test_auth.py b/test_runner/batch_others/test_auth.py index acde4a4594..614883d4b8 100644 --- a/test_runner/batch_others/test_auth.py +++ b/test_runner/batch_others/test_auth.py @@ -3,7 +3,7 @@ from contextlib import closing from typing import Iterator from uuid import uuid4 import psycopg2 -from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver, PgBin +from fixtures.zenith_fixtures import PortDistributor, Postgres, ZenithCli, ZenithPageserver, PgBin import pytest @@ -47,7 +47,7 @@ def test_compute_auth_to_pageserver( repo_dir: str, with_wal_acceptors: bool, pg_bin: PgBin, - port_distributor: Iterator[int], + port_distributor: PortDistributor, ): ps = pageserver_auth_enabled # since we are in progress of refactoring protocols between compute safekeeper and page server @@ -64,7 +64,7 @@ def test_compute_auth_to_pageserver( repo_dir=repo_dir, pg_bin=pg_bin, tenant_id=ps.initial_tenant, - port=next(port_distributor), + port=port_distributor.get_port(), ).create_start( branch, wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None, diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index a1ed11200a..82312a992b 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -49,8 +49,27 @@ Fn = TypeVar('Fn', bound=Callable[..., Any]) DEFAULT_OUTPUT_DIR = 'test_output' DEFAULT_POSTGRES_DIR = 'tmp_install' -DEFAULT_PAGESERVER_PG_PORT = 64000 BASE_PORT = 55000 +WORKER_PORT_NUM = 100 + +def pytest_configure(config): + """ + Ensure that no unwanted daemons are running before we start testing. + Check that we do not owerflow available ports range. + """ + numprocesses = config.getoption('numprocesses') + if numprocesses is not None and BASE_PORT + numprocesses * WORKER_PORT_NUM > 65536: + raise Exception('Too many workers configured. Cannot distrubute ports for services.') + + # does not use -c as it is not supported on macOS + cmd = ['pgrep', 'pageserver|postgres|wal_acceptor'] + result = subprocess.run(cmd, stdout=subprocess.DEVNULL) + if result.returncode == 0: + # returncode of 0 means it found something. + # This is bad; we don't want any of those processes polluting the + # result of the test. + # NOTE this shows as an internal pytest error, there might be a better way + raise Exception('found interfering processes running') def determine_scope(fixture_name: str, config: Any) -> str: @@ -265,6 +284,9 @@ class AuthKeys: @zenfixture def worker_seq_no(worker_id: str): + # worker_id is a pytest-xdist fixture + # it can be master or gw + # parse it to always get a number if worker_id == 'master': return 0 assert worker_id.startswith('gw') @@ -277,11 +299,20 @@ def worker_base_port(worker_seq_no: int): # so workers have disjoint set of ports for services return BASE_PORT + worker_seq_no * 100 +class PortDistributor: + def __init__(self, base_port: int, port_number: int) -> None: + self.iterator = iter(range(base_port, base_port + port_number)) + + def get_port(self) -> int: + try: + return next(self.iterator) + except StopIteration: + raise RuntimeError('port range configured for test is exhausted, consider enlarging the range') + @zenfixture def port_distributor(worker_base_port): - yield iter(range(worker_base_port, worker_base_port + 100)) - + return PortDistributor(base_port=worker_base_port, port_number=100) @dataclass class PageserverPort: @@ -362,9 +393,9 @@ class ZenithPageserver(PgProtocol): @zenfixture -def pageserver_port(port_distributor: Iterator[int]) -> PageserverPort: - pg = next(port_distributor) - http = next(port_distributor) +def pageserver_port(port_distributor: PortDistributor) -> PageserverPort: + pg = port_distributor.get_port() + http = port_distributor.get_port() print(f"pageserver_port: pg={pg} http={http}") return PageserverPort(pg=pg, http=http) @@ -626,7 +657,7 @@ class Postgres(PgProtocol): class PostgresFactory: """ An object representing multiple running postgres daemons. """ - def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, port_distributor: Iterator[int]): + def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, port_distributor: PortDistributor): self.zenith_cli = zenith_cli self.repo_dir = repo_dir self.num_instances = 0 @@ -647,7 +678,7 @@ class PostgresFactory: repo_dir=self.repo_dir, pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, - port=next(self.port_distributor), + port=self.port_distributor.get_port(), ) self.num_instances += 1 self.instances.append(pg) @@ -671,7 +702,7 @@ class PostgresFactory: repo_dir=self.repo_dir, pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, - port=next(self.port_distributor), + port=self.port_distributor.get_port(), ) self.num_instances += 1 @@ -696,7 +727,7 @@ class PostgresFactory: repo_dir=self.repo_dir, pg_bin=self.pg_bin, tenant_id=tenant_id or self.initial_tenant, - port=next(self.port_distributor), + port=self.port_distributor.get_port(), ) self.num_instances += 1 @@ -720,7 +751,7 @@ def initial_tenant(pageserver: ZenithPageserver): @zenfixture -def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin, port_distributor: Iterator[int]) -> Iterator[PostgresFactory]: +def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin, port_distributor: PortDistributor) -> Iterator[PostgresFactory]: pgfactory = PostgresFactory( zenith_cli=zenith_cli, repo_dir=repo_dir, @@ -808,7 +839,7 @@ class WalAcceptor: class WalAcceptorFactory: """ An object representing multiple running wal acceptors. """ - def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: Iterator[int]): + def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor): self.wa_bin_path = zenith_binpath / 'wal_acceptor' self.data_dir = data_dir self.instances: List[WalAcceptor] = [] @@ -823,7 +854,7 @@ class WalAcceptorFactory: wa = WalAcceptor( wa_bin_path=self.wa_bin_path, data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num), - port=next(self.port_distributor), + port=self.port_distributor.get_port(), num=wa_num, pageserver_port=self.pageserver_port, auth_token=auth_token, @@ -851,7 +882,7 @@ class WalAcceptorFactory: @zenfixture -def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPort, port_distributor: Iterator[int]) -> Iterator[WalAcceptorFactory]: +def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPort, port_distributor: PortDistributor) -> Iterator[WalAcceptorFactory]: """ Gives WalAcceptorFactory providing wal acceptors. """ wafactory = WalAcceptorFactory( zenith_binpath=Path(zenith_binpath),