from __future__ import annotations

import json
from typing import Optional

import pytest
from fixtures.common_types import TenantId, TimelineArchivalState, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    last_flush_lsn_upload,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty, list_prefix
from fixtures.remote_storage import S3Storage, s3_storage
from fixtures.utils import wait_until
from mypy_boto3_s3.type_defs import (
    ObjectTypeDef,
)

# NOTE(review): the tests that precede this one in the file are only partially visible in this
# view (just their diff context lines), so they are not reproduced here and must be kept as-is.


@pytest.mark.parametrize("offload_child", ["offload", "offload-corrupt", "archive", None])
def test_timeline_retain_lsn(neon_env_builder: NeonEnvBuilder, offload_child: Optional[str]):
    """
    Ensure that retain_lsn functionality for timelines works, both for offloaded and non-offloaded ones

    A child branch is created off the root timeline, the root's data is then churned, and an
    aggressive gc + compaction is run on the root. Afterwards the child must still return the
    same query result that the root returned right before the branch was created.

    `offload_child` selects what happens to the child before the gc:
    - None: nothing, the child stays as it is
    - "archive": the child is archived, and unarchived again after the gc
    - "offload": the child is archived and offloaded, and unarchived again after the gc
    - "offload-corrupt": like "offload", but while the pageserver is stopped the tenant manifest
      in remote storage is rewritten with `ancestor_retain_lsn` set to null for every offloaded
      timeline. In this mode starting an endpoint on the child is expected to fail.
    """
    corrupt_manifest = offload_child == "offload-corrupt"
    if corrupt_manifest:
        # Our corruption code only works with S3 compatible storage
        neon_env_builder.enable_pageserver_remote_storage(s3_storage())

    env = neon_env_builder.init_start()
    ps_http = env.pageserver.http_client()

    # Turn off gc and compaction loops: we want to issue them manually for better reliability
    tenant_id, root_timeline_id = env.create_tenant(
        conf={
            # small checkpointing and compaction targets to ensure we generate many upload operations
            "checkpoint_distance": 128 * 1024,
            "compaction_threshold": 1,
            "compaction_target_size": 128 * 1024,
            # set small image creation thresholds so that gc deletes data
            "image_creation_threshold": 2,
            # disable background compaction and GC. We invoke it manually when we want it to happen.
            "gc_period": "0s",
            "compaction_period": "0s",
            # Disable pitr, we only want the latest lsn
            "pitr_interval": "0s",
            # Don't rely on endpoint lsn leases
            "lsn_lease_length": "0s",
        }
    )

    # The same query is run before the branch point, after the churn, and finally on the child,
    # so keep it in one place to guarantee the compared results come from identical SQL.
    sum_query = "SELECT sum(key) from foo where v < 51200"

    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
        endpoint.safe_psql_many(
            [
                "CREATE TABLE foo(v int, key serial primary key, t text default 'data_content')",
                "SELECT setseed(0.4321)",
                "INSERT INTO foo SELECT v FROM (SELECT generate_series(1,2048), (random() * 409600)::int as v) as random_numbers",
            ]
        )
        pre_branch_sum = endpoint.safe_psql(sum_query)
        log.info(f"Pre branch sum: {pre_branch_sum}")
        last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id)

    # Create a branch and write some additional data to the parent
    child_timeline_id = env.create_branch("test_archived_branch", tenant_id)

    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
        # Do some churn of the data. This is important so that we can overwrite image layers.
        for i in range(10):
            endpoint.safe_psql_many(
                [
                    f"SELECT setseed(0.23{i})",
                    "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 2",
                    "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 1",
                    "UPDATE foo SET v=(random() * 409600)::int WHERE v % 3 = 0",
                ]
            )
        # Only logged, to help debugging: it shows whether the churn changed the query result.
        post_branch_sum = endpoint.safe_psql(sum_query)
        log.info(f"Post branch sum: {post_branch_sum}")
        last_flush_lsn_upload(env, endpoint, tenant_id, root_timeline_id)

    if offload_child is not None:
        ps_http.timeline_archival_config(
            tenant_id,
            child_timeline_id,
            state=TimelineArchivalState.ARCHIVED,
        )
        leaf_detail = ps_http.timeline_detail(
            tenant_id,
            child_timeline_id,
        )
        assert leaf_detail["is_archived"] is True
        if "offload" in offload_child:
            ps_http.timeline_offload(tenant_id, child_timeline_id)

    # Do a restart to get rid of any in-memory objects (we only init gc info once, at attach)
    env.pageserver.stop()
    if corrupt_manifest:
        assert isinstance(env.pageserver_remote_storage, S3Storage)
        listing = list_prefix(
            env.pageserver_remote_storage, f"tenants/{str(tenant_id)}/tenant-manifest"
        )
        objects: list[ObjectTypeDef] = listing.get("Contents", [])
        assert len(objects) > 0, "no tenant manifest found in remote storage"
        # Fail right here if the listing entry carries no key, instead of stringifying a
        # placeholder and failing later in the download with a misleading object name.
        remote_key: Optional[str] = objects[0].get("Key")
        assert remote_key is not None, "tenant manifest listing entry has no Key"
        local_path = str(env.repo_dir / "tenant-manifest.json")

        log.info(f"Downloading {remote_key} -> {local_path}")
        env.pageserver_remote_storage.client.download_file(
            env.pageserver_remote_storage.bucket_name, remote_key, local_path
        )

        log.info(f"Corrupting {local_path}")
        with open(local_path, encoding="utf-8") as manifest_json_file:
            manifest_json = json.load(manifest_json_file)
        # If nothing is listed there is nothing to corrupt, and the test would only fail much
        # later with an unrelated-looking error.
        assert manifest_json["offloaded_timelines"], "manifest lists no offloaded timelines"
        for offloaded_timeline in manifest_json["offloaded_timelines"]:
            offloaded_timeline["ancestor_retain_lsn"] = None
        with open(local_path, "w", encoding="utf-8") as manifest_json_file:
            json.dump(manifest_json, manifest_json_file)

        log.info(f"Uploading {local_path} -> {remote_key}")
        env.pageserver_remote_storage.client.upload_file(
            local_path, env.pageserver_remote_storage.bucket_name, remote_key
        )
        # The point of our earlier efforts was to provoke these
        env.pageserver.allowed_errors.extend(
            [
                ".*initial size calculation failed: PageRead.MissingKey.could not find data for key.*",
                ".*page_service_conn_main.*could not find data for key.*",
            ]
        )
    env.pageserver.start()

    # Do an aggressive gc and compaction of the parent branch
    ps_http.timeline_gc(tenant_id=tenant_id, timeline_id=root_timeline_id, gc_horizon=0)
    ps_http.timeline_checkpoint(
        tenant_id,
        root_timeline_id,
        force_l0_compaction=True,
        force_repartition=True,
        wait_until_uploaded=True,
        compact=True,
    )

    if offload_child is not None:
        ps_http.timeline_archival_config(
            tenant_id,
            child_timeline_id,
            state=TimelineArchivalState.UNARCHIVED,
        )

    # Now, after unarchival, the child timeline should still have its data accessible (or corrupted)
    if corrupt_manifest:
        with pytest.raises(RuntimeError, match=".*failed to get basebackup.*"):
            env.endpoints.create_start(
                "test_archived_branch", tenant_id=tenant_id, basebackup_request_tries=1
            )
    else:
        with env.endpoints.create_start("test_archived_branch", tenant_id=tenant_id) as endpoint:
            # Named so that it does not shadow the `sum` builtin.
            child_sum = endpoint.safe_psql(sum_query)
            assert child_sum == pre_branch_sum