Start and stop single etcd and mock s3 servers globally in python tests

This commit is contained in:
Kirill Bulatov
2022-05-14 15:03:12 +03:00
committed by Kirill Bulatov
parent a884f4cf6b
commit f2881bbd8a
5 changed files with 87 additions and 79 deletions

View File

@@ -355,7 +355,7 @@ jobs:
when: always
command: |
du -sh /tmp/test_output/*
find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "safekeeper.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" -delete
find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "safekeeper.log" ! -name "etcd.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" ! -name "flamegraph.svg" -delete
du -sh /tmp/test_output/*
- store_artifacts:
path: /tmp/test_output

View File

@@ -132,10 +132,6 @@ impl SafekeeperNode {
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--recall", "1 second"])
.args(&[
"--broker-endpoints",
&self.env.etcd_broker.comma_separated_endpoints(),
])
.arg("--daemonize"),
);
if !self.conf.sync {

View File

@@ -51,7 +51,6 @@ Useful environment variables:
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
`ZENITH_PAGESERVER_OVERRIDES`: add a `;`-separated set of configs that will be passed as
`FORCE_MOCK_S3`: inits every test's pageserver with a mock S3 used as a remote storage.
`--pageserver-config-override=${value}` parameter values when zenith cli is invoked
`RUST_LOG`: logging configuration to pass into Zenith CLI

View File

@@ -3,8 +3,10 @@ import os
import pathlib
import subprocess
import threading
import typing
from uuid import UUID
from fixtures.log_helper import log
from typing import Optional
import signal
import pytest
@@ -22,7 +24,7 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
remote_storage_mock_path: pathlib.Path,
pg_port: int,
http_port: int,
broker: Etcd):
broker: Optional[Etcd]):
"""
cannot use ZenithPageserver yet because it depends on zenith cli
which currently lacks support for multiple pageservers
@@ -37,9 +39,11 @@ def new_pageserver_helper(new_pageserver_dir: pathlib.Path,
f"-c pg_distrib_dir='{pg_distrib_dir}'",
f"-c id=2",
f"-c remote_storage={{local_path='{remote_storage_mock_path}'}}",
f"-c broker_endpoints=['{broker.client_url()}']",
]
if broker is not None:
cmd.append(f"-c broker_endpoints=['{broker.client_url()}']", )
subprocess.check_output(cmd, text=True)
# actually run new pageserver

View File

@@ -61,7 +61,7 @@ DEFAULT_POSTGRES_DIR = 'tmp_install'
DEFAULT_BRANCH_NAME = 'main'
BASE_PORT = 15000
WORKER_PORT_NUM = 100
WORKER_PORT_NUM = 1000
def pytest_addoption(parser):
@@ -178,7 +178,7 @@ def shareable_scope(fixture_name, config) -> Literal["session", "function"]:
return 'function' if os.environ.get('TEST_SHARED_FIXTURES') is None else 'session'
@pytest.fixture(scope=shareable_scope)
@pytest.fixture(scope='session')
def worker_seq_no(worker_id: str):
# worker_id is a pytest-xdist fixture
# it can be master or gw<number>
@@ -189,7 +189,7 @@ def worker_seq_no(worker_id: str):
return int(worker_id[2:])
@pytest.fixture(scope=shareable_scope)
@pytest.fixture(scope='session')
def worker_base_port(worker_seq_no: int):
# so we divide ports in ranges of 100 ports
# so workers have disjoint set of ports for services
@@ -242,11 +242,30 @@ class PortDistributor:
'port range configured for test is exhausted, consider enlarging the range')
@pytest.fixture(scope=shareable_scope)
@pytest.fixture(scope='session')
def port_distributor(worker_base_port):
return PortDistributor(base_port=worker_base_port, port_number=WORKER_PORT_NUM)
@pytest.fixture(scope='session')
def default_broker(request: Any, port_distributor: PortDistributor):
    """Session-wide etcd broker shared by all tests; stopped on teardown."""
    etcd_client_port = port_distributor.get_port()
    # multiple pytest sessions could get launched in parallel, get them different datadirs
    datadir = os.path.join(get_test_output_dir(request), f"etcd_datadir_{etcd_client_port}")
    pathlib.Path(datadir).mkdir(exist_ok=True, parents=True)
    etcd_broker = Etcd(datadir=datadir,
                       port=etcd_client_port,
                       peer_port=port_distributor.get_port())
    yield etcd_broker
    etcd_broker.stop()
@pytest.fixture(scope='session')
def mock_s3_server(port_distributor: PortDistributor):
    """Session-wide mock S3 server shared by all tests; killed on teardown."""
    server = MockS3Server(port_distributor.get_port())
    yield server
    server.kill()
class PgProtocol:
""" Reusable connection logic """
def __init__(self, **kwargs):
@@ -410,7 +429,9 @@ class ZenithEnvBuilder:
def __init__(self,
repo_dir: Path,
port_distributor: PortDistributor,
pageserver_remote_storage: Optional[RemoteStorage] = None,
broker: Etcd,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
pageserver_auth_enabled: bool = False,
@@ -419,24 +440,15 @@ class ZenithEnvBuilder:
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.pageserver_remote_storage = pageserver_remote_storage
self.remote_storage = remote_storage
self.broker = broker
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.default_branch_name = default_branch_name
# keep etcd datadir inside 'repo'
self.broker = Etcd(datadir=os.path.join(self.repo_dir, "etcd"),
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port())
self.env: Optional[ZenithEnv] = None
self.s3_mock_server: Optional[MockS3Server] = None
if os.getenv('FORCE_MOCK_S3') is not None:
bucket_name = f'{repo_dir.name}_bucket'
log.warning(f'Unconditionally initializing mock S3 server for bucket {bucket_name}')
self.enable_s3_mock_remote_storage(bucket_name)
def init(self) -> ZenithEnv:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
@@ -457,9 +469,8 @@ class ZenithEnvBuilder:
"""
def enable_local_fs_remote_storage(self, force_enable=True):
assert force_enable or self.pageserver_remote_storage is None, "remote storage is enabled already"
self.pageserver_remote_storage = LocalFsStorage(
Path(self.repo_dir / 'local_fs_remote_storage'))
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
self.remote_storage = LocalFsStorage(Path(self.repo_dir / 'local_fs_remote_storage'))
"""
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
@@ -468,22 +479,19 @@ class ZenithEnvBuilder:
"""
def enable_s3_mock_remote_storage(self, bucket_name: str, force_enable=True):
assert force_enable or self.pageserver_remote_storage is None, "remote storage is enabled already"
if not self.s3_mock_server:
self.s3_mock_server = MockS3Server(self.port_distributor.get_port())
mock_endpoint = self.s3_mock_server.endpoint()
mock_region = self.s3_mock_server.region()
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
mock_endpoint = self.mock_s3_server.endpoint()
mock_region = self.mock_s3_server.region()
boto3.client(
's3',
endpoint_url=mock_endpoint,
region_name=mock_region,
aws_access_key_id=self.s3_mock_server.access_key(),
aws_secret_access_key=self.s3_mock_server.secret_key(),
aws_access_key_id=self.mock_s3_server.access_key(),
aws_secret_access_key=self.mock_s3_server.secret_key(),
).create_bucket(Bucket=bucket_name)
self.pageserver_remote_storage = S3Storage(bucket=bucket_name,
endpoint=mock_endpoint,
region=mock_region)
self.remote_storage = S3Storage(bucket=bucket_name,
endpoint=mock_endpoint,
region=mock_region)
def __enter__(self):
return self
@@ -497,10 +505,6 @@ class ZenithEnvBuilder:
for sk in self.env.safekeepers:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
if self.s3_mock_server:
self.s3_mock_server.kill()
if self.env.broker is not None:
self.env.broker.stop()
class ZenithEnv:
@@ -539,10 +543,12 @@ class ZenithEnv:
self.repo_dir = config.repo_dir
self.rust_log_override = config.rust_log_override
self.port_distributor = config.port_distributor
self.s3_mock_server = config.s3_mock_server
self.s3_mock_server = config.mock_s3_server
self.zenith_cli = ZenithCli(env=self)
self.postgres = PostgresFactory(self)
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
# generate initial tenant ID here instead of letting 'zenith init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -553,7 +559,6 @@ class ZenithEnv:
default_tenant_id = '{self.initial_tenant.hex}'
""")
self.broker = config.broker
toml += textwrap.dedent(f"""
[etcd_broker]
broker_endpoints = ['{self.broker.client_url()}']
@@ -578,7 +583,6 @@ class ZenithEnv:
# Create a corresponding ZenithPageserver object
self.pageserver = ZenithPageserver(self,
port=pageserver_port,
remote_storage=config.pageserver_remote_storage,
config_override=config.pageserver_config_override)
# Create config and a Safekeeper object for each safekeeper
@@ -602,15 +606,13 @@ class ZenithEnv:
self.zenith_cli.init(toml)
def start(self):
# Start up the page server, all the safekeepers and the broker
# Start up broker, pageserver and all safekeepers
self.broker.try_start()
self.pageserver.start()
for safekeeper in self.safekeepers:
safekeeper.start()
if self.broker is not None:
self.broker.start()
def get_safekeeper_connstrs(self) -> str:
""" Get list of safekeeper endpoints suitable for safekeepers GUC """
return ','.join([f'localhost:{wa.port.pg}' for wa in self.safekeepers])
@@ -623,7 +625,10 @@ class ZenithEnv:
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
def _shared_simple_env(request: Any,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[ZenithEnv]:
"""
Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `zenith_simple_env`.
@@ -637,7 +642,8 @@ def _shared_simple_env(request: Any, port_distributor) -> Iterator[ZenithEnv]:
repo_dir = os.path.join(str(top_output_dir), "shared_repo")
shutil.rmtree(repo_dir, ignore_errors=True)
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
with ZenithEnvBuilder(Path(repo_dir), port_distributor, default_broker,
mock_s3_server) as builder:
env = builder.init_start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
@@ -659,12 +665,13 @@ def zenith_simple_env(_shared_simple_env: ZenithEnv) -> Iterator[ZenithEnv]:
yield _shared_simple_env
_shared_simple_env.postgres.stop_all()
if _shared_simple_env.s3_mock_server:
_shared_simple_env.s3_mock_server.kill()
@pytest.fixture(scope='function')
def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvBuilder]:
def zenith_env_builder(test_output_dir,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[ZenithEnvBuilder]:
"""
Fixture to create a Zenith environment for test.
@@ -682,7 +689,8 @@ def zenith_env_builder(test_output_dir, port_distributor) -> Iterator[ZenithEnvB
repo_dir = os.path.join(test_output_dir, "repo")
# Return the builder to the caller
with ZenithEnvBuilder(Path(repo_dir), port_distributor) as builder:
with ZenithEnvBuilder(Path(repo_dir), port_distributor, default_broker,
mock_s3_server) as builder:
yield builder
@@ -979,9 +987,10 @@ class ZenithCli:
cmd = ['init', f'--config={tmp.name}']
if initial_timeline_id:
cmd.extend(['--timeline-id', initial_timeline_id.hex])
append_pageserver_param_overrides(cmd,
self.env.pageserver.remote_storage,
self.env.pageserver.config_override)
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=self.env.remote_storage,
pageserver_config_override=self.env.pageserver.config_override)
res = self.raw_cli(cmd)
res.check_returncode()
@@ -1002,9 +1011,10 @@ class ZenithCli:
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start', *overrides]
append_pageserver_param_overrides(start_args,
self.env.pageserver.remote_storage,
self.env.pageserver.config_override)
append_pageserver_param_overrides(
params_to_update=start_args,
remote_storage=self.env.remote_storage,
pageserver_config_override=self.env.pageserver.config_override)
s3_env_vars = None
if self.env.s3_mock_server:
@@ -1174,16 +1184,11 @@ class ZenithPageserver(PgProtocol):
Initializes the repository via `zenith init`.
"""
def __init__(self,
env: ZenithEnv,
port: PageserverPort,
remote_storage: Optional[RemoteStorage] = None,
config_override: Optional[str] = None):
def __init__(self, env: ZenithEnv, port: PageserverPort, config_override: Optional[str] = None):
super().__init__(host='localhost', port=port.pg, user='zenith_admin')
self.env = env
self.running = False
self.service_port = port
self.remote_storage = remote_storage
self.config_override = config_override
def start(self, overrides=()) -> 'ZenithPageserver':
@@ -1223,21 +1228,21 @@ class ZenithPageserver(PgProtocol):
def append_pageserver_param_overrides(
params_to_update: List[str],
pageserver_remote_storage: Optional[RemoteStorage],
remote_storage: Optional[RemoteStorage],
pageserver_config_override: Optional[str] = None,
):
if pageserver_remote_storage is not None:
if isinstance(pageserver_remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
elif isinstance(pageserver_remote_storage, S3Storage):
pageserver_storage_override = f"bucket_name='{pageserver_remote_storage.bucket}',\
bucket_region='{pageserver_remote_storage.region}'"
if remote_storage is not None:
if isinstance(remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
pageserver_storage_override = f"bucket_name='{remote_storage.bucket}',\
bucket_region='{remote_storage.region}'"
if pageserver_remote_storage.endpoint is not None:
pageserver_storage_override += f",endpoint='{pageserver_remote_storage.endpoint}'"
if remote_storage.endpoint is not None:
pageserver_storage_override += f",endpoint='{remote_storage.endpoint}'"
else:
raise Exception(f'Unknown storage configuration {pageserver_remote_storage}')
raise Exception(f'Unknown storage configuration {remote_storage}')
params_to_update.append(
f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}')
@@ -1859,7 +1864,11 @@ class Etcd:
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"{self.client_url()}/health").raise_for_status()
def start(self):
def try_start(self):
if self.handle is not None:
log.debug(f'etcd is already running on port {self.port}')
return
pathlib.Path(self.datadir).mkdir(exist_ok=True)
if not self.binary_path.is_file():