From 5aecd8c4fd854179597a93f9b180b8f63bc247c0 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 29 Aug 2023 14:31:38 +0100 Subject: [PATCH] tests: enable generations in neon_fixture --- test_runner/fixtures/neon_fixtures.py | 84 ++++++++++++++++++- test_runner/fixtures/pageserver/http.py | 15 +++- .../regress/test_pageserver_restart.py | 5 +- test_runner/regress/test_remote_storage.py | 17 ++-- 4 files changed, 108 insertions(+), 13 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b2cd0fe968..9de7e90f50 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -428,6 +428,7 @@ class NeonEnvBuilder: preserve_database_files: bool = False, initial_tenant: Optional[TenantId] = None, initial_timeline: Optional[TimelineId] = None, + enable_generations: bool = False, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -454,6 +455,7 @@ class NeonEnvBuilder: self.preserve_database_files = preserve_database_files self.initial_tenant = initial_tenant or TenantId.generate() self.initial_timeline = initial_timeline or TimelineId.generate() + self.enable_generations = enable_generations def init_configs(self) -> NeonEnv: # Cannot create more than one environment from one builder @@ -713,6 +715,9 @@ class NeonEnvBuilder: sk.stop(immediate=True) self.env.pageserver.stop(immediate=True) + if self.env.attachment_service is not None: + self.env.attachment_service.stop(immediate=True) + cleanup_error = None try: self.cleanup_remote_storage() @@ -766,6 +771,8 @@ class NeonEnv: the tenant id """ + PAGESERVER_ID = 1 + def __init__(self, config: NeonEnvBuilder): self.repo_dir = config.repo_dir self.rust_log_override = config.rust_log_override @@ -789,6 +796,14 @@ class NeonEnv: self.initial_tenant = config.initial_tenant self.initial_timeline = config.initial_timeline + if config.enable_generations: + attachment_service_port = self.port_distributor.get_port() 
self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}" + self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self) + else: + self.control_plane_api = None + self.attachment_service = None + # Create a config file corresponding to the options toml = textwrap.dedent( f""" @@ -814,7 +829,7 @@ class NeonEnv: toml += textwrap.dedent( f""" [pageserver] - id=1 + id={self.PAGESERVER_ID} listen_pg_addr = 'localhost:{pageserver_port.pg}' listen_http_addr = 'localhost:{pageserver_port.http}' pg_auth_type = '{pg_auth_type}' @@ -822,6 +837,13 @@ class NeonEnv: """ ) + if self.control_plane_api is not None: + toml += textwrap.dedent( + f""" + control_plane_api = '{self.control_plane_api}' + """ + ) + # Create a corresponding NeonPageserver object self.pageserver = NeonPageserver( self, port=pageserver_port, config_override=config.pageserver_config_override @@ -868,6 +890,9 @@ class NeonEnv: def start(self): # Start up broker, pageserver and all safekeepers self.broker.try_start() + + if self.attachment_service is not None: + self.attachment_service.start() self.pageserver.start() for safekeeper in self.safekeepers: @@ -1289,6 +1314,16 @@ class NeonCli(AbstractNeonCli): res.check_returncode() return res + def attachment_service_start(self): + cmd = ["attachment_service", "start"] + return self.raw_cli(cmd) + + def attachment_service_stop(self, immediate: bool): + cmd = ["attachment_service", "stop"] + if immediate: + cmd.extend(["-m", "immediate"]) + return self.raw_cli(cmd) + def pageserver_start( self, overrides: Tuple[str, ...] 
= (), @@ -1470,6 +1505,34 @@ class ComputeCtl(AbstractNeonCli): COMMAND = "compute_ctl" +class NeonAttachmentService: + def __init__(self, env: NeonEnv): + self.env = env + self.running = False + + def start(self): + self.env.neon_cli.attachment_service_start() + self.running = True + return self + + def stop(self, immediate: bool = False) -> "NeonAttachmentService": + if self.running: + self.env.neon_cli.attachment_service_stop(immediate) + self.running = False + return self + + def __enter__(self) -> "NeonAttachmentService": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ): + self.stop(immediate=True) + + class NeonPageserver(PgProtocol): """ An object representing a running pageserver. @@ -1633,6 +1696,26 @@ class NeonPageserver(PgProtocol): return None + def tenant_attach( + self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False + ): + """ + Tenant attachment passes through here to acquire a generation number before proceeding + to call into the pageserver HTTP client. 
+ """ + if self.env.attachment_service is not None: + response = requests.post( + f"{self.env.control_plane_api}/attach_hook", + json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID}, + ) + response.raise_for_status() + generation = response.json()["gen"] + else: + generation = None + + client = self.env.pageserver.http_client() + return client.tenant_attach(tenant_id, config, config_null, generation=generation) + def append_pageserver_param_overrides( params_to_update: List[str], diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index a179ebdd09..9373073abf 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -186,18 +186,25 @@ class PageserverHttpClient(requests.Session): return TenantId(new_tenant_id) def tenant_attach( - self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False + self, + tenant_id: TenantId, + config: None | Dict[str, Any] = None, + config_null: bool = False, + generation: Optional[int] = None, ): if config_null: assert config is None - body = "null" + body: Any = None else: # null-config is prohibited by the API config = config or {} - body = json.dumps({"config": config}) + body = {"config": config} + if generation is not None: + body.update({"generation": generation}) + res = self.post( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach", - data=body, + data=json.dumps(body), headers={"Content-Type": "application/json"}, ) self.verbose_error(res) diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 1e41ebd15b..1c61719e5f 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -7,7 +7,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder # Test restarting page server, while safekeeper and compute node keep # running. 
-def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize("generations", [True, False]) +def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool): + neon_env_builder.enable_generations = generations + env = neon_env_builder.init_start() env.neon_cli.create_branch("test_pageserver_restart") diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index b865e3ce24..5755ebab77 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -52,9 +52,9 @@ from requests import ReadTimeout # # The tests are done for all types of remote storage pageserver supports. @pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) +@pytest.mark.parametrize("generations", [True, False]) def test_remote_storage_backup_and_restore( - neon_env_builder: NeonEnvBuilder, - remote_storage_kind: RemoteStorageKind, + neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, generations: bool ): # Use this test to check more realistic SK ids: some etcd key parsing bugs were related, # and this test needs SK to write data to pageserver, so it will be visible @@ -65,6 +65,8 @@ def test_remote_storage_backup_and_restore( test_name="test_remote_storage_backup_and_restore", ) + neon_env_builder.enable_generations = generations + # Exercise retry code path by making all uploads and downloads fail for the # first time. The retries print INFO-messages to the log; we will check # that they are present after the test. @@ -155,7 +157,8 @@ def test_remote_storage_backup_and_restore( # background task to load the tenant. In that background task, # listing the remote timelines will fail because of the failpoint, # and the tenant will be marked as Broken. 
- client.tenant_attach(tenant_id) + # Attach via the pageserver fixture so a generation is acquired when generations are enabled. + env.pageserver.tenant_attach(tenant_id) tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15) assert tenant_info["attachment_status"] == { @@ -165,7 +168,7 @@ def test_remote_storage_backup_and_restore( # Ensure that even though the tenant is broken, we can't attach it again. with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"): - client.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) # Restart again, this implicitly clears the failpoint. # test_remote_failures=1 remains active, though, as it's in the pageserver config. @@ -183,7 +186,7 @@ def test_remote_storage_backup_and_restore( # Ensure that the pageserver remembers that the tenant was attaching, by # trying to attach it again. It should fail. with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"): - client.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) log.info("waiting for tenant to become active. this should be quick with on-demand download") wait_until_tenant_active( @@ -364,7 +367,7 @@ def test_remote_storage_upload_queue_retries( env.pageserver.start() client = env.pageserver.http_client() - client.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) wait_until_tenant_active(client, tenant_id) @@ -502,7 +505,7 @@ def test_remote_timeline_client_calls_started_metric( env.pageserver.start() client = env.pageserver.http_client() - client.tenant_attach(tenant_id) + env.pageserver.tenant_attach(tenant_id) wait_until_tenant_active(client, tenant_id)