diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index f3cb1aa370..5f70f37a90 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1860,7 +1860,7 @@ class NeonPageserver(PgProtocol):
         client = self.http_client()
         return client.tenant_attach(tenant_id, config, config_null, generation=generation)

-    def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any]):
+    def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
         # This API is only for use when generations are enabled
         assert self.env.attachment_service is not None

@@ -1868,7 +1868,7 @@ class NeonPageserver(PgProtocol):
             config["generation"] = self.env.attachment_service.attach_hook(tenant_id, self.id)

         client = self.http_client()
-        return client.tenant_location_conf(tenant_id, config)
+        return client.tenant_location_conf(tenant_id, config, **kwargs)

     def read_tenant_location_conf(self, tenant_id: TenantId) -> dict[str, Any]:
         path = self.tenant_dir(tenant_id) / "config-v1"
@@ -2741,6 +2741,7 @@ class EndpointFactory:
         lsn: Optional[Lsn] = None,
         hot_standby: bool = False,
         config_lines: Optional[List[str]] = None,
+        pageserver_id: Optional[int] = None,
     ) -> Endpoint:
         ep = Endpoint(
             self.env,
@@ -2760,6 +2761,7 @@ class EndpointFactory:
             lsn=lsn,
             hot_standby=hot_standby,
             config_lines=config_lines,
+            pageserver_id=pageserver_id,
         )

     def stop_all(self) -> "EndpointFactory":
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index ae11859065..6526e4abc1 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -251,11 +251,20 @@ class PageserverHttpClient(requests.Session):
         res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
         self.verbose_error(res)

-    def tenant_location_conf(self, tenant_id: TenantId, location_conf=dict[str, Any]):
+    def tenant_location_conf(
+        self, tenant_id: TenantId, location_conf: dict[str, Any], flush_ms=None
+    ):
         body = location_conf.copy()
         body["tenant_id"] = str(tenant_id)
+
+        params = {}
+        if flush_ms is not None:
+            params["flush_ms"] = str(flush_ms)
+
         res = self.put(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config", json=body
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config",
+            json=body,
+            params=params,
         )
         self.verbose_error(res)

@@ -662,6 +671,14 @@ class PageserverHttpClient(requests.Session):
         res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
         self.verbose_error(res)

+    def secondary_tenant_upload(self, tenant_id: TenantId):
+        res = self.post(f"http://localhost:{self.port}/v1/secondary/{tenant_id}/upload")
+        self.verbose_error(res)
+
+    def secondary_tenant_download(self, tenant_id: TenantId):
+        res = self.post(f"http://localhost:{self.port}/v1/secondary/{tenant_id}/download")
+        self.verbose_error(res)
+
     def post_tracing_event(self, level: str, message: str):
         res = self.post(
             f"http://localhost:{self.port}/v1/tracing/event",
diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py
index 007ff387f4..edbe226af0 100644
--- a/test_runner/fixtures/pageserver/utils.py
+++ b/test_runner/fixtures/pageserver/utils.py
@@ -47,7 +47,7 @@ def wait_for_upload(
     for i in range(20):
         current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
         if current_lsn >= lsn:
-            log.info("wait finished")
+            log.info(f"wait finished: current remote consistent lsn {current_lsn}")
             return
         lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
         log.info(
diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py
index b17bca5fe3..241531437c 100644
--- a/test_runner/fixtures/workload.py
+++ b/test_runner/fixtures/workload.py
@@ -1,5 +1,13 @@
+from typing import Optional
+
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, last_flush_lsn_upload
+from fixtures.neon_fixtures import (
+    Endpoint,
+    NeonEnv,
+    last_flush_lsn_upload,
+    wait_for_last_flush_lsn,
+)
+from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.types import TenantId, TimelineId


@@ -21,43 +29,82 @@ class Workload:
         self.expect_rows = 0
         self.churn_cursor = 0

-    def endpoint(self, pageserver_id):
-        return self.env.endpoints.create_start(
-            "main", tenant_id=self.tenant_id, pageserver_id=pageserver_id
+        self._endpoint: Optional[Endpoint] = None
+
+    def endpoint(self, pageserver_id: int) -> Endpoint:
+        if self._endpoint is None:
+            self._endpoint = self.env.endpoints.create(
+                "main",
+                tenant_id=self.tenant_id,
+                pageserver_id=pageserver_id,
+                endpoint_id="ep-workload",
+            )
+            self._endpoint.start(pageserver_id=pageserver_id)
+        else:
+            self._endpoint.reconfigure(pageserver_id=pageserver_id)
+
+        connstring = self._endpoint.safe_psql(
+            "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
         )
+        log.info(f"Workload.endpoint: connstr={connstring}")
+
+        return self._endpoint
+
+    def __del__(self):
+        if self._endpoint is not None:
+            self._endpoint.stop()

     def init(self, pageserver_id: int):
-        with self.endpoint(pageserver_id) as endpoint:
-            endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text)")
-            last_flush_lsn_upload(
-                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
-            )
+        endpoint = self.endpoint(pageserver_id)
+
+        endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
+        endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
+        last_flush_lsn_upload(
+            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+        )

     def write_rows(self, n, pageserver_id):
-        with self.endpoint(pageserver_id) as endpoint:
-            start = self.expect_rows
-            end = start + n - 1
-            self.expect_rows += n
-            dummy_value = "blah"
-            endpoint.safe_psql(
-                f"""
-                INSERT INTO {self.table} (id, val)
-                SELECT g, '{dummy_value}'
-                FROM generate_series({start}, {end}) g
-                """
-            )
+        endpoint = self.endpoint(pageserver_id)
+        start = self.expect_rows
+        end = start + n - 1
+        self.expect_rows += n
+        dummy_value = "blah"
+        endpoint.safe_psql(
+            f"""
+            INSERT INTO {self.table} (id, val)
+            SELECT g, '{dummy_value}'
+            FROM generate_series({start}, {end}) g
+            """
+        )

-            return last_flush_lsn_upload(
-                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
-            )
+        return last_flush_lsn_upload(
+            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+        )

-    def churn_rows(self, n, pageserver_id):
+    def churn_rows(self, n, pageserver_id, upload=True):
         assert self.expect_rows >= n
-        with self.endpoint(pageserver_id) as endpoint:
-            start = self.churn_cursor % (self.expect_rows)
-            end = (self.churn_cursor + n - 1) % (self.expect_rows)
-            self.churn_cursor += n
+        max_iters = 10
+        endpoint = self.endpoint(pageserver_id)
+        todo = n
+        i = 0
+        while todo > 0:
+            i += 1
+            if i > max_iters:
+                raise RuntimeError("churn_rows: failed to cover requested rows within max_iters")
+            start = self.churn_cursor % self.expect_rows
+            n_iter = min((self.expect_rows - start), todo)
+            todo -= n_iter
+
+            end = start + n_iter - 1
+
+            log.info(
+                f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
+            )
+
+            assert end < self.expect_rows
+
+            self.churn_cursor += n_iter
             dummy_value = "blah"
             endpoint.safe_psql_many(
                 [
@@ -72,17 +119,30 @@ class Workload:
                 ]
             )

-            return last_flush_lsn_upload(
-                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
-            )
+        last_flush_lsn = wait_for_last_flush_lsn(
+            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+        )
+        ps_http = self.env.get_pageserver(pageserver_id).http_client()
+        wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
+
+        if upload:
+            # force a checkpoint to trigger upload
+            ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
+            wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
+            log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
+        else:
+            log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

     def validate(self, pageserver_id):
-        with self.endpoint(pageserver_id) as endpoint:
-            result = endpoint.safe_psql(
+        endpoint = self.endpoint(pageserver_id)
+        result = endpoint.safe_psql_many(
+            [
+                "select clear_buffer_cache()",
                 f"""
-                SELECT COUNT(*) FROM {self.table}
-                """
-            )
+                    SELECT COUNT(*) FROM {self.table}
+                    """,
+            ]
+        )

-            log.info(f"validate({self.expect_rows}): {result}")
-            assert result == [(self.expect_rows,)]
+        log.info(f"validate({self.expect_rows}): {result}")
+        assert result == [[("",)], [(self.expect_rows,)]]
diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py
index f3f3a1ddf3..f1948bbcc9 100644
--- a/test_runner/regress/test_disk_usage_eviction.py
+++ b/test_runner/regress/test_disk_usage_eviction.py
@@ -8,6 +8,7 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,
+    NeonPageserver,
     PgBin,
     wait_for_last_flush_lsn,
 )
@@ -73,14 +74,21 @@ class EvictionEnv:
     layer_size: int
     pgbench_init_lsns: Dict[TenantId, Lsn]

-    def timelines_du(self) -> Tuple[int, int, int]:
+    @property
+    def pageserver(self):
+        """
+        Shortcut for tests that only use one pageserver.
+        """
+        return self.neon_env.pageserver
+
+    def timelines_du(self, pageserver: NeonPageserver) -> Tuple[int, int, int]:
         return poor_mans_du(
-            self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], verbose=False
+            self.neon_env, [(tid, tlid) for tid, tlid in self.timelines], pageserver, verbose=False
         )

-    def du_by_timeline(self) -> Dict[Tuple[TenantId, TimelineId], int]:
+    def du_by_timeline(self, pageserver: NeonPageserver) -> Dict[Tuple[TenantId, TimelineId], int]:
         return {
-            (tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], verbose=True)[0]
+            (tid, tlid): poor_mans_du(self.neon_env, [(tid, tlid)], pageserver, verbose=True)[0]
             for tid, tlid in self.timelines
         }

@@ -108,7 +116,7 @@ class EvictionEnv:
         _avg = cur.fetchone()

     def pageserver_start_with_disk_usage_eviction(
-        self, period, max_usage_pct, min_avail_bytes, mock_behavior
+        self, pageserver: NeonPageserver, period, max_usage_pct, min_avail_bytes, mock_behavior
     ):
         disk_usage_config = {
             "period": period,
@@ -119,7 +127,12 @@
         enc = toml.TomlEncoder()

-        self.neon_env.pageserver.start(
+        # these can sometimes happen during startup before any tenants have been
+        # loaded, so nothing can be evicted, we just wait for next iteration which
+        # is able to evict.
+        pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
+
+        pageserver.start(
             overrides=(
                 "--pageserver-config-override=disk_usage_based_eviction="
                 + enc.dump_inline_table(disk_usage_config).replace("\n", " "),
@@ -133,15 +146,10 @@
         )

         def statvfs_called():
-            assert self.neon_env.pageserver.log_contains(".*running mocked statvfs.*")
+            assert pageserver.log_contains(".*running mocked statvfs.*")

         wait_until(10, 1, statvfs_called)

-        # these can sometimes happen during startup before any tenants have been
-        # loaded, so nothing can be evicted, we just wait for next iteration which
-        # is able to evict.
-        self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
-

 def human_bytes(amt: float) -> str:
     suffixes = ["", "Ki", "Mi", "Gi"]
@@ -158,21 +166,42 @@ def human_bytes(amt: float) -> str:

 @pytest.fixture
 def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
+    return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=1)
+
+
+@pytest.fixture
+def eviction_env_ha(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
+    """
+    Variant of the eviction environment with two pageservers for testing eviction on
+    HA configurations with a secondary location.
+    """
+    return _eviction_env(request, neon_env_builder, pg_bin, num_pageservers=2)
+
+
+def _eviction_env(
+    request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
+) -> EvictionEnv:
     """
     Creates two tenants, one somewhat larger than the other.
     """
     log.info(f"setting up eviction_env for test {request.node.name}")

+    neon_env_builder.num_pageservers = num_pageservers
     neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+    if num_pageservers >= 2:
+        neon_env_builder.enable_generations = True

     # initial tenant will not be present on this pageserver
     env = neon_env_builder.init_configs()
     env.start()
-    pageserver_http = env.pageserver.http_client()
+
+    # We will create all tenants on the 0th pageserver
+    pageserver_http = env.pageservers[0].http_client()

     # allow because we are invoking this manually; we always warn on executing disk based eviction
-    env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
+    for pageserver in env.pageservers:
+        pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")

     # Choose small layer_size so that we can use low pgbench_scales and still get a large count of layers.
     # Large count of layers and small layer size is good for testing because it makes evictions predictable.
@@ -197,7 +226,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:

         with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
             pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
-            wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
+            wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id, pageserver_id=1)

         timelines.append((tenant_id, timeline_id))

@@ -245,10 +274,10 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
     healthy_tenant_id, healthy_timeline_id = env.timelines[1]

     broken_size_pre, _, _ = poor_mans_du(
-        env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
+        env.neon_env, [(broken_tenant_id, broken_timeline_id)], env.pageserver, verbose=True
     )
     healthy_size_pre, _, _ = poor_mans_du(
-        env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
+        env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], env.pageserver, verbose=True
     )

     # try to evict everything, then validate that broken tenant wasn't touched
@@ -258,10 +287,10 @@ def test_broken_tenants_are_skipped(eviction_env: EvictionEnv):
     log.info(f"{response}")

     broken_size_post, _, _ = poor_mans_du(
-        env.neon_env, [(broken_tenant_id, broken_timeline_id)], verbose=True
+        env.neon_env, [(broken_tenant_id, broken_timeline_id)], env.pageserver, verbose=True
     )
     healthy_size_post, _, _ = poor_mans_du(
-        env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], verbose=True
+        env.neon_env, [(healthy_tenant_id, healthy_timeline_id)], env.pageserver, verbose=True
     )

     assert broken_size_pre == broken_size_post, "broken tenant should not be touched"
@@ -277,14 +306,14 @@ def test_pageserver_evicts_until_pressure_is_relieved(eviction_env: EvictionEnv):
     env = eviction_env
     pageserver_http = env.pageserver_http

-    (total_on_disk, _, _) = env.timelines_du()
+    (total_on_disk, _, _) = env.timelines_du(env.pageserver)

     target = total_on_disk // 2

     response = pageserver_http.disk_usage_eviction_run({"evict_bytes": target})
     log.info(f"{response}")

-    (later_total_on_disk, _, _) = env.timelines_du()
+    (later_total_on_disk, _, _) = env.timelines_du(env.pageserver)

     actual_change = total_on_disk - later_total_on_disk

@@ -303,8 +332,8 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv):
     env = eviction_env
     ps_http = env.pageserver_http

-    (total_on_disk, _, _) = env.timelines_du()
-    du_by_timeline = env.du_by_timeline()
+    (total_on_disk, _, _) = env.timelines_du(env.pageserver)
+    du_by_timeline = env.du_by_timeline(env.pageserver)
     log.info("du_by_timeline: %s", du_by_timeline)

     assert len(du_by_timeline) == 2, "this test assumes two tenants"
@@ -344,8 +373,8 @@ def test_pageserver_respects_overridden_resident_size(eviction_env: EvictionEnv):
         GLOBAL_LRU_LOG_LINE,
     ), "this test is pointless if it fell back to global LRU"

-    (later_total_on_disk, _, _) = env.timelines_du()
-    later_du_by_timeline = env.du_by_timeline()
+    (later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
+    later_du_by_timeline = env.du_by_timeline(env.pageserver)
     log.info("later_du_by_timeline: %s", later_du_by_timeline)

     actual_change = total_on_disk - later_total_on_disk
@@ -373,13 +402,13 @@ def test_pageserver_falls_back_to_global_lru(eviction_env: EvictionEnv):
     env = eviction_env
     ps_http = env.pageserver_http

-    (total_on_disk, _, _) = env.timelines_du()
+    (total_on_disk, _, _) = env.timelines_du(env.pageserver)
     target = total_on_disk

     response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
     log.info(f"{response}")

-    (later_total_on_disk, _, _) = env.timelines_du()
+    (later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
     actual_change = total_on_disk - later_total_on_disk
     assert 0 <= actual_change, "nothing can load layers during this test"
     assert actual_change >= target, "eviction must always evict more than target"
@@ -399,8 +428,8 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
     env = eviction_env
     ps_http = env.pageserver_http

-    (total_on_disk, _, _) = env.timelines_du()
-    du_by_timeline = env.du_by_timeline()
+    (total_on_disk, _, _) = env.timelines_du(env.pageserver)
+    du_by_timeline = env.du_by_timeline(env.pageserver)

     # pick any tenant
     [warm, cold] = list(du_by_timeline.keys())
@@ -416,12 +445,12 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):
     response = ps_http.disk_usage_eviction_run({"evict_bytes": target})
     log.info(f"{response}")

-    (later_total_on_disk, _, _) = env.timelines_du()
+    (later_total_on_disk, _, _) = env.timelines_du(env.pageserver)
     actual_change = total_on_disk - later_total_on_disk
     assert 0 <= actual_change, "nothing can load layers during this test"
     assert actual_change >= target, "eviction must always evict more than target"

-    later_du_by_timeline = env.du_by_timeline()
+    later_du_by_timeline = env.du_by_timeline(env.pageserver)
     for tenant, later_tenant_usage in later_du_by_timeline.items():
         assert (
             later_tenant_usage < du_by_timeline[tenant]
@@ -453,7 +482,10 @@ def test_partial_evict_tenant(eviction_env: EvictionEnv):


 def poor_mans_du(
-    env: NeonEnv, timelines: list[Tuple[TenantId, TimelineId]], verbose: bool = False
+    env: NeonEnv,
+    timelines: list[Tuple[TenantId, TimelineId]],
+    pageserver: NeonPageserver,
+    verbose: bool = False,
 ) -> Tuple[int, int, int]:
     """
     Disk usage, largest, smallest layer for layer files over the given (tenant, timeline) tuples;
@@ -463,7 +495,7 @@ def poor_mans_du(
     largest_layer = 0
     smallest_layer = None
     for tenant_id, timeline_id in timelines:
-        timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
+        timeline_dir = pageserver.timeline_dir(tenant_id, timeline_id)
         assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
         total = 0
         for file in timeline_dir.iterdir():
@@ -494,6 +526,7 @@ def test_statvfs_error_handling(eviction_env: EvictionEnv):
     env = eviction_env
     env.neon_env.pageserver.stop()
     env.pageserver_start_with_disk_usage_eviction(
+        env.pageserver,
         period="1s",
         max_usage_pct=90,
         min_avail_bytes=0,
@@ -517,11 +550,12 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):
     env.neon_env.pageserver.stop()

     # make it seem like we're at 100% utilization by setting total bytes to the used bytes
-    total_size, _, _ = env.timelines_du()
+    total_size, _, _ = env.timelines_du(env.pageserver)
     blocksize = 512
     total_blocks = (total_size + (blocksize - 1)) // blocksize

     env.pageserver_start_with_disk_usage_eviction(
+        env.pageserver,
         period="1s",
         max_usage_pct=33,
         min_avail_bytes=0,
@@ -540,7 +574,7 @@ def test_statvfs_pressure_usage(eviction_env: EvictionEnv):

     wait_until(10, 1, relieved_log_message)

-    post_eviction_total_size, _, _ = env.timelines_du()
+    post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)

     assert post_eviction_total_size <= 0.33 * total_size, "we requested max 33% usage"

@@ -555,13 +589,14 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):
     env.neon_env.pageserver.stop()

     # make it seem like we're at 100% utilization by setting total bytes to the used bytes
-    total_size, _, _ = env.timelines_du()
+    total_size, _, _ = env.timelines_du(env.pageserver)
     blocksize = 512
     total_blocks = (total_size + (blocksize - 1)) // blocksize

     min_avail_bytes = total_size // 3

     env.pageserver_start_with_disk_usage_eviction(
+        env.pageserver,
         period="1s",
         max_usage_pct=100,
         min_avail_bytes=min_avail_bytes,
@@ -580,7 +615,66 @@ def test_statvfs_pressure_min_avail_bytes(eviction_env: EvictionEnv):

     wait_until(10, 1, relieved_log_message)

-    post_eviction_total_size, _, _ = env.timelines_du()
+    post_eviction_total_size, _, _ = env.timelines_du(env.pageserver)
+
+    assert (
+        total_size - post_eviction_total_size >= min_avail_bytes
+    ), "we requested at least min_avail_bytes worth of free space"
+
+
+def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
+    env = eviction_env_ha
+
+    tenant_ids = [t[0] for t in env.timelines]
+
+    log.info("Setting up secondary location...")
+    ps_attached = env.neon_env.pageservers[0]
+    ps_secondary = env.neon_env.pageservers[1]
+    for tenant_id in tenant_ids:
+        ps_secondary.tenant_location_configure(
+            tenant_id,
+            {
+                "mode": "Secondary",
+                "secondary_conf": {"warm": True},
+                "tenant_conf": {},
+            },
+        )
+        readback_conf = ps_secondary.read_tenant_location_conf(tenant_id)
+        log.info(f"Read back conf: {readback_conf}")
+
+        # Request secondary location to download all layers that the attached location has
+        ps_attached.http_client().secondary_tenant_upload(tenant_id)
+        ps_secondary.http_client().secondary_tenant_download(tenant_id)
+
+    # Configure the secondary pageserver to have a phony small disk size
+    ps_secondary.stop()
+    total_size, _, _ = env.timelines_du(ps_secondary)
+    blocksize = 512
+    total_blocks = (total_size + (blocksize - 1)) // blocksize
+
+    min_avail_bytes = total_size // 3
+
+    env.pageserver_start_with_disk_usage_eviction(
+        ps_secondary,
+        period="1s",
+        max_usage_pct=100,
+        min_avail_bytes=min_avail_bytes,
+        mock_behavior={
+            "type": "Success",
+            "blocksize": blocksize,
+            "total_blocks": total_blocks,
+            # Only count layer files towards used bytes in the mock_statvfs.
+            # This avoids accounting for metadata files & tenant conf in the tests.
+ "name_filter": ".*__.*", + }, + ) + + def relieved_log_message(): + assert ps_secondary.log_contains(".*disk usage pressure relieved") + + wait_until(10, 1, relieved_log_message) + + post_eviction_total_size, _, _ = env.timelines_du(ps_secondary) assert ( total_size - post_eviction_total_size >= min_avail_bytes diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 0dffcbc25b..12433d86b2 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -25,7 +25,13 @@ from fixtures.neon_fixtures import ( last_flush_lsn_upload, wait_for_last_flush_lsn, ) -from fixtures.pageserver.utils import list_prefix +from fixtures.pageserver.http import PageserverApiException +from fixtures.pageserver.utils import ( + assert_tenant_state, + list_prefix, + wait_for_last_record_lsn, + wait_for_upload, +) from fixtures.remote_storage import ( RemoteStorageKind, ) diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py new file mode 100644 index 0000000000..e49303be1e --- /dev/null +++ b/test_runner/regress/test_pageserver_secondary.py @@ -0,0 +1,477 @@ +import random +from pathlib import Path +from typing import Any, Dict, Optional + +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver +from fixtures.pageserver.utils import assert_prefix_empty, tenant_delete_wait_completed +from fixtures.remote_storage import RemoteStorageKind +from fixtures.types import TenantId, TimelineId +from fixtures.utils import wait_until +from fixtures.workload import Workload + +# A tenant configuration that is convenient for generating uploads and deletions +# without a large amount of postgres traffic. +TENANT_CONF = { + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": f"{128 * 1024}", + "compaction_target_size": f"{128 * 1024}", + "compaction_threshold": "1", + # no PITR horizon, we specify the horizon when we request on-demand GC + "pitr_interval": "0s", + # disable background compaction and GC. We invoke it manually when we want it to happen. + "gc_period": "0s", + "compaction_period": "0s", + # create image layers eagerly, so that GC can remove some layers + "image_creation_threshold": "1", +} + + +def evict_random_layers( + rng: random.Random, pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId +): + """ + Evict 50% of the layers on a pageserver + """ + timeline_path = pageserver.timeline_dir(tenant_id, timeline_id) + initial_local_layers = sorted( + list(filter(lambda path: path.name != "metadata", timeline_path.glob("*"))) + ) + client = pageserver.http_client() + for layer in initial_local_layers: + if "ephemeral" in layer.name: + continue + + if rng.choice([True, False]): + log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.name}") + client.evict_layer(tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.name) + + +@pytest.mark.parametrize("seed", [1, 2, 3]) +def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int): + """ + Issue many location configuration changes, ensure that tenants + remain readable & we don't get any unexpected errors. We should + have no ERROR in the log, and no 500s in the API. 
+ + The location_config API is intentionally designed so that all destination + states are valid, so that we may test it in this way: the API should always + work as long as the tenant exists. + """ + neon_env_builder.enable_generations = True + neon_env_builder.num_pageservers = 3 + neon_env_builder.enable_pageserver_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + ) + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + assert env.attachment_service is not None + + pageservers = env.pageservers + list([p.http_client() for p in pageservers]) + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + # We will make no effort to avoid stale attachments + for ps in env.pageservers: + ps.allowed_errors.extend( + [ + ".*Dropped remote consistent LSN updates.*", + ".*Dropping stale deletions.*", + # page_service_conn_main{peer_addr=[::1]:41176}: query handler for 'pagestream 3b19aec5038c796f64b430b30a555121 d07776761d44050b8aab511df1657d83' failed: Tenant 3b19aec5038c796f64b430b30a555121 not found + ".*query handler.*Tenant.*not found.*", + # page_service_conn_main{peer_addr=[::1]:45552}: query handler for 'pagestream 414ede7ad50f775a8e7d9ba0e43b9efc a43884be16f44b3626482b6981b2c745' failed: Tenant 414ede7ad50f775a8e7d9ba0e43b9efc is not active + ".*query handler.*Tenant.*not active.*", + ] + ) + + # these can happen, if we shutdown at a good time. to be fixed as part of #5172. + message = ".*duplicated L1 layer layer=.*" + ps.allowed_errors.append(message) + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageservers[0].id) + workload.write_rows(256, env.pageservers[0].id) + + # We use a fixed seed to make the test reproducible: we want a randomly + # chosen order, but not to change the order every time we run the test. 
+ rng = random.Random(seed) + + initial_generation = 1 + last_state = { + env.pageservers[0].id: ("AttachedSingle", initial_generation), + env.pageservers[1].id: ("Detached", None), + env.pageservers[2].id: ("Detached", None), + } + + latest_attached = env.pageservers[0].id + + for _i in range(0, 64): + # Pick a pageserver + pageserver = rng.choice(env.pageservers) + + # Pick a pseudorandom state + modes = [ + "AttachedSingle", + "AttachedMulti", + "AttachedStale", + "Secondary", + "Detached", + "_Evictions", + "_Restart", + ] + + mode = rng.choice(modes) + + last_state_ps = last_state[pageserver.id] + if mode == "_Evictions": + if last_state_ps[0].startswith("Attached"): + log.info(f"Action: evictions on pageserver {pageserver.id}") + evict_random_layers(rng, pageserver, tenant_id, timeline_id) + else: + log.info( + f"Action: skipping evictions on pageserver {pageserver.id}, is not attached" + ) + elif mode == "_Restart": + log.info(f"Action: restarting pageserver {pageserver.id}") + pageserver.stop() + pageserver.start() + if last_state_ps[0].startswith("Attached") and latest_attached == pageserver.id: + log.info("Entering postgres...") + workload.churn_rows(rng.randint(128, 256), pageserver.id) + workload.validate(pageserver.id) + elif last_state_ps[0].startswith("Attached"): + # The `attachment_service` will only re-attach on startup when a pageserver was the + # holder of the latest generation: otherwise the pageserver will revert to detached + # state if it was running attached with a stale generation + last_state[pageserver.id] = ("Detached", None) + else: + secondary_conf: Optional[Dict[str, Any]] = None + if mode == "Secondary": + secondary_conf = {"warm": rng.choice([True, False])} + + location_conf: Dict[str, Any] = { + "mode": mode, + "secondary_conf": secondary_conf, + "tenant_conf": {}, + } + + log.info(f"Action: Configuring pageserver {pageserver.id} to {location_conf}") + + # Select a generation number + if mode.startswith("Attached"): + if last_state_ps[1] is not None: + if rng.choice([True, False]): + # Move between attached states, staying in the same generation + generation = last_state_ps[1] + else: + # Switch generations, while also jumping between attached states + generation = env.attachment_service.attach_hook(tenant_id, pageserver.id) + latest_attached = pageserver.id + else: + generation = env.attachment_service.attach_hook(tenant_id, pageserver.id) + latest_attached = pageserver.id + else: + generation = None + + location_conf["generation"] = generation + + pageserver.tenant_location_configure(tenant_id, location_conf) + last_state[pageserver.id] = (mode, generation) + + if mode.startswith("Attached"): + # TODO: a variant of this test that runs background endpoint workloads, as well as + # the inter-step workloads. 
+ + workload.churn_rows( + rng.randint(128, 256), pageserver.id, upload=mode != "AttachedStale" + ) + workload.validate(pageserver.id) + + # Attach all pageservers + for ps in env.pageservers: + location_conf = {"mode": "AttachedMulti", "secondary_conf": None, "tenant_conf": {}} + ps.tenant_location_configure(tenant_id, location_conf) + + # Confirm that all are readable + for ps in env.pageservers: + workload.validate(ps.id) + + # Detach all pageservers + for ps in env.pageservers: + location_conf = {"mode": "Detached", "secondary_conf": None, "tenant_conf": {}} + ps.tenant_location_configure(tenant_id, location_conf) + + # Confirm that all local disk state was removed on detach + # TODO + + +def test_live_migration(neon_env_builder: NeonEnvBuilder): + """ + Test the sequence of location states that are used in a live migration. + """ + neon_env_builder.enable_generations = True + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + ) + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + assert env.attachment_service is not None + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + pageserver_a = env.pageservers[0] + pageserver_b = env.pageservers[1] + + initial_generation = 1 + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageservers[0].id) + workload.write_rows(256, env.pageservers[0].id) + + # Make the destination a secondary location + pageserver_b.tenant_location_configure( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + }, + ) + + workload.churn_rows(64, pageserver_a.id, upload=False) + + # Set origin attachment to stale + log.info("Setting origin to AttachedStale") + pageserver_a.tenant_location_configure( + tenant_id, + { + "mode": "AttachedStale", + "secondary_conf": None, + "tenant_conf": {}, + "generation": initial_generation, + }, + flush_ms=5000, + ) + + migrated_generation = env.attachment_service.attach_hook(tenant_id, pageserver_b.id) + log.info(f"Acquired generation {migrated_generation} for destination pageserver") + assert migrated_generation == initial_generation + 1 + + # Writes and reads still work in AttachedStale. + workload.validate(pageserver_a.id) + + # Ensure that secondary location's timeline directory is populated: we will then + # do some more writes on top of that to ensure that the newly attached pageserver + # properly makes use of the downloaded layers as well as ingesting WAL to catch up. 
+ pageserver_a.http_client().secondary_tenant_upload(tenant_id) + pageserver_b.http_client().secondary_tenant_download(tenant_id) + + # Generate some more dirty writes: we expect the origin to ingest WAL in + # in AttachedStale + workload.churn_rows(64, pageserver_a.id, upload=False) + workload.validate(pageserver_a.id) + + # Attach the destination + log.info("Setting destination to AttachedMulti") + pageserver_b.tenant_location_configure( + tenant_id, + { + "mode": "AttachedMulti", + "secondary_conf": None, + "tenant_conf": {}, + "generation": migrated_generation, + }, + ) + + # Wait for destination LSN to catch up with origin + origin_lsn = pageserver_a.http_client().timeline_detail(tenant_id, timeline_id)[ + "last_record_lsn" + ] + + def caught_up(): + destination_lsn = pageserver_b.http_client().timeline_detail(tenant_id, timeline_id)[ + "last_record_lsn" + ] + log.info( + f"Waiting for LSN to catch up: origin {origin_lsn} vs destination {destination_lsn}" + ) + assert destination_lsn >= origin_lsn + + wait_until(100, 0.1, caught_up) + + # The destination should accept writes + workload.churn_rows(64, pageserver_b.id) + + # Dual attached: both are readable. + workload.validate(pageserver_a.id) + workload.validate(pageserver_b.id) + + # Revert the origin to secondary + log.info("Setting origin to Secondary") + pageserver_a.tenant_location_configure( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + }, + ) + + workload.churn_rows(64, pageserver_b.id) + + # Put the destination into final state + pageserver_b.tenant_location_configure( + tenant_id, + { + "mode": "AttachedSingle", + "secondary_conf": None, + "tenant_conf": {}, + "generation": migrated_generation, + }, + ) + + workload.churn_rows(64, pageserver_b.id) + workload.validate(pageserver_b.id) + + +def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]: + """ + Inspect local storage on a pageserver to discover which layer files are present. + + :return: list of relative paths to layers, from the timeline root. + """ + timeline_path = pageserver.timeline_dir(tenant_id, timeline_id) + + def relative(p: Path) -> Path: + return p.relative_to(timeline_path) + + return sorted( + list( + map( + relative, + filter( + lambda path: path.name != "metadata" + and "ephemeral" not in path.name + and "temp" not in path.name, + timeline_path.glob("*"), + ), + ) + ) + ) + + +def test_secondary_downloads(neon_env_builder: NeonEnvBuilder): + """ + Test the overall data flow in secondary mode: + - Heatmap uploads from the attached location + - Heatmap & layer downloads from the secondary location + - Eviction of layers on the attached location results in deletion + on the secondary location as well. 
+ """ + neon_env_builder.enable_generations = True + neon_env_builder.num_pageservers = 2 + neon_env_builder.enable_pageserver_remote_storage( + remote_storage_kind=RemoteStorageKind.MOCK_S3, + ) + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + assert env.attachment_service is not None + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + ps_attached = env.pageservers[0] + ps_secondary = env.pageservers[1] + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageservers[0].id) + workload.write_rows(256, ps_attached.id) + + # Configure a secondary location + log.info("Setting up secondary location...") + ps_secondary.tenant_location_configure( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + }, + ) + readback_conf = ps_secondary.read_tenant_location_conf(tenant_id) + log.info(f"Read back conf: {readback_conf}") + + # Explicit upload/download cycle + # ============================== + log.info("Synchronizing after initial write...") + ps_attached.http_client().secondary_tenant_upload(tenant_id) + + ps_secondary.http_client().secondary_tenant_download(tenant_id) + + assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers( + ps_secondary, tenant_id, timeline_id + ) + + # Make changes on attached pageserver, check secondary downloads them + # =================================================================== + log.info("Synchronizing after subsequent write...") + workload.churn_rows(128, ps_attached.id) + + ps_attached.http_client().secondary_tenant_upload(tenant_id) + ps_secondary.http_client().secondary_tenant_download(tenant_id) + + assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers( + ps_secondary, tenant_id, timeline_id + ) + + # FIXME: this sleep is needed to avoid on-demand promotion of the layers we evict, while + # walreceiver is still doing something. + import time + + time.sleep(5) + + # Do evictions on attached pageserver, check secondary follows along + # ================================================================== + log.info("Evicting a layer...") + layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0] + ps_attached.http_client().evict_layer(tenant_id, timeline_id, layer_name=layer_to_evict.name) + + log.info("Synchronizing after eviction...") + ps_attached.http_client().secondary_tenant_upload(tenant_id) + ps_secondary.http_client().secondary_tenant_download(tenant_id) + + assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id) + assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers( + ps_secondary, tenant_id, timeline_id + ) + + # Scrub the remote storage + # ======================== + # This confirms that the scrubber isn't upset by the presence of the heatmap + # TODO: depends on `jcsp/scrubber-index-part` branch. + + # Detach secondary and delete tenant + # =================================== + # This confirms that the heatmap gets cleaned up as well as other normal content. + log.info("Detaching secondary location...") + ps_secondary.tenant_location_configure( + tenant_id, + { + "mode": "Detached", + "secondary_conf": None, + "tenant_conf": {}, + }, + ) + + log.info("Deleting tenant...") + tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10) + + assert_prefix_empty( + neon_env_builder, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + )