From 25bd74fd6a8afcc6bdf65b9f860d37b41da549a7 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Fri, 1 Mar 2024 21:15:34 +0000
Subject: [PATCH] tests: controller rolling failure test

---
 test_runner/fixtures/neon_fixtures.py        |  18 ++
 test_runner/fixtures/workload.py             |  38 +++-
 test_runner/regress/test_sharding_service.py | 188 ++++++++++++++++++-
 3 files changed, 233 insertions(+), 11 deletions(-)

diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 06713e157a..43bf77d4b6 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1518,6 +1518,7 @@ class NeonCli(AbstractNeonCli):
         conf: Optional[Dict[str, Any]] = None,
         shard_count: Optional[int] = None,
         shard_stripe_size: Optional[int] = None,
+        placement_policy: Optional[str] = None,
         set_default: bool = False,
     ) -> Tuple[TenantId, TimelineId]:
         """
@@ -1551,6 +1552,9 @@ class NeonCli(AbstractNeonCli):
         if shard_stripe_size is not None:
             args.extend(["--shard-stripe-size", str(shard_stripe_size)])
 
+        if placement_policy is not None:
+            args.extend(["--placement-policy", str(placement_policy)])
+
         res = self.raw_cli(args)
         res.check_returncode()
         return tenant_id, timeline_id
@@ -2168,6 +2172,20 @@ class NeonAttachmentService(MetricsGetter):
         )
         log.info("Attachment service passed consistency check")
 
+    def balance_all(self):
+        self.request(
+            "POST",
+            f"{self.env.attachment_service_api}/control/v1/balance/all",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
+    def balance_attached(self):
+        self.request(
+            "POST",
+            f"{self.env.attachment_service_api}/control/v1/balance/attached",
+            headers=self.headers(TokenScope.ADMIN),
+        )
+
     def __enter__(self) -> "NeonAttachmentService":
         return self
 
diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py
index 1d5394dc1d..a218bdf4eb 100644
--- a/test_runner/fixtures/workload.py
+++ b/test_runner/fixtures/workload.py
@@ -1,3 +1,4 @@
+import threading
 from typing import Optional
 
 from fixtures.log_helper import log
@@ -11,6 +12,10 @@ from fixtures.neon_fixtures import (
 from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.types import TenantId, TimelineId
 
+# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
+# to serialize those operations: this makes it safe to run lots of Workloads in parallel.
+ENDPOINT_LOCK = threading.Lock()
+
 
 class Workload:
     """
@@ -41,17 +46,30 @@ class Workload:
 
         self._endpoint: Optional[Endpoint] = None
 
+    def reconfigure(self):
+        """
+        Request the endpoint to reconfigure based on the location reported by the storage controller
+        """
+        if self._endpoint is not None:
+            with ENDPOINT_LOCK:
+                self._endpoint.reconfigure()
+
     def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
-        if self._endpoint is None:
-            self._endpoint = self.env.endpoints.create(
-                self.branch_name,
-                tenant_id=self.tenant_id,
-                pageserver_id=pageserver_id,
-                endpoint_id="ep-workload",
-            )
-            self._endpoint.start(pageserver_id=pageserver_id)
-        else:
-            self._endpoint.reconfigure(pageserver_id=pageserver_id)
+        # We may be running alongside other Workloads for different tenants. The full tenant/timeline
+        # ID is obnoxiously long for use here, but a cut-down version is still unique enough for tests.
+        endpoint_id = f"ep-workload-{str(self.tenant_id)[0:4]}-{str(self.timeline_id)[0:4]}"
+
+        with ENDPOINT_LOCK:
+            if self._endpoint is None:
+                self._endpoint = self.env.endpoints.create(
+                    self.branch_name,
+                    tenant_id=self.tenant_id,
+                    pageserver_id=pageserver_id,
+                    endpoint_id=endpoint_id,
+                )
+                self._endpoint.start(pageserver_id=pageserver_id)
+            else:
+                self._endpoint.reconfigure(pageserver_id=pageserver_id)
 
         connstring = self._endpoint.safe_psql(
             "SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index aecc244a47..0b7ce1fc72 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -1,3 +1,5 @@
+import concurrent.futures
+import random
 import time
 from collections import defaultdict
 from datetime import datetime, timezone
@@ -23,8 +25,9 @@ from fixtures.pageserver.utils import (
 )
 from fixtures.pg_version import PgVersion
 from fixtures.remote_storage import RemoteStorageKind, s3_storage
-from fixtures.types import TenantId, TimelineId
+from fixtures.types import TenantId, TenantShardId, TimelineId
 from fixtures.utils import run_pg_bench_small, wait_until
+from fixtures.workload import Workload
 from mypy_boto3_s3.type_defs import (
     ObjectTypeDef,
 )
@@ -770,3 +773,186 @@ def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
         assert "pitr_interval" not in readback_ps.tenant_specific_overrides
 
     env.attachment_service.consistency_check()
+
+
+def test_storcon_rolling_failures(
+    neon_env_builder: NeonEnvBuilder, httpserver: HTTPServer, httpserver_listen_address
+):
+    neon_env_builder.num_pageservers = 8
+
+    (host, port) = httpserver_listen_address
+    neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify-attach"
+
+    workloads: dict[TenantId, Workload] = {}
+
+    # Do neon_local endpoint reconfiguration in the background so that we can
+    # keep accepting calls into notify-attach at a healthy rate.
+    reconfigure_threads = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+
+    def handler(request: Request):
+        """
+        Although the storage controller can use neon_local directly, that causes problems when
+        the test is also concurrently modifying endpoints. Instead, configure the storage
+        controller to send notifications up to this test code, which routes all endpoint updates
+        through Workload, whose mutex makes them safe.
+        """
+        assert request.json is not None
+        body: dict[str, Any] = request.json
+        log.info(f"notify-attach request: {body}")
+
+        try:
+            workload = workloads[TenantId(body["tenant_id"])]
+        except KeyError:
+            pass
+        else:
+            # This causes the endpoint to query the storage controller for its location, which
+            # is redundant since we already have it here, but it avoids extending the
+            # neon_local CLI to take full lists of locations.
+            reconfigure_threads.submit(lambda workload=workload: workload.reconfigure())  # type: ignore[no-any-return]
+
+        return Response(status=200)
+
+    httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(handler)
+
+    env = neon_env_builder.init_start()
+
+    for ps in env.pageservers:
+        # We will do unclean detaches
+        ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
+
+    n_tenants = 32
+    tenants = [(env.initial_tenant, env.initial_timeline)]
+    for i in range(0, n_tenants - 1):
+        tenant_id = TenantId.generate()
+        timeline_id = TimelineId.generate()
+        shard_count = [1, 2, 4][i % 3]
+        env.neon_cli.create_tenant(
+            tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
+        )
+        tenants.append((tenant_id, timeline_id))
+
+    # Background pain:
+    # - TODO: some fraction of pageserver API requests hang
+    #   (this requires wrapping location_conf calls with proper timeout/cancel handling)
+    # - TODO: continuous tenant/timeline creation/destruction over a different ID range than
+    #   the ones we're using for availability checks.
+
+    rng = random.Random(0xDEADBEEF)
+
+    for tenant_id, timeline_id in tenants:
+        workload = Workload(env, tenant_id, timeline_id)
+        workloads[tenant_id] = workload
+
+    def node_evacuated(node_id: int):
+        counts = get_node_shard_counts(env, [t[0] for t in tenants])
+        assert counts[node_id] == 0
+
+    def attachments_active():
+        for tid, _tlid in tenants:
+            for shard in env.attachment_service.locate(tid):
+                psid = shard["node_id"]
+                tsid = TenantShardId.parse(shard["shard_id"])
+                status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
+                assert status["state"]["slug"] == "Active"
+                log.info(f"Shard {tsid} active on node {psid}")
+
+    failpoints = ("api-503", "5%1000*return(1)")
+    failpoints_str = f"{failpoints[0]}={failpoints[1]}"
+    for ps in env.pageservers:
+        ps.http_client().configure_failpoints(failpoints)
+
+    def for_all_workloads(callback, timeout=60):
+        futs = []
+        with concurrent.futures.ThreadPoolExecutor() as pool:
+            for _tenant_id, workload in workloads.items():
+                futs.append(pool.submit(callback, workload))
+
+            for f in futs:
+                f.result(timeout=timeout)
+
+    def clean_fail_restore():
+        """
+        Clean shutdown of a node: mark it offline in the storage controller, wait for new
+        attachment locations to activate, then SIGTERM it.
+        - Endpoints should not fail any queries
+        - New attach locations should activate within bounded time
+ """ + victim = rng.choice(env.pageservers) + env.attachment_service.node_configure(victim.id, {"availability": "Offline"}) + + wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc] + wait_until(10, 1, attachments_active) + + victim.stop(immediate=False) + + traffic() + + victim.start(extra_env_vars={"FAILPOINTS": failpoints_str}) + + # Revert shards to attach at their original locations + env.attachment_service.balance_attached() + wait_until(10, 1, attachments_active) + + def hard_fail_restore(): + """ + Simulate an unexpected death of a pageserver node + """ + victim = rng.choice(env.pageservers) + victim.stop(immediate=True) + # TODO: once we implement heartbeats detecting node failures, remove this + # explicit marking offline and rely on storage controller to detect it itself. + env.attachment_service.node_configure(victim.id, {"availability": "Offline"}) + wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc] + wait_until(10, 1, attachments_active) + traffic() + victim.start(extra_env_vars={"FAILPOINTS": failpoints_str}) + env.attachment_service.balance_attached() + wait_until(10, 1, attachments_active) + + def traffic(): + """ + Check that all tenants are working for postgres clients + """ + + def exercise_one(workload): + workload.churn_rows(100) + workload.validate() + + for_all_workloads(exercise_one) + + def init_one(workload): + workload.init() + workload.write_rows(100) + + for_all_workloads(init_one, timeout=60) + + for i in range(0, 20): + mode = rng.choice([0, 1, 2]) + log.info(f"Iteration {i}, mode {mode}") + if mode == 0: + # Traffic interval: sometimes, instead of a failure, just let the clients + # write a load of data. This avoids chaos tests ending up with unrealistically + # small quantities of data in flight. + traffic() + elif mode == 1: + clean_fail_restore() + elif mode == 2: + hard_fail_restore() + + # Fail and restart: hard-kill one node. Notify the storage controller that it is offline. + # Success criteria: + # - New attach locations should activate within bounded time + # - TODO: once we do heartbeating, we should not have to explicitly mark the node offline + + # TODO: fail and remove: fail a node, and remove it from the cluster. + # Success criteria: + # - Endpoints should not fail any queries + # - New attach locations should activate within bounded time + # - New secondary locations should fill up with data within bounded time + + # TODO: somehow need to wait for reconciles to complete before doing consistency check + # (or make the check wait). + + # Do consistency check on every iteration, not just at the end: this makes it more obvious + # which change caused an issue. + env.attachment_service.consistency_check()
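
Note (illustrative, not part of the patch): the new balance_attached()/balance_all() fixture
helpers are thin wrappers over the storage controller's balance API, which this test uses to
revert shards to their original attachment locations after each failure drill. A minimal sketch
of the equivalent raw HTTP calls, assuming a locally running attachment service reachable at
`api` and an admin-scoped JWT in `token` (both names are placeholders, and the bearer-token
header is an assumption based on the fixture's TokenScope.ADMIN headers):

    import requests

    api = "http://127.0.0.1:1234"  # placeholder: attachment service API base URL
    token = "<admin-scoped JWT>"   # placeholder: admin token

    # Equivalent of NeonAttachmentService.balance_attached(): ask the controller to move
    # attached shard locations back onto their preferred nodes.
    requests.post(
        f"{api}/control/v1/balance/attached",
        headers={"Authorization": f"Bearer {token}"},
    ).raise_for_status()

    # Equivalent of NeonAttachmentService.balance_all(): rebalance all locations
    # (the exact semantics are defined by the controller and are not shown in this patch).
    requests.post(
        f"{api}/control/v1/balance/all",
        headers={"Authorization": f"Bearer {token}"},
    ).raise_for_status()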