From 7262096b74b07d590cd9d17889beb2af15b0edca Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 18 Nov 2024 09:54:51 +0000 Subject: [PATCH] tests: add test_sharding_gc --- test_runner/fixtures/remote_storage.py | 16 ++-- test_runner/regress/test_sharding.py | 110 ++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 6 deletions(-) diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 7024953661..c630ea98b4 100644 --- a/test_runner/fixtures/remote_storage.py +++ b/test_runner/fixtures/remote_storage.py @@ -77,14 +77,16 @@ class MockS3Server: class LocalFsStorage: root: Path - def tenant_path(self, tenant_id: TenantId) -> Path: + def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path: return self.root / "tenants" / str(tenant_id) - def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: + def timeline_path( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Path: return self.tenant_path(tenant_id) / "timelines" / str(timeline_id) def timeline_latest_generation( - self, tenant_id: TenantId, timeline_id: TimelineId + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId ) -> Optional[int]: timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id)) index_parts = [f for f in timeline_files if f.startswith("index_part")] @@ -102,7 +104,9 @@ class LocalFsStorage: raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}") return generations[-1] - def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: + def index_path( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Path: latest_gen = self.timeline_latest_generation(tenant_id, timeline_id) if latest_gen is None: filename = TIMELINE_INDEX_PART_FILE_NAME @@ -126,7 +130,9 @@ class LocalFsStorage: filename = f"{local_name}-{generation:08x}" return self.timeline_path(tenant_id, timeline_id) / filename - def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any: + def index_content( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Any: with self.index_path(tenant_id, timeline_id).open("r") as f: return json.load(f) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 0a4a53356d..9f0ceb296b 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -19,7 +19,7 @@ from fixtures.neon_fixtures import ( wait_for_last_flush_lsn, ) from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty -from fixtures.remote_storage import s3_storage +from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage from fixtures.utils import skip_in_debug_build, wait_until from fixtures.workload import Workload from pytest_httpserver import HTTPServer @@ -1685,3 +1685,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder): ) assert len(top["shards"]) == n_tenants - 4 assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:]) + + +def test_sharding_gc( + neon_env_builder: NeonEnvBuilder, +): + """ + Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as + the "leader" for GC, and other shards read its index to learn what LSN they should + GC up to. + """ + + shard_count = 4 + neon_env_builder.num_pageservers = shard_count + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + TENANT_CONF = { + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": 128 * 1024, + "compaction_threshold": 1, + "compaction_target_size": 128 * 1024, + # A short PITR horizon, so that we won't have to sleep too long in the test to wait for it to + # happen. + "pitr_interval": "1s", + # disable background compaction and GC. We invoke it manually when we want it to happen. + "gc_period": "0s", + "compaction_period": "0s", + # Disable automatic creation of image layers, as we will create them explicitly when we want them + "image_creation_threshold": 9999, + "image_layer_creation_check_threshold": 0, + "lsn_lease_length": "0s", + } + env = neon_env_builder.init_start( + initial_tenant_shard_count=shard_count, initial_tenant_conf=TENANT_CONF + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + # Create a branch and write some data + workload = Workload(env, tenant_id, timeline_id) + initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + log.info(f"Started at LSN: {initial_lsn}") + + workload.init() + + # Write enough data to generate multiple layers + for _i in range(10): + last_lsn = workload.write_rows(32) + + assert last_lsn > initial_lsn + + log.info(f"Wrote up to last LSN: {last_lsn}") + + # Do full image layer generation. When we subsequently wait for PITR, all historic deltas + # should be GC-able + for shard_number in range(shard_count): + shard = TenantShardId(tenant_id, shard_number, shard_count) + env.get_tenant_pageserver(shard).http_client().timeline_compact( + shard, timeline_id, force_image_layer_creation=True + ) + + workload.churn_rows(32) + + time.sleep(5) + + # Invoke GC on a non-zero shard and verify its GC cutoff LSN does not advance + shard_one = TenantShardId(tenant_id, 1, shard_count) + env.get_tenant_pageserver(shard_one).http_client().timeline_gc( + shard_one, timeline_id, gc_horizon=None + ) + + # Check shard 1's index - GC cutoff LSN should not have advanced + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + shard_1_index = env.pageserver_remote_storage.index_content( + tenant_id=shard_one, timeline_id=timeline_id + ) + shard_1_gc_cutoff_lsn = Lsn(shard_1_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}") + assert shard_1_gc_cutoff_lsn <= last_lsn + + shard_zero = TenantShardId(tenant_id, 0, shard_count) + env.get_tenant_pageserver(shard_zero).http_client().timeline_gc( + shard_zero, timeline_id, gc_horizon=None + ) + + # TODO: observe that GC LSN of shard 0 has moved forward in remote storage + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + shard_0_index = env.pageserver_remote_storage.index_content( + tenant_id=shard_zero, timeline_id=timeline_id + ) + shard_0_gc_cutoff_lsn = Lsn(shard_0_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}") + assert shard_0_gc_cutoff_lsn >= last_lsn + + # Invoke GC on all other shards and verify their GC cutoff LSNs + for shard_number in range(1, shard_count): + shard = TenantShardId(tenant_id, shard_number, shard_count) + env.get_tenant_pageserver(shard).http_client().timeline_gc( + shard, timeline_id, gc_horizon=None + ) + + # Verify GC cutoff LSN advanced to match shard 0 + shard_index = env.pageserver_remote_storage.index_content( + tenant_id=shard, timeline_id=timeline_id + ) + shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}") + assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn