Bring back check for interferring processes, add more comments and

descriptive errors
This commit is contained in:
Dmitry Rodionov
2021-09-14 13:47:19 +03:00
committed by Dmitry
parent 4ebe643d0c
commit 9563336d9a
3 changed files with 48 additions and 18 deletions

View File

@@ -22,7 +22,6 @@ use crate::read_pidfile;
use pageserver::branches::BranchInfo;
use zenith_utils::connstring::connection_address;
#[derive(Error, Debug)]
pub enum PageserverHttpError {
#[error("Reqwest error")]

View File

@@ -3,7 +3,7 @@ from contextlib import closing
from typing import Iterator
from uuid import uuid4
import psycopg2
from fixtures.zenith_fixtures import Postgres, ZenithCli, ZenithPageserver, PgBin
from fixtures.zenith_fixtures import PortDistributor, Postgres, ZenithCli, ZenithPageserver, PgBin
import pytest
@@ -47,7 +47,7 @@ def test_compute_auth_to_pageserver(
repo_dir: str,
with_wal_acceptors: bool,
pg_bin: PgBin,
port_distributor: Iterator[int],
port_distributor: PortDistributor,
):
ps = pageserver_auth_enabled
# since we are in progress of refactoring protocols between compute safekeeper and page server
@@ -64,7 +64,7 @@ def test_compute_auth_to_pageserver(
repo_dir=repo_dir,
pg_bin=pg_bin,
tenant_id=ps.initial_tenant,
port=next(port_distributor),
port=port_distributor.get_port(),
).create_start(
branch,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,

View File

@@ -49,8 +49,27 @@ Fn = TypeVar('Fn', bound=Callable[..., Any])
DEFAULT_OUTPUT_DIR = 'test_output'
DEFAULT_POSTGRES_DIR = 'tmp_install'
DEFAULT_PAGESERVER_PG_PORT = 64000
BASE_PORT = 55000
WORKER_PORT_NUM = 100
def pytest_configure(config):
"""
Ensure that no unwanted daemons are running before we start testing.
Check that we do not owerflow available ports range.
"""
numprocesses = config.getoption('numprocesses')
if numprocesses is not None and BASE_PORT + numprocesses * WORKER_PORT_NUM > 65536:
raise Exception('Too many workers configured. Cannot distrubute ports for services.')
# does not use -c as it is not supported on macOS
cmd = ['pgrep', 'pageserver|postgres|wal_acceptor']
result = subprocess.run(cmd, stdout=subprocess.DEVNULL)
if result.returncode == 0:
# returncode of 0 means it found something.
# This is bad; we don't want any of those processes polluting the
# result of the test.
# NOTE this shows as an internal pytest error, there might be a better way
raise Exception('found interfering processes running')
def determine_scope(fixture_name: str, config: Any) -> str:
@@ -265,6 +284,9 @@ class AuthKeys:
@zenfixture
def worker_seq_no(worker_id: str):
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
# parse it to always get a number
if worker_id == 'master':
return 0
assert worker_id.startswith('gw')
@@ -277,11 +299,20 @@ def worker_base_port(worker_seq_no: int):
# so workers have disjoint set of ports for services
return BASE_PORT + worker_seq_no * 100
class PortDistributor:
def __init__(self, base_port: int, port_number: int) -> None:
self.iterator = iter(range(base_port, base_port + port_number))
def get_port(self) -> int:
try:
return next(self.iterator)
except StopIteration:
raise RuntimeError('port range configured for test is exhausted, consider enlarging the range')
@zenfixture
def port_distributor(worker_base_port):
yield iter(range(worker_base_port, worker_base_port + 100))
return PortDistributor(base_port=worker_base_port, port_number=100)
@dataclass
class PageserverPort:
@@ -362,9 +393,9 @@ class ZenithPageserver(PgProtocol):
@zenfixture
def pageserver_port(port_distributor: Iterator[int]) -> PageserverPort:
pg = next(port_distributor)
http = next(port_distributor)
def pageserver_port(port_distributor: PortDistributor) -> PageserverPort:
pg = port_distributor.get_port()
http = port_distributor.get_port()
print(f"pageserver_port: pg={pg} http={http}")
return PageserverPort(pg=pg, http=http)
@@ -626,7 +657,7 @@ class Postgres(PgProtocol):
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, port_distributor: Iterator[int]):
def __init__(self, zenith_cli: ZenithCli, repo_dir: str, pg_bin: PgBin, initial_tenant: str, port_distributor: PortDistributor):
self.zenith_cli = zenith_cli
self.repo_dir = repo_dir
self.num_instances = 0
@@ -647,7 +678,7 @@ class PostgresFactory:
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=next(self.port_distributor),
port=self.port_distributor.get_port(),
)
self.num_instances += 1
self.instances.append(pg)
@@ -671,7 +702,7 @@ class PostgresFactory:
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=next(self.port_distributor),
port=self.port_distributor.get_port(),
)
self.num_instances += 1
@@ -696,7 +727,7 @@ class PostgresFactory:
repo_dir=self.repo_dir,
pg_bin=self.pg_bin,
tenant_id=tenant_id or self.initial_tenant,
port=next(self.port_distributor),
port=self.port_distributor.get_port(),
)
self.num_instances += 1
@@ -720,7 +751,7 @@ def initial_tenant(pageserver: ZenithPageserver):
@zenfixture
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin, port_distributor: Iterator[int]) -> Iterator[PostgresFactory]:
def postgres(zenith_cli: ZenithCli, initial_tenant: str, repo_dir: str, pg_bin: PgBin, port_distributor: PortDistributor) -> Iterator[PostgresFactory]:
pgfactory = PostgresFactory(
zenith_cli=zenith_cli,
repo_dir=repo_dir,
@@ -808,7 +839,7 @@ class WalAcceptor:
class WalAcceptorFactory:
""" An object representing multiple running wal acceptors. """
def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: Iterator[int]):
def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor):
self.wa_bin_path = zenith_binpath / 'wal_acceptor'
self.data_dir = data_dir
self.instances: List[WalAcceptor] = []
@@ -823,7 +854,7 @@ class WalAcceptorFactory:
wa = WalAcceptor(
wa_bin_path=self.wa_bin_path,
data_dir=self.data_dir / "wal_acceptor_{}".format(wa_num),
port=next(self.port_distributor),
port=self.port_distributor.get_port(),
num=wa_num,
pageserver_port=self.pageserver_port,
auth_token=auth_token,
@@ -851,7 +882,7 @@ class WalAcceptorFactory:
@zenfixture
def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPort, port_distributor: Iterator[int]) -> Iterator[WalAcceptorFactory]:
def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPort, port_distributor: PortDistributor) -> Iterator[WalAcceptorFactory]:
""" Gives WalAcceptorFactory providing wal acceptors. """
wafactory = WalAcceptorFactory(
zenith_binpath=Path(zenith_binpath),