From 6a922b1a7543c41ef38ce8daaa7b4c9da271c158 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Mon, 11 Dec 2023 16:55:43 +0000
Subject: [PATCH] tests: start adding tests for secondary mode, live migration (#5842)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

These tests have been loitering on a branch of mine for a while: they already
provide value even before all of the secondary mode bits have landed, and the
Workload helper is handy for other tests too.

- `Workload` is a reusable test workload that replaces some of the arbitrary
  "write a few rows" SQL that I've found myself repeating, and adds a
  systematic way to append data and check that reads properly reflect the
  changes. This append-and-validate workflow is important when doing
  migrations, as we want to detect situations where we might be reading from
  a pageserver that has not properly seen the latest changes.
- test_multi_attach validates, from a safety point of view, how the
  pageserver handles attaching the same tenant to multiple pageservers. This
  is intentionally separate from the larger testing of migration, to provide
  an isolated environment for multi-attachment.
- test_location_conf_churn is a pseudo-random walk through the various states
  that TenantSlot can be put into, validating that attached tenants remain
  externally readable when they should be, and, as a side effect, that the
  compute endpoint's online configuration changes work as expected.
- test_live_migration is the reference implementation of how to drive a pair
  of pageservers through a zero-downtime migration of a tenant.

---------

Co-authored-by: Arpad Müller
---
 test_runner/fixtures/neon_fixtures.py        |  27 +-
 test_runner/fixtures/pageserver/http.py      |  19 +-
 test_runner/fixtures/workload.py             | 148 ++++++++
 .../regress/test_pageserver_generations.py   | 103 +++++-
 .../regress/test_pageserver_secondary.py     | 332 ++++++++++++++++++
 5 files changed, 623 insertions(+), 6 deletions(-)
 create mode 100644 test_runner/fixtures/workload.py
 create mode 100644 test_runner/regress/test_pageserver_secondary.py

diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index c569b63d4e..fb6cea5713 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1712,7 +1712,7 @@ class NeonPageserver(PgProtocol):
 
     @property
     def workdir(self) -> Path:
-        return Path(os.path.join(self.env.repo_dir, f"pageserver_{self.id}"))
+        return self.env.repo_dir / f"pageserver_{self.id}"
 
     def assert_no_errors(self):
         logfile = self.workdir / "pageserver.log"
@@ -1784,6 +1784,27 @@ class NeonPageserver(PgProtocol):
         client = self.http_client()
         return client.tenant_detach(tenant_id)
 
+    def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
+        # This API is only for use when generations are enabled
+        assert self.env.attachment_service is not None
+
+        if config["mode"].startswith("Attached") and "generation" not in config:
+            config["generation"] = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
+
+        client = self.http_client()
+        return client.tenant_location_conf(tenant_id, config, **kwargs)
+
+    def read_tenant_location_conf(self, tenant_id: TenantId) -> dict[str, Any]:
+        path = self.tenant_dir(tenant_id) / "config-v1"
+        log.info(f"Reading location conf from {path}")
+        bytes = open(path, "r").read()
+        try:
+            decoded: dict[str, Any] = toml.loads(bytes)
+            return decoded
+        except:
+            log.error(f"Failed to decode LocationConf, raw content ({len(bytes)} bytes): {bytes}")
+            raise
+
     def tenant_create(
         self,
         tenant_id: TenantId,
@@ -2717,6 +2738,7 @@ class EndpointFactory:
         lsn: Optional[Lsn] = None,
         hot_standby: bool = False,
         config_lines: Optional[List[str]] = None,
+        pageserver_id: Optional[int] = None,
     ) -> Endpoint:
         ep = Endpoint(
             self.env,
@@ -2736,6 +2758,7 @@ class EndpointFactory:
             lsn=lsn,
             hot_standby=hot_standby,
             config_lines=config_lines,
+            pageserver_id=pageserver_id,
         )
 
     def stop_all(self) -> "EndpointFactory":
@@ -3082,7 +3105,7 @@ def pytest_addoption(parser: Parser):
 
 
 SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"config|metadata|.+\.(?:toml|pid|json|sql)"
+    r"config|config-v1|heatmap-v1|metadata|.+\.(?:toml|pid|json|sql)"
 )
 
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 3e75bac424..b46ddf5527 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -150,7 +150,7 @@ class PageserverHttpClient(requests.Session):
             # (this may change in future if we do fault injection of a kind that causes
             # requests TCP flows to stick)
             read=False,
-            backoff_factor=0,
+            backoff_factor=0.2,
             status_forcelist=[503],
             allowed_methods=None,
             remove_headers_on_redirect=[],
@@ -277,6 +277,23 @@ class PageserverHttpClient(requests.Session):
         res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
         self.verbose_error(res)
 
+    def tenant_location_conf(
+        self, tenant_id: TenantId, location_conf: dict[str, Any], flush_ms=None
+    ):
+        body = location_conf.copy()
+        body["tenant_id"] = str(tenant_id)
+
+        params = {}
+        if flush_ms is not None:
+            params["flush_ms"] = str(flush_ms)
+
+        res = self.put(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config",
+            json=body,
+            params=params,
+        )
+        self.verbose_error(res)
+
     def tenant_delete(self, tenant_id: TenantId):
         res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
         self.verbose_error(res)
diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py
new file mode 100644
index 0000000000..241531437c
--- /dev/null
+++ b/test_runner/fixtures/workload.py
@@ -0,0 +1,148 @@
+from typing import Optional
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    Endpoint,
+    NeonEnv,
+    last_flush_lsn_upload,
+    wait_for_last_flush_lsn,
+)
+from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
+from fixtures.types import TenantId, TimelineId
+
+
+class Workload:
+    """
+    This is not a general-purpose load generator: it exists for storage tests that need to inject some
+    high-level types of storage work via the postgres interface:
+    - layer writes (`write_rows`)
+    - work for compaction (`churn_rows`)
+    - reads, checking we get the right data (`validate`)
+    """
+
+    def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
+        self.env = env
+        self.tenant_id = tenant_id
+        self.timeline_id = timeline_id
+        self.table = "foo"
+
+        self.expect_rows = 0
+        self.churn_cursor = 0
+
+        self._endpoint: Optional[Endpoint] = None
+
+    def endpoint(self, pageserver_id: int) -> Endpoint:
+        if self._endpoint is None:
+            self._endpoint = self.env.endpoints.create(
+                "main",
+                tenant_id=self.tenant_id,
+                pageserver_id=pageserver_id,
+                endpoint_id="ep-workload",
+            )
+            self._endpoint.start(pageserver_id=pageserver_id)
+        else:
+            self._endpoint.reconfigure(pageserver_id=pageserver_id)
+
+        connstring = self._endpoint.safe_psql(
+            "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
+        )
+        log.info(f"Workload.endpoint: connstr={connstring}")
+
+        return self._endpoint
+
+    def __del__(self):
+        if self._endpoint is not None:
+            self._endpoint.stop()
+
+    def init(self, pageserver_id: int):
+        endpoint = self.endpoint(pageserver_id)
+
+        endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
+        endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
+        last_flush_lsn_upload(
+            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+        )
+
+    def write_rows(self, n, pageserver_id):
+        endpoint = self.endpoint(pageserver_id)
+        start = self.expect_rows
+        end = start + n - 1
+        self.expect_rows += n
+        dummy_value = "blah"
+        endpoint.safe_psql(
+            f"""
+            INSERT INTO {self.table} (id, val)
+            SELECT g, '{dummy_value}'
+            FROM generate_series({start}, {end}) g
+            """
+        )
+
+        return last_flush_lsn_upload(
+            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+        )
+
+    def churn_rows(self, n, pageserver_id, upload=True):
+        assert self.expect_rows >= n
+
+        max_iters = 10
+        endpoint = self.endpoint(pageserver_id)
+        todo = n
+        i = 0
+        while todo > 0:
+            i += 1
+            if i > max_iters:
+                raise RuntimeError("oops")
+            start = self.churn_cursor % self.expect_rows
+            n_iter = min((self.expect_rows - start), todo)
+            todo -= n_iter
+
+            end = start + n_iter - 1
+
+            log.info(
+                f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
+            )
+
+            assert end < self.expect_rows
+
+            self.churn_cursor += n_iter
+            dummy_value = "blah"
+            endpoint.safe_psql_many(
+                [
+                    f"""
+                    INSERT INTO {self.table} (id, val)
+                    SELECT g, '{dummy_value}'
+                    FROM generate_series({start}, {end}) g
+                    ON CONFLICT (id) DO UPDATE
+                    SET val = EXCLUDED.val
+                    """,
+                    f"VACUUM {self.table}",
+                ]
+            )
+
+            last_flush_lsn = wait_for_last_flush_lsn(
+                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+            )
+            ps_http = self.env.get_pageserver(pageserver_id).http_client()
+            wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
+
+            if upload:
+                # force a checkpoint to trigger upload
+                ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
+                wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
+                log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
+            else:
+                log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")
+
+    def validate(self, pageserver_id):
+        endpoint = self.endpoint(pageserver_id)
+        result = endpoint.safe_psql_many(
+            [
+                "select clear_buffer_cache()",
+                f"""
+                SELECT COUNT(*) FROM {self.table}
+                """,
+            ]
+        )
+
+        log.info(f"validate({self.expect_rows}): {result}")
+        assert result == [[("",)], [(self.expect_rows,)]]
diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py
index 66cc286aba..4488be31c5 100644
--- a/test_runner/regress/test_pageserver_generations.py
+++ b/test_runner/regress/test_pageserver_generations.py
@@ -23,14 +23,20 @@ from fixtures.neon_fixtures import (
     PgBin,
     S3Scrubber,
     last_flush_lsn_upload,
-    wait_for_last_flush_lsn,
 )
-from fixtures.pageserver.utils import list_prefix
+from fixtures.pageserver.http import PageserverApiException
+from fixtures.pageserver.utils import (
+    assert_tenant_state,
+    list_prefix,
+    wait_for_last_record_lsn,
+    wait_for_upload,
+)
 from fixtures.remote_storage import (
     RemoteStorageKind,
 )
 from fixtures.types import TenantId, TimelineId
 from fixtures.utils import print_gc_result, wait_until
+from fixtures.workload import Workload
 
 # A tenant configuration that is convenient for generating uploads and deletions
 # without a large amount of postgres traffic.
@@ -93,7 +99,10 @@ def generate_uploads_and_deletions(
         )
         assert tenant_id is not None
         assert timeline_id is not None
-        wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+        # We are waiting for uploads as well as local flush, in order to avoid leaving the system
+        # in a state where there are "future layers" in remote storage that will generate deletions
+        # after a restart.
+        last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
         ps_http.timeline_checkpoint(tenant_id, timeline_id)
 
         # Compaction should generate some GC-eligible layers
@@ -560,3 +569,91 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
     read_all(env, tenant_id, timeline_id)
     evict_all_layers(env, tenant_id, timeline_id)
     read_all(env, tenant_id, timeline_id)
+
+
+def test_multi_attach(
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+):
+    neon_env_builder.enable_generations = True
+    neon_env_builder.num_pageservers = 3
+    neon_env_builder.enable_pageserver_remote_storage(
+        remote_storage_kind=RemoteStorageKind.MOCK_S3,
+    )
+    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
+
+    pageservers = env.pageservers
+    http_clients = list([p.http_client() for p in pageservers])
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    # We will intentionally create situations where stale deletions happen from non-latest-generation
+    # nodes when the tenant is multiply-attached
+    for ps in env.pageservers:
+        ps.allowed_errors.extend(
+            [".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
+        )
+
+    # Initially, the tenant will be attached to the first pageserver (first is default in our test harness)
+    wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[0], tenant_id, "Active"))
+    _detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
+    with pytest.raises(PageserverApiException):
+        http_clients[1].timeline_detail(tenant_id, timeline_id)
+    with pytest.raises(PageserverApiException):
+        http_clients[2].timeline_detail(tenant_id, timeline_id)
+
+    workload = Workload(env, tenant_id, timeline_id)
+    workload.init(pageservers[0].id)
+    workload.write_rows(1000, pageservers[0].id)
+
+    # Attach the tenant to the other two pageservers
+    pageservers[1].tenant_attach(env.initial_tenant)
+    pageservers[2].tenant_attach(env.initial_tenant)
+
+    wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[1], tenant_id, "Active"))
+    wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[2], tenant_id, "Active"))
+
+    # Now they all have it attached
+    _details = list([c.timeline_detail(tenant_id, timeline_id) for c in http_clients])
+    _detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
+    _detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
+
+    # The endpoint can use any pageserver to service its reads
+    for pageserver in pageservers:
+        workload.validate(pageserver.id)
+
+    # If we write some more data, all the nodes can see it, including stale ones
+    wrote_lsn = workload.write_rows(1000, pageservers[0].id)
+    for ps_http in http_clients:
+        wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, wrote_lsn)
+
+    # ...and indeed endpoints can see it via any of the pageservers
+    for pageserver in pageservers:
+        workload.validate(pageserver.id)
+
+    # Prompt all the pageservers, including stale ones, to upload ingested layers to remote storage
+    for ps_http in http_clients:
+        ps_http.timeline_checkpoint(tenant_id, timeline_id)
+        wait_for_upload(ps_http, tenant_id, timeline_id, wrote_lsn)
+
+    # Now, the contents of remote storage will be a set of layers from each pageserver, but with unique
+    # generation numbers
+    # TODO: validate remote storage contents
+
+    # Stop all pageservers
+    for ps in pageservers:
+        ps.stop()
+
+    # Returning to a normal healthy state: all pageservers will start, but only the one most
+    # recently attached via the control plane will re-attach on startup
+    for ps in pageservers:
+        ps.start()
+
+    with pytest.raises(PageserverApiException):
+        _detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
+    with pytest.raises(PageserverApiException):
+        _detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
+    _detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
+
+    # All data we wrote while multi-attached remains readable
+    workload.validate(pageservers[2].id)
diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py
new file mode 100644
index 0000000000..b14b7f1328
--- /dev/null
+++ b/test_runner/regress/test_pageserver_secondary.py
@@ -0,0 +1,332 @@
+import random
+from typing import Any, Dict, Optional
+
+import pytest
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
+from fixtures.remote_storage import RemoteStorageKind
+from fixtures.types import TenantId, TimelineId
+from fixtures.utils import wait_until
+from fixtures.workload import Workload
+
+# A tenant configuration that is convenient for generating uploads and deletions
+# without a large amount of postgres traffic.
+TENANT_CONF = {
+    # small checkpointing and compaction targets to ensure we generate many upload operations
+    "checkpoint_distance": f"{128 * 1024}",
+    "compaction_target_size": f"{128 * 1024}",
+    "compaction_threshold": "1",
+    # no PITR horizon, we specify the horizon when we request on-demand GC
+    "pitr_interval": "0s",
+    # disable background compaction and GC. We invoke it manually when we want it to happen.
+    "gc_period": "0s",
+    "compaction_period": "0s",
+    # create image layers eagerly, so that GC can remove some layers
+    "image_creation_threshold": "1",
+}
+
+
+def evict_random_layers(
+    rng: random.Random, pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
+):
+    """
+    Evict each layer on a pageserver with 50% probability
+    """
+    timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
+    initial_local_layers = sorted(
+        list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
+    )
+    client = pageserver.http_client()
+    for layer in initial_local_layers:
+        if "ephemeral" in layer.name or "temp_download" in layer.name:
+            continue
+
+        if rng.choice([True, False]):
+            log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.name}")
+            client.evict_layer(tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.name)
+
+
+@pytest.mark.parametrize("seed", [1, 2, 3])
+def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
+    """
+    Issue many location configuration changes, ensure that tenants
+    remain readable & we don't get any unexpected errors. We should
+    have no ERROR in the log, and no 500s in the API.
+
+    The location_config API is intentionally designed so that all destination
+    states are valid, so that we may test it in this way: the API should always
+    work as long as the tenant exists.
+    """
+    neon_env_builder.enable_generations = True
+    neon_env_builder.num_pageservers = 3
+    neon_env_builder.enable_pageserver_remote_storage(
+        remote_storage_kind=RemoteStorageKind.MOCK_S3,
+    )
+    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
+    assert env.attachment_service is not None
+
+    pageservers = env.pageservers
+    list([p.http_client() for p in pageservers])
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    # We will make no effort to avoid stale attachments
+    for ps in env.pageservers:
+        ps.allowed_errors.extend(
+            [
+                ".*Dropped remote consistent LSN updates.*",
+                ".*Dropping stale deletions.*",
+                # page_service_conn_main{peer_addr=[::1]:41176}: query handler for 'pagestream 3b19aec5038c796f64b430b30a555121 d07776761d44050b8aab511df1657d83' failed: Tenant 3b19aec5038c796f64b430b30a555121 not found
+                ".*query handler.*Tenant.*not found.*",
+                # page_service_conn_main{peer_addr=[::1]:45552}: query handler for 'pagestream 414ede7ad50f775a8e7d9ba0e43b9efc a43884be16f44b3626482b6981b2c745' failed: Tenant 414ede7ad50f775a8e7d9ba0e43b9efc is not active
+                ".*query handler.*Tenant.*not active.*",
+            ]
+        )
+
+        # These can happen if we shut down at just the right time; to be fixed as part of #5172.
+        message = ".*duplicated L1 layer layer=.*"
+        ps.allowed_errors.append(message)
+
+    workload = Workload(env, tenant_id, timeline_id)
+    workload.init(env.pageservers[0].id)
+    workload.write_rows(256, env.pageservers[0].id)
+
+    # We use a fixed seed to make the test reproducible: we want a randomly
+    # chosen order, but not to change the order every time we run the test.
+    rng = random.Random(seed)
+
+    initial_generation = 1
+    last_state = {
+        env.pageservers[0].id: ("AttachedSingle", initial_generation),
+        env.pageservers[1].id: ("Detached", None),
+        env.pageservers[2].id: ("Detached", None),
+    }
+
+    latest_attached = env.pageservers[0].id
+
+    for _i in range(0, 64):
+        # Pick a pageserver
+        pageserver = rng.choice(env.pageservers)
+
+        # Pick a pseudorandom state
+        modes = [
+            "AttachedSingle",
+            "AttachedMulti",
+            "AttachedStale",
+            "Secondary",
+            "Detached",
+            "_Evictions",
+            "_Restart",
+        ]
+
+        mode = rng.choice(modes)
+
+        last_state_ps = last_state[pageserver.id]
+        if mode == "_Evictions":
+            if last_state_ps[0].startswith("Attached"):
+                log.info(f"Action: evictions on pageserver {pageserver.id}")
+                evict_random_layers(rng, pageserver, tenant_id, timeline_id)
+            else:
+                log.info(
+                    f"Action: skipping evictions on pageserver {pageserver.id}, is not attached"
+                )
+        elif mode == "_Restart":
+            log.info(f"Action: restarting pageserver {pageserver.id}")
+            pageserver.stop()
+            pageserver.start()
+            if last_state_ps[0].startswith("Attached") and latest_attached == pageserver.id:
+                log.info("Entering postgres...")
+                workload.churn_rows(rng.randint(128, 256), pageserver.id)
+                workload.validate(pageserver.id)
+            elif last_state_ps[0].startswith("Attached"):
+                # The `attachment_service` will only re-attach on startup when a pageserver was the
+                # holder of the latest generation: otherwise the pageserver will revert to detached
+                # state if it was running attached with a stale generation
+                last_state[pageserver.id] = ("Detached", None)
+        else:
+            secondary_conf: Optional[Dict[str, Any]] = None
+            if mode == "Secondary":
+                secondary_conf = {"warm": rng.choice([True, False])}
+
+            location_conf: Dict[str, Any] = {
+                "mode": mode,
+                "secondary_conf": secondary_conf,
+                "tenant_conf": {},
+            }
+
+            log.info(f"Action: Configuring pageserver {pageserver.id} to {location_conf}")
+
+            # Select a generation number
+            if mode.startswith("Attached"):
+                if last_state_ps[1] is not None:
+                    if rng.choice([True, False]):
+                        # Move between attached states, staying in the same generation
+                        generation = last_state_ps[1]
+                    else:
+                        # Switch generations, while also jumping between attached states
+                        generation = env.attachment_service.attach_hook_issue(
+                            tenant_id, pageserver.id
+                        )
+                        latest_attached = pageserver.id
+                else:
+                    generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver.id)
+                    latest_attached = pageserver.id
+            else:
+                generation = None
+
+            location_conf["generation"] = generation
+
+            pageserver.tenant_location_configure(tenant_id, location_conf)
+            last_state[pageserver.id] = (mode, generation)
+
+            if mode.startswith("Attached"):
+                # This is a basic test: we are validating that the endpoint works properly _between_
+                # configuration changes. A stronger test would be to validate that clients see
+                # no errors while we are making the changes.
+                workload.churn_rows(
+                    rng.randint(128, 256), pageserver.id, upload=mode != "AttachedStale"
+                )
+                workload.validate(pageserver.id)
+
+    # Attach all pageservers
+    for ps in env.pageservers:
+        location_conf = {"mode": "AttachedMulti", "secondary_conf": None, "tenant_conf": {}}
+        ps.tenant_location_configure(tenant_id, location_conf)
+
+    # Confirm that all are readable
+    for ps in env.pageservers:
+        workload.validate(ps.id)
+
+    # Detach all pageservers
+    for ps in env.pageservers:
+        location_conf = {"mode": "Detached", "secondary_conf": None, "tenant_conf": {}}
+        ps.tenant_location_configure(tenant_id, location_conf)
+
+    # Confirm that all local disk state was removed on detach
+    # TODO
+
+
+def test_live_migration(neon_env_builder: NeonEnvBuilder):
+    """
+    Test the sequence of location states that are used in a live migration.
+    """
+    neon_env_builder.enable_generations = True
+    neon_env_builder.num_pageservers = 2
+    neon_env_builder.enable_pageserver_remote_storage(
+        remote_storage_kind=RemoteStorageKind.MOCK_S3,
+    )
+    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
+    assert env.attachment_service is not None
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    pageserver_a = env.pageservers[0]
+    pageserver_b = env.pageservers[1]
+
+    initial_generation = 1
+
+    workload = Workload(env, tenant_id, timeline_id)
+    workload.init(env.pageservers[0].id)
+    workload.write_rows(256, env.pageservers[0].id)
+
+    # Make the destination a secondary location
+    pageserver_b.tenant_location_configure(
+        tenant_id,
+        {
+            "mode": "Secondary",
+            "secondary_conf": {"warm": True},
+            "tenant_conf": {},
+        },
+    )
+
+    workload.churn_rows(64, pageserver_a.id, upload=False)
+
+    # Set origin attachment to stale
+    log.info("Setting origin to AttachedStale")
+    pageserver_a.tenant_location_configure(
+        tenant_id,
+        {
+            "mode": "AttachedStale",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": initial_generation,
+        },
+        flush_ms=5000,
+    )
+
+    migrated_generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver_b.id)
+    log.info(f"Acquired generation {migrated_generation} for destination pageserver")
+    assert migrated_generation == initial_generation + 1
+
+    # Writes and reads still work in AttachedStale.
+    workload.validate(pageserver_a.id)
+
+    # TODO: call into secondary mode API hooks to do an upload/download sync
+
+    # Generate some more dirty writes: we expect the origin to ingest WAL
+    # in AttachedStale
+    workload.churn_rows(64, pageserver_a.id, upload=False)
+    workload.validate(pageserver_a.id)
+
+    # Attach the destination
+    log.info("Setting destination to AttachedMulti")
+    pageserver_b.tenant_location_configure(
+        tenant_id,
+        {
+            "mode": "AttachedMulti",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": migrated_generation,
+        },
+    )
+
+    # Wait for destination LSN to catch up with origin
+    origin_lsn = pageserver_a.http_client().timeline_detail(tenant_id, timeline_id)[
+        "last_record_lsn"
+    ]
+
+    def caught_up():
+        destination_lsn = pageserver_b.http_client().timeline_detail(tenant_id, timeline_id)[
+            "last_record_lsn"
+        ]
+        log.info(
+            f"Waiting for LSN to catch up: origin {origin_lsn} vs destination {destination_lsn}"
+        )
+        assert destination_lsn >= origin_lsn
+
+    wait_until(100, 0.1, caught_up)
+
+    # The destination should accept writes
+    workload.churn_rows(64, pageserver_b.id)
+
+    # Dual attached: both are readable.
+    workload.validate(pageserver_a.id)
+    workload.validate(pageserver_b.id)
+
+    # Revert the origin to secondary
+    log.info("Setting origin to Secondary")
+    pageserver_a.tenant_location_configure(
+        tenant_id,
+        {
+            "mode": "Secondary",
+            "secondary_conf": {"warm": True},
+            "tenant_conf": {},
+        },
+    )
+
+    workload.churn_rows(64, pageserver_b.id)
+
+    # Put the destination into its final state
+    pageserver_b.tenant_location_configure(
+        tenant_id,
+        {
+            "mode": "AttachedSingle",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": migrated_generation,
+        },
+    )
+
+    workload.churn_rows(64, pageserver_b.id)
+    workload.validate(pageserver_b.id)
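
Since the Workload fixture is intended to be reused beyond these tests, the sketch below shows roughly how another regression test might drive it against a single pageserver. It is illustrative only and not part of the diff above: the test name is hypothetical, and the environment setup simply mirrors the tests in this patch (remote storage enabled so that churn_rows can wait for uploads).

    # Illustrative sketch, not part of the patch: the test name is hypothetical
    # and the setup mirrors the tests above.
    from fixtures.neon_fixtures import NeonEnvBuilder
    from fixtures.remote_storage import RemoteStorageKind
    from fixtures.workload import Workload


    def test_workload_example(neon_env_builder: NeonEnvBuilder):
        neon_env_builder.enable_pageserver_remote_storage(
            remote_storage_kind=RemoteStorageKind.MOCK_S3,
        )
        env = neon_env_builder.init_start()
        ps_id = env.pageservers[0].id

        # Create the table and flush/upload the initial layers
        workload = Workload(env, env.initial_tenant, env.initial_timeline)
        workload.init(ps_id)

        # Append rows, then overwrite some of them to generate compaction work
        workload.write_rows(512, ps_id)
        workload.churn_rows(128, ps_id)

        # Clear the compute's buffer cache and re-read everything via the pageserver
        workload.validate(ps_id)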