From 390235e095dcd71656688d410bd335a203393db0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 10 Sep 2024 20:10:30 +0300 Subject: [PATCH] wip add NeonEnv --- test_runner/fixtures/neon_fixtures.py | 726 ++++++++++++++++---------- test_runner/regress/test_config.py | 2 +- 2 files changed, 445 insertions(+), 283 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a5fcb70c79..c96de1f4d5 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -447,106 +447,69 @@ class NeonEnvBuilder: def __init__( self, - repo_dir: Path, - port_distributor: PortDistributor, - run_id: uuid.UUID, - mock_s3_server: MockS3Server, neon_binpath: Path, pg_distrib_dir: Path, - pg_version: PgVersion, - env_name: str, top_output_dir: Path, + repo_dir: Path, + port_distributor: PortDistributor, + test_name: str, test_output_dir: Path, - test_overlay_dir: Optional[Path] = None, - pageserver_remote_storage: Optional[RemoteStorage] = None, - # toml that will be decomposed into `--config-override` flags during `pageserver --init` - pageserver_config_override: Optional[str | Callable[[Dict[str, Any]], None]] = None, - num_safekeepers: int = 1, - num_pageservers: int = 1, - # Use non-standard SK ids to check for various parsing bugs - safekeepers_id_start: int = 0, - # fsync is disabled by default to make the tests go faster - safekeepers_enable_fsync: bool = False, - auth_enabled: bool = False, + pg_version: PgVersion, + run_id: uuid.UUID, + neon_local_binpath: Optional[Path] = None, rust_log_override: Optional[str] = None, - default_branch_name: str = DEFAULT_BRANCH_NAME, - preserve_database_files: bool = False, initial_tenant: Optional[TenantId] = None, initial_timeline: Optional[TimelineId] = None, - pageserver_virtual_file_io_engine: Optional[str] = None, pageserver_aux_file_policy: Optional[AuxFileStore] = None, - pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None, - safekeeper_extra_opts: Optional[list[str]] = None, - storage_controller_port_override: Optional[int] = None, - pageserver_io_buffer_alignment: Optional[int] = None, - shared_initdb_cache_dir: Optional[Path] = None, + storage_env: Optional[NeonStorageEnv] = None, + storage_env_builder: Optional[NeonStorageEnv] = None, ): - self.repo_dir = repo_dir - self.rust_log_override = rust_log_override - self.port_distributor = port_distributor - - # Pageserver remote storage - self.pageserver_remote_storage = pageserver_remote_storage - # Safekeepers remote storage - self.safekeepers_remote_storage: Optional[RemoteStorage] = None - - self.run_id = run_id - self.mock_s3_server: MockS3Server = mock_s3_server - self.pageserver_config_override = pageserver_config_override - self.num_safekeepers = num_safekeepers - self.num_pageservers = num_pageservers - self.safekeepers_id_start = safekeepers_id_start - self.safekeepers_enable_fsync = safekeepers_enable_fsync - self.auth_enabled = auth_enabled - self.default_branch_name = default_branch_name - self.env: Optional[NeonEnv] = None - self.keep_remote_storage_contents: bool = True self.neon_binpath = neon_binpath - self.neon_local_binpath = neon_binpath + self.neon_local_binpath = neon_local_binpath self.pg_distrib_dir = pg_distrib_dir + self.repo_dir = repo_dir + self.port_distributor = port_distributor + self.rust_log_override = rust_log_override + self.test_name = test_name + self.test_output_dir = test_output_dir + self.pg_version = pg_version - self.preserve_database_files = preserve_database_files self.initial_tenant = initial_tenant or TenantId.generate() self.initial_timeline = initial_timeline or TimelineId.generate() - self.enable_scrub_on_exit = True - self.test_output_dir = test_output_dir - self.test_overlay_dir = test_overlay_dir - self.shared_initdb_cache_dir = shared_initdb_cache_dir - self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = [] - self.config_init_force: Optional[str] = None - self.top_output_dir = top_output_dir - self.control_plane_compute_hook_api: Optional[str] = None - self.storage_controller_config: Optional[dict[Any, Any]] = None - - self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine - - self.pageserver_default_tenant_config_compaction_algorithm: Optional[ - Dict[str, Any] - ] = pageserver_default_tenant_config_compaction_algorithm - if self.pageserver_default_tenant_config_compaction_algorithm is not None: - log.debug( - f"Overriding pageserver default compaction algorithm to {self.pageserver_default_tenant_config_compaction_algorithm}" - ) self.pageserver_aux_file_policy = pageserver_aux_file_policy - self.safekeeper_extra_opts = safekeeper_extra_opts + # XXX: check that both are not set + self.storage_env = storage_env + self.storage_env_builder = storage_env_builder - self.storage_controller_port_override = storage_controller_port_override + self.env = None - self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment + self.__setattr__ = self.my_setattr - # Usually test name - self.env_name = env_name + def __getattr__(self, attribute: str) -> Any: + if self.storage_env_builder is not None: + return self.storage_env_builder.__getattribute__(attribute) + else: + raise AttributeError(f"NeonEnvBuilder doesn't have attribute '{attribute}'") + + def my_setattr(self, attribute: str, value: Any) -> Any: + if self.storage_env_builder is not None: + return self.server_env_builder.__setattribute__(attribute, value) + else: + raise AttributeError(f"NeonEnvBuilder doesn't have attribute '{attribute}'") def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv: # Cannot create more than one environment from one builder assert self.env is None, "environment already initialized" - if default_remote_storage_if_missing and self.pageserver_remote_storage is None: - self.enable_pageserver_remote_storage(default_remote_storage()) - self.env = NeonEnv(self) - return self.env + env = NeonEnv(self) + if self.storage_env_builder is not None: + self.storage_env = self.storage_env_builder.init_configs(neon_cli, default_remote_storage_if_missing=default_remote_storage_if_missing) + env.storage_env = self.storage_env + self.env = env + return self.env + def init_start( self, initial_tenant_conf: Optional[Dict[str, Any]] = None, @@ -556,13 +519,12 @@ class NeonEnvBuilder: ) -> NeonEnv: """ Default way to create and start NeonEnv. Also creates the initial_tenant with root initial_timeline. - - To avoid creating initial_tenant, call init_configs to setup the environment. - - Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one. - """ - env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing) - self.env.start() + """ + # Cannot create more than one environment from one builder + assert self.env is None, "environment already initialized" + env = self.init_configs(default_remote_storage_if_missing) + + env.storage_env.start() # Prepare the default branch to start the postgres on later. # Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API. @@ -580,12 +542,122 @@ class NeonEnvBuilder: assert env.initial_tenant == initial_tenant assert env.initial_timeline == initial_timeline log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully") - return env + def start(self, timeout_in_seconds: Optional[int] = None): + self.storage_env.start() + + def stop(self, immediate: bool = False) -> "": + if self.env is not None: + self.env.stop(immediate) + + def __enter__(self) -> "NeonEnvBuilder": + return self + + + # XXX: Why is this part of the Builder and not the Env itself? + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ): + # Stop all the nodes. + self.stop(immediate=True) + +class NeonStorageEnvBuilder: + """ + Builder object to create a Neon runtime environment (storage parts) + """ + + def __init__( + self, + repo_dir: Path, + port_distributor: PortDistributor, + run_id: uuid.UUID, + mock_s3_server: MockS3Server, + neon_binpath: Path, + pg_distrib_dir: Path, + env_name: str, + top_output_dir: Path, + test_overlay_dir: Optional[Path] = None, + pageserver_remote_storage: Optional[RemoteStorage] = None, + # toml that will be decomposed into `--config-override` flags during `pageserver --init` + pageserver_config_override: Optional[str | Callable[[Dict[str, Any]], None]] = None, + num_safekeepers: int = 1, + num_pageservers: int = 1, + # Use non-standard SK ids to check for various parsing bugs + safekeepers_id_start: int = 0, + # fsync is disabled by default to make the tests go faster + safekeepers_enable_fsync: bool = False, + auth_enabled: bool = False, + neon_local_binpath: Optional[Path] = None, + rust_log_override: Optional[str] = None, + preserve_database_files: bool = False, + pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None, + pageserver_virtual_file_io_engine: Optional[str] = None, + safekeeper_extra_opts: Optional[list[str]] = None, + storage_controller_port_override: Optional[int] = None, + pageserver_io_buffer_alignment: Optional[int] = None, + shared_initdb_cache_dir: Optional[Path] = None, + ): + self.repo_dir = repo_dir + self.port_distributor = port_distributor + self.top_output_dir = top_output_dir + self.rust_log_override = rust_log_override + self.neon_local_binpath = neon_local_binpath + + # Pageserver remote storage + self.pageserver_remote_storage = pageserver_remote_storage + # Safekeepers remote storage + self.safekeepers_remote_storage: Optional[RemoteStorage] = None + + self.run_id = run_id + self.mock_s3_server: MockS3Server = mock_s3_server + self.pageserver_config_override = pageserver_config_override + self.num_safekeepers = num_safekeepers + self.num_pageservers = num_pageservers + self.safekeepers_id_start = safekeepers_id_start + self.safekeepers_enable_fsync = safekeepers_enable_fsync + self.auth_enabled = auth_enabled + self.storage_env: Optional[NeonStorageEnv] = None + self.keep_remote_storage_contents: bool = True + self.neon_binpath = neon_binpath + self.pg_distrib_dir = pg_distrib_dir + self.preserve_database_files = preserve_database_files + self.enable_scrub_on_exit = True + self.test_overlay_dir = test_overlay_dir + self.shared_initdb_cache_dir = shared_initdb_cache_dir + self.overlay_mounts_created_by_us: List[Tuple[str, Path]] = [] + self.config_init_force: Optional[str] = None + self.top_output_dir = top_output_dir + self.control_plane_compute_hook_api: Optional[str] = None + self.storage_controller_config: Optional[dict[Any, Any]] = None + + self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine + + self.safekeeper_extra_opts = safekeeper_extra_opts + + self.storage_controller_port_override = storage_controller_port_override + + + self.pageserver_default_tenant_config_compaction_algorithm = pageserver_default_tenant_config_compaction_algorithm + self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment + + # Usually test name + self.env_name = env_name + + def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonStorageEnv: + # Cannot create more than one environment from one builder + assert self.storage_env is None, "environment already initialized" + if default_remote_storage_if_missing and self.pageserver_remote_storage is None: + self.enable_pageserver_remote_storage(default_remote_storage()) + self.storage_env = NeonStorageEnv(self) + return self.storage_env + def build_and_use_snapshot( - self, global_ident: str, create_env_for_snapshot: Callable[[NeonEnvBuilder], NeonEnv] - ) -> NeonEnv: + self, global_ident: str, create_env_for_snapshot: Callable[[NeonStorageEnvBuilder], NeonStorageEnv] + ) -> NeonStorageEnv: if os.getenv("CI", "false") == "true": log.info("do not use snapshots in ephemeral CI environment") env = create_env_for_snapshot(self) @@ -602,7 +674,7 @@ class NeonEnvBuilder: def _build_and_use_snapshot_impl( self, snapshot_dir: SnapshotDirLocked, - create_env_for_snapshot: Callable[[NeonEnvBuilder], NeonEnv], + create_env_for_snapshot: Callable[[NeonStorageEnvBuilder], NeonStorageEnv], ): if snapshot_dir.path.exists(): shutil.rmtree(snapshot_dir.path) @@ -618,8 +690,8 @@ class NeonEnvBuilder: self.config_init_force = "empty-dir-ok" env = create_env_for_snapshot(self) - assert self.env is not None - assert self.env == env + assert self.storage_env is not None + assert self.storage_env == env # shut down everything for snapshot env.stop(immediate=True, ps_assert_metric_no_errors=True) @@ -639,14 +711,14 @@ class NeonEnvBuilder: assert not env.repo_dir.exists(), "both branches above should remove it" snapshot_dir.set_initialized() - self.env = None # so that from_repo_dir works again + self.storage_env = None # so that from_repo_dir works again def from_repo_dir( self, repo_dir: Path, - ) -> NeonEnv: + ) -> NeonStorageEnv: """ - A simple method to import data into the current NeonEnvBuilder from a snapshot of a repo dir. + A simple method to import data into the current NeonStorageEnvBuilder from a snapshot of a repo dir. """ # Get the initial tenant and timeline from the snapshot config @@ -658,13 +730,13 @@ class NeonEnvBuilder: with snapshot_branches_toml.open("r") as f: snapshot_branch_mappings = toml.load(f) - self.initial_tenant = TenantId(snapshot_config["default_tenant_id"]) + self.initial_tenant = TenantId(snapshot_branch_mappings["default_tenant_id"]) self.initial_timeline = TimelineId( - dict(snapshot_config["branch_name_mappings"][DEFAULT_BRANCH_NAME])[ + dict(snapshot_branch_mappings["mappings"][DEFAULT_BRANCH_NAME])[ str(self.initial_tenant) ] ) - self.env = self.init_configs() + self.storage_env = self.init_configs() for ps_dir in repo_dir.glob("pageserver_*"): tenants_from_dir = ps_dir / "tenants" @@ -716,13 +788,13 @@ class NeonEnvBuilder: shutil.copytree(storcon_db_from_dir, storcon_db_to_dir, ignore=ignore_postgres_log) assert not (storcon_db_to_dir / "postgres.log").exists() # NB: neon_local rewrites postgresql.conf on each start based on neon_local config. No need to patch it. - # However, in this new NeonEnv, the pageservers listen on different ports, and the storage controller + # However, in this new NeonStorageEnv, the pageservers listen on different ports, and the storage controller # will currently reject re-attach requests from them because the NodeMetadata isn't identical. # So, from_repo_dir patches up the the storcon database. patch_script_path = self.repo_dir / "storage_controller_db.startup.sql" assert not patch_script_path.exists() patch_script = "" - for ps in self.env.pageservers: + for ps in self.storage_env.pageservers: patch_script += f"UPDATE nodes SET listen_http_port={ps.service_port.http}, listen_pg_port={ps.service_port.pg} WHERE node_id = '{ps.id}';" # This is a temporary to get the backward compat test happy # since the compat snapshot was generated with an older version of neon local @@ -743,7 +815,7 @@ class NeonEnvBuilder: with (self.repo_dir / "branches.toml").open("w") as f: toml.dump(snapshot_branch_mappings, f) - return self.env + return self.storage_env def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path): """ @@ -939,9 +1011,10 @@ class NeonEnvBuilder: if isinstance(x, S3Storage): x.do_cleanup() - def __enter__(self) -> "NeonEnvBuilder": + def __enter__(self) -> NeonStorageEnvBuilder: return self + # XXX: Why is this part of the Builder and not the Env itself? def __exit__( self, exc_type: Optional[Type[BaseException]], @@ -949,26 +1022,24 @@ class NeonEnvBuilder: traceback: Optional[TracebackType], ): # Stop all the nodes. - if self.env: + if self.storage_env: log.info("Cleaning up all storage and compute nodes") - self.env.stop( + self.storage_env.stop( immediate=False, # if the test threw an exception, don't check for errors # as a failing assertion would cause the cleanup below to fail ps_assert_metric_no_errors=(exc_type is None), - # do not fail on endpoint errors to allow the rest of cleanup to proceed - fail_on_endpoint_errors=False, ) cleanup_error = None # If we are running with S3Storage (required by the scrubber), check that whatever the test # did does not generate any corruption if ( - isinstance(self.env.pageserver_remote_storage, S3Storage) + isinstance(self.storage_env.pageserver_remote_storage, S3Storage) and self.enable_scrub_on_exit ): try: - healthy, _ = self.env.storage_scrubber.scan_metadata() + healthy, _ = self.storage_env.storage_scrubber.scan_metadata() if not healthy: e = Exception("Remote storage metadata corrupted") cleanup_error = e @@ -993,13 +1064,13 @@ class NeonEnvBuilder: if cleanup_error is not None: raise cleanup_error - for pageserver in self.env.pageservers: + for pageserver in self.storage_env.pageservers: pageserver.assert_no_errors() - for safekeeper in self.env.safekeepers: + for safekeeper in self.storage_env.safekeepers: safekeeper.assert_no_errors() - self.env.storage_controller.assert_no_errors() + self.storage_env.storage_controller.assert_no_errors() try: self.overlay_cleanup_teardown() @@ -1008,8 +1079,85 @@ class NeonEnvBuilder: if cleanup_error is not None: cleanup_error = e - class NeonEnv: + """ + An object representing the Neon runtime environment for one test + + The storage can be shared or dedicated for this test. + + Some notable functions and fields in NeonEnv: + + endpoints - A factory object for creating postgres compute nodes. + + storage - + """ + + def __init__(self, config: NeonEnvBuilder): + self.repo_dir = config.repo_dir + self.neon_binpath = config.neon_binpath + self.pg_distrib_dir = config.pg_distrib_dir + self.rust_log_override = config.rust_log_override + self.port_distributor = config.port_distributor + self.endpoints = EndpointFactory(self) + self.storage_env = None + + # XXX: caller must set 'storage_env' before using those functions + + self.pg_version = config.pg_version + self.initial_tenant = config.initial_tenant + self.initial_timeline = config.initial_timeline + + self.pageserver_aux_file_policy = config.pageserver_aux_file_policy + + self.endpoint_counter=0 + + # Binary path for neon_local test-specific binaries + self.neon_local_binpath = config.neon_local_binpath + if self.neon_local_binpath is None: + self.neon_local_binpath = config.neon_binpath + + self.neon_cli = NeonCli(self) + + def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True): + """ + After this method returns, there should be no child processes running. + + Unless of course, some stopping failed, in that case, all remaining child processes are leaked. + """ + + # the commonly failing components have special try-except behavior, + # trying to get us to actually shutdown all processes over easier error + # reporting. + + raise_later = None + try: + self.endpoints.stop_all(fail_on_endpoint_errors) + except Exception as e: + raise_later = e + + if self.storage_env is not None: + self.storage_env.stop(immediate=immediate, ps_assert_metric_no_errors=ps_assert_metric_no_errors) + + if raise_later is not None: + raise raise_later + + def generate_endpoint_id(self) -> str: + """ + Generate a unique endpoint ID + """ + self.endpoint_counter += 1 + return "ep-" + str(self.endpoint_counter) + + def __getattr__(self, attribute: str) -> Any: + storage_env = None + try: + storage_env = self.__getattribute__('storage_env') + except AttributeError: + raise AttributeError(f"attribute {attribute} not found in NeonEnv") + + return self.storage_env.__getattribute__(attribute) + +class NeonStorageEnv: """ An object representing the Neon runtime environment. It consists of the page server, 0-N safekeepers, and the compute nodes. @@ -1024,8 +1172,6 @@ class NeonEnv: Some notable functions and fields in NeonEnv: - endpoints - A factory object for creating postgres compute nodes. - pageservers - An array containing objects representing the pageservers safekeepers - An array containing objects representing the safekeepers @@ -1046,36 +1192,23 @@ class NeonEnv: BASE_PAGESERVER_ID = 1 storage_controller: NeonStorageController | NeonProxiedStorageController - def __init__(self, config: NeonEnvBuilder): + def __init__(self, config: NeonStorageEnvBuilder): self.repo_dir = config.repo_dir self.shared_initdb_cache_dir = config.shared_initdb_cache_dir self.rust_log_override = config.rust_log_override self.port_distributor = config.port_distributor self.s3_mock_server = config.mock_s3_server - self.neon_cli = NeonCli(env=self) - self.pagectl = Pagectl(env=self) - self.endpoints = EndpointFactory(self) self.safekeepers: List[Safekeeper] = [] self.pageservers: List[NeonPageserver] = [] self.broker = None self.pageserver_remote_storage = config.pageserver_remote_storage self.safekeepers_remote_storage = config.safekeepers_remote_storage - self.pg_version = config.pg_version # Binary path for pageserver, safekeeper, etc - self.neon_binpath = config.neon_binpath - # Binary path for neon_local test-specific binaries self.neon_local_binpath = config.neon_local_binpath - if self.neon_local_binpath is None: - self.neon_local_binpath = self.neon_binpath + self.neon_binpath = config.neon_binpath self.pg_distrib_dir = config.pg_distrib_dir - self.endpoint_counter = 0 self.storage_controller_config = config.storage_controller_config - # generate initial tenant ID here instead of letting 'neon init' generate it, - # so that we don't need to dig it out of the config file afterwards. - self.initial_tenant = config.initial_tenant - self.initial_timeline = config.initial_timeline - # Storage broker instance broker_logfile = self.repo_dir / "storage_broker.log" broker_port = self.port_distributor.get_port() @@ -1117,9 +1250,17 @@ class NeonEnv: self.control_plane_compute_hook_api = config.control_plane_compute_hook_api self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine - self.pageserver_aux_file_policy = config.pageserver_aux_file_policy self.pageserver_io_buffer_alignment = config.pageserver_io_buffer_alignment + + # Binary path for neon_local test-specific binaries + self.neon_local_binpath = config.neon_local_binpath + if self.neon_local_binpath is None: + self.neon_local_binpath = config.neon_binpath + + self.neon_cli = NeonStorageCli(self) + self.pagectl = Pagectl(self) + # Create the neon_local's `NeonLocalInitConf` cfg: Dict[str, Any] = { "broker": { @@ -1220,7 +1361,7 @@ class NeonEnv: cfg["safekeepers"].append(sk_cfg) # Scrubber instance for tests that use it, and for use during teardown checks - self.storage_scrubber = StorageScrubber(self, log_dir=config.test_output_dir) + self.storage_scrubber = StorageScrubber(self, log_dir=self.repo_dir) log.info(f"Config: {cfg}") self.neon_cli.init( @@ -1263,7 +1404,7 @@ class NeonEnv: for f in futs: f.result() - def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True): + def stop(self, immediate=False, ps_assert_metric_no_errors=False): """ After this method returns, there should be no child processes running. @@ -1274,12 +1415,6 @@ class NeonEnv: # trying to get us to actually shutdown all processes over easier error # reporting. - raise_later = None - try: - self.endpoints.stop_all(fail_on_endpoint_errors) - except Exception as e: - raise_later = e - # Stop storage controller before pageservers: we don't want it to spuriously # detect a pageserver "failure" during test teardown self.storage_controller.stop(immediate=immediate) @@ -1306,9 +1441,6 @@ class NeonEnv: for ps in stop_later: ps.stop(immediate=True) - if raise_later is not None: - raise raise_later - for error in metric_errors: raise error @@ -1400,13 +1532,6 @@ class NeonEnv: ) del self.auth_keys - def generate_endpoint_id(self) -> str: - """ - Generate a unique endpoint ID - """ - self.endpoint_counter += 1 - return "ep-" + str(self.endpoint_counter) - @pytest.fixture(scope="function") def neon_simple_env( @@ -1435,27 +1560,38 @@ def neon_simple_env( # Create the environment in the per-test output directory repo_dir = get_test_repo_dir(request, top_output_dir) - with NeonEnvBuilder( + with NeonStorageEnvBuilder( top_output_dir=top_output_dir, repo_dir=repo_dir, port_distributor=port_distributor, mock_s3_server=mock_s3_server, neon_binpath=neon_binpath, pg_distrib_dir=pg_distrib_dir, - pg_version=pg_version, run_id=run_id, preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")), env_name=request.node.name, - test_output_dir=test_output_dir, - pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, - pageserver_aux_file_policy=pageserver_aux_file_policy, pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm, + pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, pageserver_io_buffer_alignment=pageserver_io_buffer_alignment, shared_initdb_cache_dir=shared_initdb_cache_dir - ) as builder: - env = builder.init_start() + ) as storage_env_builder: + storage_env = storage_env_builder.init_configs() + with NeonEnvBuilder( + top_output_dir=top_output_dir, + repo_dir=repo_dir, + port_distributor=port_distributor, + neon_binpath=neon_binpath, + pg_distrib_dir=pg_distrib_dir, + pg_version=pg_version, + run_id=run_id, + test_name=request.node.name, + test_output_dir=test_output_dir, + pageserver_aux_file_policy=pageserver_aux_file_policy, + storage_env=storage_env, + ) as env_builder: + env = env_builder.init_start() - yield env + yield env @pytest.fixture(scope="function") @@ -1492,32 +1628,42 @@ def neon_env_builder( """ # Create the environment in the test-specific output dir - repo_dir = os.path.join(test_output_dir, "repo") + repo_dir = Path(os.path.join(test_output_dir, "repo")) # Return the builder to the caller - with NeonEnvBuilder( + + with NeonStorageEnvBuilder( top_output_dir=top_output_dir, - repo_dir=Path(repo_dir), + repo_dir=repo_dir, port_distributor=port_distributor, mock_s3_server=mock_s3_server, neon_binpath=neon_binpath, pg_distrib_dir=pg_distrib_dir, - pg_version=pg_version, run_id=run_id, preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")), - pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, env_name=request.node.name, - test_output_dir=test_output_dir, - test_overlay_dir=test_overlay_dir, - pageserver_aux_file_policy=pageserver_aux_file_policy, pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm, + pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine, pageserver_io_buffer_alignment=pageserver_io_buffer_alignment, shared_initdb_cache_dir=shared_initdb_cache_dir - ) as builder: - yield builder - # Propogate `preserve_database_files` to make it possible to use in other fixtures, - # like `test_output_dir` fixture for attaching all database files to Allure report. - record_property("preserve_database_files", builder.preserve_database_files) + ) as storage_env_builder: + with NeonEnvBuilder( + top_output_dir=top_output_dir, + repo_dir=Path(repo_dir), + port_distributor=port_distributor, + neon_binpath=neon_binpath, + pg_distrib_dir=pg_distrib_dir, + pg_version=pg_version, + run_id=run_id, + test_name=request.node.name, + test_output_dir=test_output_dir, + pageserver_aux_file_policy=pageserver_aux_file_policy, + storage_env_builder=storage_env_builder, + ) as builder: + yield builder + # Propogate `preserve_database_files` to make it possible to use in other fixtures, + # like `test_output_dir` fixture for attaching all database files to Allure report. + record_property("preserve_database_files", builder.preserve_database_files) @dataclass @@ -1534,8 +1680,11 @@ class AbstractNeonCli(abc.ABC): Do not use directly, use specific subclasses instead. """ - def __init__(self, env: NeonEnv): - self.env = env + def __init__(self, binpath: Path, repo_dir: Path, pg_distrib_dir: Path, rust_log_override: Optional[str]): + self.binpath = binpath + self.repo_dir = repo_dir + self.pg_distrib_dir = pg_distrib_dir + self.rust_log_override = rust_log_override COMMAND: str = cast(str, None) # To be overwritten by the derived class. @@ -1545,12 +1694,11 @@ class AbstractNeonCli(abc.ABC): extra_env_vars: Optional[Dict[str, str]] = None, check_return_code=True, timeout=None, - local_binpath=False, ) -> "subprocess.CompletedProcess[str]": """ Run the command with the specified arguments. - Arguments must be in list form, e.g. ['pg', 'create'] + Arguments must be in list form, e.g. ['endpoint', 'create'] Return both stdout and stderr, which can be accessed as @@ -1559,28 +1707,21 @@ class AbstractNeonCli(abc.ABC): >>> log.info(result.stdout) If `check_return_code`, on non-zero exit code logs failure and raises. - - If `local_binpath` is true, then we are invoking a test utility """ assert isinstance(arguments, list) assert isinstance(self.COMMAND, str) - if local_binpath: - # Test utility - bin_neon = str(self.env.neon_local_binpath / self.COMMAND) - else: - # Normal binary - bin_neon = str(self.env.neon_binpath / self.COMMAND) + bin_neon = str(self.binpath / self.COMMAND) args = [bin_neon] + arguments log.info('Running command "{}"'.format(" ".join(args))) env_vars = os.environ.copy() - env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir) - env_vars["POSTGRES_DISTRIB_DIR"] = str(self.env.pg_distrib_dir) - if self.env.rust_log_override is not None: - env_vars["RUST_LOG"] = self.env.rust_log_override + env_vars["NEON_REPO_DIR"] = str(self.repo_dir) + env_vars["POSTGRES_DISTRIB_DIR"] = str(self.pg_distrib_dir) + if self.rust_log_override is not None: + env_vars["RUST_LOG"] = self.rust_log_override for extra_env_key, extra_env_value in (extra_env_vars or {}).items(): env_vars[extra_env_key] = extra_env_value @@ -1649,14 +1790,17 @@ class AbstractNeonCli(abc.ABC): class NeonCli(AbstractNeonCli): """ - A typed wrapper around the `neon` CLI tool. + A typed wrapper around the `neon_local` CLI tool. Supports main commands via typed methods and a way to run arbitrary command directly via CLI. """ COMMAND = "neon_local" + def __init__(self, env: NeonEnv): + super().__init__(env.neon_local_binpath, env.repo_dir, env.pg_distrib_dir, env.rust_log_override) + self.env = env + def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]: - kwargs["local_binpath"] = True return super().raw_cli(*args, **kwargs) def create_tenant( @@ -1827,105 +1971,6 @@ class NeonCli(AbstractNeonCli): ) return timelines_cli - def init( - self, - init_config: Dict[str, Any], - force: Optional[str] = None, - ) -> "subprocess.CompletedProcess[str]": - with tempfile.NamedTemporaryFile(mode="w+") as init_config_tmpfile: - init_config_tmpfile.write(toml.dumps(init_config)) - init_config_tmpfile.flush() - - cmd = [ - "init", - f"--config={init_config_tmpfile.name}", - ] - - if force is not None: - cmd.extend(["--force", force]) - - res = self.raw_cli(cmd) - res.check_returncode() - return res - - def storage_controller_start( - self, - timeout_in_seconds: Optional[int] = None, - instance_id: Optional[int] = None, - base_port: Optional[int] = None, - ): - cmd = ["storage_controller", "start"] - if timeout_in_seconds is not None: - cmd.append(f"--start-timeout={timeout_in_seconds}s") - if instance_id is not None: - cmd.append(f"--instance-id={instance_id}") - if base_port is not None: - cmd.append(f"--base-port={base_port}") - return self.raw_cli(cmd) - - def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): - cmd = ["storage_controller", "stop"] - if immediate: - cmd.extend(["-m", "immediate"]) - if instance_id is not None: - cmd.append(f"--instance-id={instance_id}") - return self.raw_cli(cmd) - - def pageserver_start( - self, - id: int, - extra_env_vars: Optional[Dict[str, str]] = None, - timeout_in_seconds: Optional[int] = None, - ) -> "subprocess.CompletedProcess[str]": - start_args = ["pageserver", "start", f"--id={id}"] - if timeout_in_seconds is not None: - start_args.append(f"--start-timeout={timeout_in_seconds}s") - storage = self.env.pageserver_remote_storage - - if isinstance(storage, S3Storage): - s3_env_vars = storage.access_env_vars() - extra_env_vars = (extra_env_vars or {}) | s3_env_vars - - return self.raw_cli(start_args, extra_env_vars=extra_env_vars) - - def pageserver_stop(self, id: int, immediate=False) -> "subprocess.CompletedProcess[str]": - cmd = ["pageserver", "stop", f"--id={id}"] - if immediate: - cmd.extend(["-m", "immediate"]) - - log.info(f"Stopping pageserver with {cmd}") - return self.raw_cli(cmd) - - def safekeeper_start( - self, - id: int, - extra_opts: Optional[List[str]] = None, - timeout_in_seconds: Optional[int] = None, - ) -> "subprocess.CompletedProcess[str]": - s3_env_vars = None - if isinstance(self.env.safekeepers_remote_storage, S3Storage): - s3_env_vars = self.env.safekeepers_remote_storage.access_env_vars() - - if extra_opts is not None: - extra_opts = [f"-e={opt}" for opt in extra_opts] - else: - extra_opts = [] - if timeout_in_seconds is not None: - extra_opts.append(f"--start-timeout={timeout_in_seconds}s") - return self.raw_cli( - ["safekeeper", "start", str(id), *extra_opts], extra_env_vars=s3_env_vars - ) - - def safekeeper_stop( - self, id: Optional[int] = None, immediate=False - ) -> "subprocess.CompletedProcess[str]": - args = ["safekeeper", "stop"] - if id is not None: - args.append(str(id)) - if immediate: - args.extend(["-m", "immediate"]) - return self.raw_cli(args) - def endpoint_create( self, branch_name: str, @@ -2066,6 +2111,120 @@ class NeonCli(AbstractNeonCli): return self.raw_cli(["stop"], check_return_code=check_return_code) +class NeonStorageCli(AbstractNeonCli): + """ + A typed wrapper around the `neon_local` CLI tool. + Supports main commands via typed methods and a way to run arbitrary command directly via CLI. + """ + + COMMAND = "neon_local" + + def __init__(self, storage_env: NeonStorageEnv): + super().__init__(storage_env.neon_local_binpath, storage_env.repo_dir, storage_env.pg_distrib_dir, storage_env.rust_log_override) + self.storage_env = storage_env + + def raw_cli(self, *args, **kwargs) -> subprocess.CompletedProcess[str]: + return super().raw_cli(*args, **kwargs) + + def init( + self, + init_config: Dict[str, Any], + force: Optional[str] = None, + ) -> "subprocess.CompletedProcess[str]": + with tempfile.NamedTemporaryFile(mode="w+") as init_config_tmpfile: + init_config_tmpfile.write(toml.dumps(init_config)) + init_config_tmpfile.flush() + + cmd = [ + "init", + f"--config={init_config_tmpfile.name}", + ] + + if force is not None: + cmd.extend(["--force", force]) + + res = self.raw_cli(cmd) + res.check_returncode() + return res + + def storage_controller_start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): + cmd = ["storage_controller", "start"] + if timeout_in_seconds is not None: + cmd.append(f"--start-timeout={timeout_in_seconds}s") + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + if base_port is not None: + cmd.append(f"--base-port={base_port}") + return self.raw_cli(cmd) + + def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): + cmd = ["storage_controller", "stop"] + if immediate: + cmd.extend(["-m", "immediate"]) + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + return self.raw_cli(cmd) + + def pageserver_start( + self, + id: int, + extra_env_vars: Optional[Dict[str, str]] = None, + timeout_in_seconds: Optional[int] = None, + ) -> "subprocess.CompletedProcess[str]": + start_args = ["pageserver", "start", f"--id={id}"] + if timeout_in_seconds is not None: + start_args.append(f"--start-timeout={timeout_in_seconds}s") + storage = self.storage_env.pageserver_remote_storage + + if isinstance(storage, S3Storage): + s3_env_vars = storage.access_env_vars() + extra_env_vars = (extra_env_vars or {}) | s3_env_vars + + return self.raw_cli(start_args, extra_env_vars=extra_env_vars) + + def pageserver_stop(self, id: int, immediate=False) -> "subprocess.CompletedProcess[str]": + cmd = ["pageserver", "stop", f"--id={id}"] + if immediate: + cmd.extend(["-m", "immediate"]) + + log.info(f"Stopping pageserver with {cmd}") + return self.raw_cli(cmd) + + def safekeeper_start( + self, + id: int, + extra_opts: Optional[List[str]] = None, + timeout_in_seconds: Optional[int] = None, + ) -> "subprocess.CompletedProcess[str]": + s3_env_vars = None + if isinstance(self.storage_env.safekeepers_remote_storage, S3Storage): + s3_env_vars = self.storage_env.safekeepers_remote_storage.access_env_vars() + + if extra_opts is not None: + extra_opts = [f"-e={opt}" for opt in extra_opts] + else: + extra_opts = [] + if timeout_in_seconds is not None: + extra_opts.append(f"--start-timeout={timeout_in_seconds}s") + return self.raw_cli( + ["safekeeper", "start", str(id), *extra_opts], extra_env_vars=s3_env_vars + ) + + def safekeeper_stop( + self, id: Optional[int] = None, immediate=False + ) -> "subprocess.CompletedProcess[str]": + args = ["safekeeper", "stop"] + if id is not None: + args.append(str(id)) + if immediate: + args.extend(["-m", "immediate"]) + return self.raw_cli(args) + class WalCraft(AbstractNeonCli): """ A typed wrapper around the `wal_craft` CLI tool. @@ -2097,6 +2256,9 @@ class Pagectl(AbstractNeonCli): A typed wrapper around the `pagectl` utility CLI tool. """ + def __init__(self, binpath: Path): + self.binpath = binpath + COMMAND = "pagectl" def dump_index_part(self, path: Path) -> IndexPartDump: diff --git a/test_runner/regress/test_config.py b/test_runner/regress/test_config.py index d8ef0b8dbd..6bf5fdac6e 100644 --- a/test_runner/regress/test_config.py +++ b/test_runner/regress/test_config.py @@ -7,7 +7,7 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder # # Test starting Postgres with custom options # -def test_config(neon_simple_env: NeonEnv): +def test_configx(neon_simple_env: NeonEnv): env = neon_simple_env # change config