diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 4911917bf4..aebf4952de 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -6,6 +6,7 @@ import concurrent.futures
 import filecmp
 import json
 import os
+import random
 import re
 import shutil
 import subprocess
@@ -2788,6 +2789,33 @@ class NeonPageserver(PgProtocol, LogUtils):
         layers = self.list_layers(tenant_id, timeline_id)
         return layer_name in [parse_layer_file_name(p.name) for p in layers]
 
+    def evict_random_layers(
+        self, rng: random.Random, tenant_id: TenantId, timeline_id: TimelineId, ratio: float
+    ):
+        """
+        Evict a random fraction (`ratio`) of the layers on a pageserver
+        """
+        timeline_path = self.timeline_dir(tenant_id, timeline_id)
+        initial_local_layers = sorted(
+            list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
+        )
+        client = self.http_client()
+        for layer in initial_local_layers:
+            if (
+                "ephemeral" in layer.name
+                or "temp_download" in layer.name
+                or "___temp" in layer.name
+            ):
+                continue
+
+            layer_name = parse_layer_file_name(layer.name)
+
+            if rng.uniform(0.0, 1.0) < ratio:
+                log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer_name.to_str()}")
+                client.evict_layer(
+                    tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer_name.to_str()
+                )
+
 
 class PgBin:
     """A helper class for executing postgres binaries"""
diff --git a/test_runner/regress/test_pageserver_restarts_under_workload.py b/test_runner/regress/test_pageserver_restarts_under_workload.py
index 65569f3bac..a3fb867f62 100644
--- a/test_runner/regress/test_pageserver_restarts_under_workload.py
+++ b/test_runner/regress/test_pageserver_restarts_under_workload.py
@@ -1,16 +1,21 @@
 # This test spawns pgbench in a thread in the background and concurrently restarts pageserver,
 # checking how client is able to transparently restore connection to pageserver
 #
+import random
 import threading
 import time
 
+import pytest
+from fixtures.common_types import TenantShardId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, PgBin
+from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
+from fixtures.remote_storage import RemoteStorageKind
+from fixtures.utils import wait_until
 
 
 # Test restarting page server, while safekeeper and compute node keep
 # running.
-def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgBin):
+def test_pageserver_restarts_under_workload(neon_simple_env: NeonEnv, pg_bin: PgBin):
     env = neon_simple_env
     env.neon_cli.create_branch("test_pageserver_restarts")
     endpoint = env.endpoints.create_start("test_pageserver_restarts")
@@ -32,3 +37,149 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
     env.pageserver.start()
 
     thread.join()
+
+
+@pytest.mark.timeout(600)
+def test_pageserver_migration_under_workload(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    tenant_conf = {
+        # We want layer rewrites to happen as soon as possible (this is the most stressful
+        # case for the system), so set PITR interval to something tiny.
+        "pitr_interval": "5s",
+        # Scaled down thresholds. We will run at ~1GB scale but would like to emulate
+        # the behavior of a system running at ~100GB scale.
+ "checkpoint_distance": f"{1024 * 1024}", + "compaction_threshold": "1", + "compaction_target_size": f"{1024 * 1024}", + "image_creation_threshold": "2", + "image_layer_creation_check_threshold": "0", + # Do compaction & GC super frequently so that they are likely to be running while we are migrating + "compaction_period": "1s", + "gc_period": "1s", + } + + neon_env_builder.num_pageservers = 2 + neon_env_builder.num_safekeepers = 3 + neon_env_builder.enable_pageserver_remote_storage( + RemoteStorageKind.MOCK_S3, + ) + env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) + + for ps in env.pageservers: + ps.allowed_errors.extend( + [ + ".*Dropped remote consistent LSN updates.*", + ".*Dropping stale deletions.*", + # A stress test may hit the flush_ms timeout while transitioning a tenant to AttachedStale + ".*Timed out waiting for flush to remote storage, proceeding anyway.*", + ] + ) + + timeline_id = env.neon_cli.create_branch("branch") + endpoint = env.endpoints.create_start("branch") + n_migrations = 50 + scale = 100 + + test_stop = threading.Event() + + def run_pgbench(connstr: str): + ex = None + while not test_stop.is_set(): + log.info(f"Starting pgbench on {connstr}") + try: + pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) + pg_bin.run_capture(["pgbench", "-T10", connstr]) + except Exception as e: + # We don't stop the loop running pgbench when it fails: we still want to run the test + # to completion to see if the pageserver itself hit any issues when under load, even if + # the client is not getting a clean experience. + ex = e + log.warning(f"pgbench failed: {e}") + + if ex is not None: + raise ex + + def safekeeper_restarts(): + while not test_stop.is_set(): + for safekeeper in env.safekeepers: + if test_stop.is_set(): + break + + log.info(f"Restarting safekeeper {safekeeper.id}") + safekeeper.stop() + time.sleep(2) + safekeeper.start() + time.sleep(5) + + # Run pgbench in the background to ensure some data is there to ingest, and some getpage requests are running + pgbench_thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True) + pgbench_thread.start() + + # Restart safekeepers in the background, so that pageserver's ingestion is not totally smooth, and sometimes + # during a tenant shutdown we might be in the process of finding a new safekeeper to download from + safekeeper_thread = threading.Thread(target=safekeeper_restarts, daemon=True) + safekeeper_thread.start() + + # TODO: inject some randomized timing noise: + # - before-downloading-layer-stream-pausable + # - flush-frozen-pausable + + def assert_getpage_reqs(ps_attached, init_getpage_reqs): + """ + Assert that the getpage request counter has advanced on a particular node + """ + getpage_reqs = ( + env.get_pageserver(ps_attached) + .http_client() + .get_metric_value( + "pageserver_smgr_query_seconds_global_count", {"smgr_query_type": "get_page_at_lsn"} + ) + ) + log.info(f"getpage reqs({ps_attached}): {getpage_reqs} (vs init {init_getpage_reqs})") + assert getpage_reqs is not None + assert getpage_reqs > init_getpage_reqs + + rng = random.Random(0xDEADBEEF) + + # Ping-pong the tenant between pageservers: this repeatedly exercises the transitions between secondarry and + # attached, and implicitly exercises the tenant/timeline shutdown() methods under load + tenant_id = env.initial_tenant + ps_attached = env.get_tenant_pageserver(tenant_id).id + ps_secondary = [p for p in env.pageservers if p.id != ps_attached][0].id + for _ in range(n_migrations): + # Snapshot 
+        init_getpage_reqs = (
+            env.get_pageserver(ps_secondary)
+            .http_client()
+            .get_metric_value(
+                "pageserver_smgr_query_seconds_global_count", {"smgr_query_type": "get_page_at_lsn"}
+            )
+        )
+        if init_getpage_reqs is None:
+            init_getpage_reqs = 0
+
+        env.storage_controller.tenant_shard_migrate(TenantShardId(tenant_id, 0, 0), ps_secondary)
+        ps_attached, ps_secondary = ps_secondary, ps_attached
+
+        # Make sure we've seen some getpage requests: avoid migrating when a client isn't really doing anything
+        wait_until(
+            30,
+            1,
+            lambda ps_attached=ps_attached,  # type: ignore
+            init_getpage_reqs=init_getpage_reqs: assert_getpage_reqs(
+                ps_attached, init_getpage_reqs
+            ),
+        )
+
+        # Evict some layers, so that we're exercising getpage requests that involve a layer download
+        env.get_pageserver(ps_attached).evict_random_layers(rng, tenant_id, timeline_id, 0.1)
+
+        # Since this test exists to detect shutdown issues, be strict about any warnings logged about delays in shutdown.
+        # Do this each loop iteration so that on failures we don't bury the issue behind many more iterations of logs.
+        for ps in env.pageservers:
+            assert not ps.log_contains(
+                "(kept the gate from closing|closing is taking longer than expected)"
+            )
+
+    test_stop.set()
+    pgbench_thread.join()
+    safekeeper_thread.join()
diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py
index 8431840dc0..88f064efe1 100644
--- a/test_runner/regress/test_pageserver_secondary.py
+++ b/test_runner/regress/test_pageserver_secondary.py
@@ -7,7 +7,7 @@ from typing import Any, Dict, Optional
 import pytest
 from fixtures.common_types import TenantId, TimelineId
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, StorageScrubber
+from fixtures.neon_fixtures import NeonEnvBuilder, StorageScrubber
 from fixtures.pageserver.common_types import parse_layer_file_name
 from fixtures.pageserver.utils import (
     assert_prefix_empty,
@@ -34,30 +34,6 @@ TENANT_CONF = {
 }
 
 
-def evict_random_layers(
-    rng: random.Random, pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
-):
-    """
-    Evict 50% of the layers on a pageserver
-    """
-    timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
-    initial_local_layers = sorted(
-        list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
-    )
-    client = pageserver.http_client()
-    for layer in initial_local_layers:
-        if "ephemeral" in layer.name or "temp_download" in layer.name:
-            continue
-
-        layer_name = parse_layer_file_name(layer.name)
-
-        if rng.choice([True, False]):
-            log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer_name.to_str()}")
-            client.evict_layer(
-                tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer_name.to_str()
-            )
-
-
 @pytest.mark.parametrize("seed", [1, 2, 3])
 def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
     """
@@ -136,7 +112,7 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
         if mode == "_Evictions":
             if last_state_ps[0].startswith("Attached"):
                 log.info(f"Action: evictions on pageserver {pageserver.id}")
-                evict_random_layers(rng, pageserver, tenant_id, timeline_id)
+                pageserver.evict_random_layers(rng, tenant_id, timeline_id, 0.5)
             else:
                 log.info(
                     f"Action: skipping evictions on pageserver {pageserver.id}, is not attached"
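
For reference, a minimal sketch (not part of the patch) of how the relocated
NeonPageserver.evict_random_layers helper might be driven from another
regression test. Only the helper's signature comes from the diff above; the
test name, the single-pageserver environment, and the 0.25 eviction ratio are
illustrative assumptions.

    import random

    from fixtures.neon_fixtures import NeonEnv


    def test_eviction_sketch(neon_simple_env: NeonEnv):
        # Hypothetical test: assumes the default tenant/timeline of the simple
        # env and that remote storage is enabled so eviction is possible.
        env = neon_simple_env
        tenant_id = env.initial_tenant
        timeline_id = env.initial_timeline

        # Seed the RNG so a failing run is reproducible, as the tests above do.
        rng = random.Random(0xDEADBEEF)

        # Evict roughly a quarter of the local layers; subsequent getpage
        # requests against this timeline then exercise on-demand downloads.
        env.pageserver.evict_random_layers(rng, tenant_id, timeline_id, 0.25)

Moving the helper onto NeonPageserver lets any test evict layers without
duplicating the directory-walking logic, and the explicit ratio parameter
replaces the old hard-coded 50% coin flip.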