From f2881bbd8a90bc4b04fb1693ad3a684b260a0f98 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Sat, 14 May 2022 15:03:12 +0300 Subject: [PATCH] Start and stop single etcd and mock s3 servers globally in python tests --- .circleci/config.yml | 2 +- control_plane/src/safekeeper.rs | 4 - test_runner/README.md | 1 - .../batch_others/test_tenant_relocation.py | 8 +- test_runner/fixtures/zenith_fixtures.py | 151 ++++++++++-------- 5 files changed, 87 insertions(+), 79 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 85654b5d45..62ae60eb18 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -355,7 +355,7 @@ jobs: when: always command: | du -sh /tmp/test_output/* - find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "safekeeper.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" -delete + find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "safekeeper.log" ! -name "etcd.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" -delete du -sh /tmp/test_output/* - store_artifacts: path: /tmp/test_output diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 407cd05c73..1ac06cb2d2 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -132,10 +132,6 @@ impl SafekeeperNode { .args(&["--listen-pg", &listen_pg]) .args(&["--listen-http", &listen_http]) .args(&["--recall", "1 second"]) - .args(&[ - "--broker-endpoints", - &self.env.etcd_broker.comma_separated_endpoints(), - ]) .arg("--daemonize"), ); if !self.conf.sync { diff --git a/test_runner/README.md b/test_runner/README.md index ee171ae6a0..059bbb83cc 100644 --- a/test_runner/README.md +++ b/test_runner/README.md @@ -51,7 +51,6 @@ Useful environment variables: should go. `TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests. `ZENITH_PAGESERVER_OVERRIDES`: add a `;`-separated set of configs that will be passed as -`FORCE_MOCK_S3`: inits every test's pageserver with a mock S3 used as a remote storage. `--pageserver-config-override=${value}` parameter values when zenith cli is invoked `RUST_LOG`: logging configuration to pass into Zenith CLI diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index 85a91b9ce1..0e5dd6eadf 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -3,8 +3,10 @@ import os import pathlib import subprocess import threading +import typing from uuid import UUID from fixtures.log_helper import log +from typing import Optional import signal import pytest @@ -22,7 +24,7 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path, remote_storage_mock_path: pathlib.Path, pg_port: int, http_port: int, - broker: Etcd): + broker: Optional[Etcd]): """ cannot use ZenithPageserver yet because it depends on zenith cli which currently lacks support for multiple pageservers @@ -37,9 +39,11 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path, f"-c pg_distrib_dir='{pg_distrib_dir}'", f"-c id=2", f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}", - f"-c broker_endpoints=['{broker.client_url()}']", ] + if broker is not None: + cmd.append(f"-c broker_endpoints=['{broker.client_url()}']", ) + subprocess.check_output(cmd, text=True) # actually run new pageserver diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 78de78144c..8fca56143e 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -61,7 +61,7 @@ DEFAULT_POSTGRES_DIR = 'tmp_install' DEFAULT_BRANCH_NAME = 'main' BASE_PORT = 15000 -WORKER_PORT_NUM = 100 +WORKER_PORT_NUM = 1000 def pytest_addoption(parser): @@ -178,7 +178,7 @@ def shareable_scope(fixture_name, config) -> Literal["session", "function"]: return 'function' if os.environ.get('TEST_SHARED_FIXTURES') is None else 'session' -@pytest.fixture(scope=shareable_scope) +@pytest.fixture(scope='session') def worker_seq_no(worker_id: str): # worker_id is a pytest-xdist fixture # it can be master or gw @@ -189,7 +189,7 @@ def worker_seq_no(worker_id: str): return int(worker_id[2:]) -@pytest.fixture(scope=shareable_scope) +@pytest.fixture(scope='session') def worker_base_port(worker_seq_no: int): # so we divide ports in ranges of 100 ports # so workers have disjoint set of ports for services @@ -242,11 +242,30 @@ class PortDistributor: 'port range configured for test is exhausted, consider enlarging the range') -@pytest.fixture(scope=shareable_scope) +@pytest.fixture(scope='session') def port_distributor(worker_base_port): return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM) +@pytest.fixture(scope='session') +def default_broker(request: Any, port_distributor: PortDistributor): + client_port = port_distributor.get_port() + # multiple pytest sessions could get launched in parallel, get them different datadirs + etcd_datadir = os.path.join(get_test_output_dir(request), f"etcd_datadir_{client_port}") + pathlib.Path(etcd_datadir).mkdir(exist_ok=True, parents=True) + + broker = Etcd(datadir=etcd_datadir, port=client_port, peer_port=port_distributor.get_port()) + yield broker + broker.stop() + + +@pytest.fixture(scope='session') +def mock_s3_server(port_distributor: PortDistributor): + mock_s3_server = MockS3Server(port_distributor.get_port()) + yield mock_s3_server + mock_s3_server.kill() + + class PgProtocol: """ Reusable connection logic """ def __init__(self, **kwargs): @@ -410,7 +429,9 @@ class ZenithEnvBuilder: def __init__(self, repo_dir: Path, port_distributor: PortDistributor, - pageserver_remote_storage: Optional[RemoteStorage] = None, + broker: Etcd, + mock_s3_server: MockS3Server, + remote_storage: Optional[RemoteStorage] = None, pageserver_config_override: Optional[str] = None, num_safekeepers: int = 1, pageserver_auth_enabled: bool = False, @@ -419,24 +440,15 @@ class ZenithEnvBuilder: self.repo_dir = repo_dir self.rust_log_override = rust_log_override self.port_distributor = port_distributor - self.pageserver_remote_storage = pageserver_remote_storage + self.remote_storage = remote_storage + self.broker = broker + self.mock_s3_server = mock_s3_server self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers self.pageserver_auth_enabled = pageserver_auth_enabled self.default_branch_name = default_branch_name - # keep etcd datadir inside 'repo' - self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"), - port=self.port_distributor.get_port(), - peer_port=self.port_distributor.get_port()) self.env: Optional[ZenithEnv] = None - self.s3_mock_server: Optional[MockS3Server] = None - - if os.getenv('FORCE_MOCK_S3') is not None: - bucket_name = f'{repo_dir.name}_bucket' - log.warning(f'Unconditionally initializing mock S3 server for bucket {bucket_name}') - self.enable_s3_mock_remote_storage(bucket_name) - def init(self) -> ZenithEnv: # Cannot create more than one environment from one builder assert self.env is None, "environment already initialized" @@ -457,9 +469,8 @@ class ZenithEnvBuilder: """ def enable_local_fs_remote_storage(self, force_enable=True): - assert force_enable or self.pageserver_remote_storage is None, "remote storage is enabled already" - self.pageserver_remote_storage = LocalFsStorage( - Path(self.repo_dir / 'local_fs_remote_storage')) + assert force_enable or self.remote_storage is None, "remote storage is enabled already" + self.remote_storage = LocalFsStorage(Path(self.repo_dir / 'local_fs_remote_storage')) """ Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already. @@ -468,22 +479,19 @@ class ZenithEnvBuilder: """ def enable_s3_mock_remote_storage(self, bucket_name: str, force_enable=True): - assert force_enable or self.pageserver_remote_storage is None, "remote storage is enabled already" - if not self.s3_mock_server: - self.s3_mock_server = MockS3Server(self.port_distributor.get_port()) - - mock_endpoint = self.s3_mock_server.endpoint() - mock_region = self.s3_mock_server.region() + assert force_enable or self.remote_storage is None, "remote storage is enabled already" + mock_endpoint = self.mock_s3_server.endpoint() + mock_region = self.mock_s3_server.region() boto3.client( 's3', endpoint_url=mock_endpoint, region_name=mock_region, - aws_access_key_id=self.s3_mock_server.access_key(), - aws_secret_access_key=self.s3_mock_server.secret_key(), + aws_access_key_id=self.mock_s3_server.access_key(), + aws_secret_access_key=self.mock_s3_server.secret_key(), ).create_bucket(Bucket=bucket_name) - self.pageserver_remote_storage = S3Storage(bucket=bucket_name, - endpoint=mock_endpoint, - region=mock_region) + self.remote_storage = S3Storage(bucket=bucket_name, + endpoint=mock_endpoint, + region=mock_region) def __enter__(self): return self @@ -497,10 +505,6 @@ class ZenithEnvBuilder: for sk in self.env.safekeepers: sk.stop(immediate=True) self.env.pageserver.stop(immediate=True) - if self.s3_mock_server: - self.s3_mock_server.kill() - if self.env.broker is not None: - self.env.broker.stop() class ZenithEnv: @@ -539,10 +543,12 @@ class ZenithEnv: self.repo_dir = config.repo_dir self.rust_log_override = config.rust_log_override self.port_distributor = config.port_distributor - self.s3_mock_server = config.s3_mock_server + self.s3_mock_server = config.mock_s3_server self.zenith_cli = ZenithCli(env=self) self.postgres = PostgresFactory(self) self.safekeepers: List[Safekeeper] = [] + self.broker = config.broker + self.remote_storage = config.remote_storage # generate initial tenant ID here instead of letting 'zenith init' generate it, # so that we don't need to dig it out of the config file afterwards. @@ -553,7 +559,6 @@ class ZenithEnv: default_tenant_id = '{self.initial_tenant.hex}' """) - self.broker = config.broker toml += textwrap.dedent(f""" [etcd_broker] broker_endpoints = ['{self.broker.client_url()}'] @@ -578,7 +583,6 @@ class ZenithEnv: # Create a corresponding ZenithPageserver object self.pageserver = ZenithPageserver(self, port=pageserver_port, - remote_storage=config.pageserver_remote_storage, config_override=config.pageserver_config_override) # Create config and a Safekeeper object for each safekeeper @@ -602,15 +606,13 @@ class ZenithEnv: self.zenith_cli.init(toml) def start(self): - # Start up the page server, all the safekeepers and the broker + # Start up broker, pageserver and all safekeepers + self.broker.try_start() self.pageserver.start() for safekeeper in self.safekeepers: safekeeper.start() - if self.broker is not None: - self.broker.start() - def get_safekeeper_connstrs(self) -> str: """ Get list of safekeeper endpoints suitable for safekeepers GUC """ return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers]) @@ -623,7 +625,10 @@ class ZenithEnv: @pytest.fixture(scope=shareable_scope) -def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]: +def _shared_simple_env(request: Any, + port_distributor: PortDistributor, + mock_s3_server: MockS3Server, + default_broker: Etcd) -> Iterator[ZenithEnv]: """ Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES is set, this is shared by all tests using `zenith_simple_env`. @@ -637,7 +642,8 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]: repo_dir = os.path.join(str(top_output_dir), "shared_repo") shutil.rmtree(repo_dir, ignore_errors=True) - with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder: + with ZenithEnvBuilder(Path(repo_dir), port_distributor, default_broker, + mock_s3_server) as builder: env = builder.init_start() # For convenience in tests, create a branch from the freshly-initialized cluster. @@ -659,12 +665,13 @@ def zenith_simple_env(_shared_simple_env: ZenithEnv) -> Iterator[ZenithEnv]: yield _shared_simple_env _shared_simple_env.postgres.stop_all() - if _shared_simple_env.s3_mock_server: - _shared_simple_env.s3_mock_server.kill() @pytest.fixture(scope='function') -def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvBuilder]: +def zenith_env_builder(test_output_dir, + port_distributor: PortDistributor, + mock_s3_server: MockS3Server, + default_broker: Etcd) -> Iterator[ZenithEnvBuilder]: """ Fixture to create a Zenith environment for test. @@ -682,7 +689,8 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB repo_dir = os.path.join(test_output_dir, "repo") # Return the builder to the caller - with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder: + with ZenithEnvBuilder(Path(repo_dir), port_distributor, default_broker, + mock_s3_server) as builder: yield builder @@ -979,9 +987,10 @@ class ZenithCli: cmd = ['init', f'--config={tmp.name}'] if initial_timeline_id: cmd.extend(['--timeline-id', initial_timeline_id.hex]) - append_pageserver_param_overrides(cmd, - self.env.pageserver.remote_storage, - self.env.pageserver.config_override) + append_pageserver_param_overrides( + params_to_update=cmd, + remote_storage=self.env.remote_storage, + pageserver_config_override=self.env.pageserver.config_override) res = self.raw_cli(cmd) res.check_returncode() @@ -1002,9 +1011,10 @@ class ZenithCli: def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]': start_args = ['pageserver', 'start', *overrides] - append_pageserver_param_overrides(start_args, - self.env.pageserver.remote_storage, - self.env.pageserver.config_override) + append_pageserver_param_overrides( + params_to_update=start_args, + remote_storage=self.env.remote_storage, + pageserver_config_override=self.env.pageserver.config_override) s3_env_vars = None if self.env.s3_mock_server: @@ -1174,16 +1184,11 @@ class ZenithPageserver(PgProtocol): Initializes the repository via `zenith init`. """ - def __init__(self, - env: ZenithEnv, - port: PageserverPort, - remote_storage: Optional[RemoteStorage] = None, - config_override: Optional[str] = None): + def __init__(self, env: ZenithEnv, port: PageserverPort, config_override: Optional[str] = None): super().__init__(host='localhost', port=port.pg, user='zenith_admin') self.env = env self.running = False self.service_port = port - self.remote_storage = remote_storage self.config_override = config_override def start(self, overrides=()) -> 'ZenithPageserver': @@ -1223,21 +1228,21 @@ class ZenithPageserver(PgProtocol): def append_pageserver_param_overrides( params_to_update: List[str], - pageserver_remote_storage: Optional[RemoteStorage], + remote_storage: Optional[RemoteStorage], pageserver_config_override: Optional[str] = None, ): - if pageserver_remote_storage is not None: - if isinstance(pageserver_remote_storage, LocalFsStorage): - pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'" - elif isinstance(pageserver_remote_storage, S3Storage): - pageserver_storage_override = f"bucket_name='{pageserver_remote_storage.bucket}',\ - bucket_region='{pageserver_remote_storage.region}'" + if remote_storage is not None: + if isinstance(remote_storage, LocalFsStorage): + pageserver_storage_override = f"local_path='{remote_storage.root}'" + elif isinstance(remote_storage, S3Storage): + pageserver_storage_override = f"bucket_name='{remote_storage.bucket}',\ + bucket_region='{remote_storage.region}'" - if pageserver_remote_storage.endpoint is not None: - pageserver_storage_override += f",endpoint='{pageserver_remote_storage.endpoint}'" + if remote_storage.endpoint is not None: + pageserver_storage_override += f",endpoint='{remote_storage.endpoint}'" else: - raise Exception(f'Unknown storage configuration {pageserver_remote_storage}') + raise Exception(f'Unknown storage configuration {remote_storage}') params_to_update.append( f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}') @@ -1859,7 +1864,11 @@ class Etcd: s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry s.get(f"{self.client_url()}/health").raise_for_status() - def start(self): + def try_start(self): + if self.handle is not None: + log.debug(f'etcd is already running on port {self.port}') + return + pathlib.Path(self.datadir).mkdir(exist_ok=True) if not self.binary_path.is_file():