From bbb2fa7cdd1284376155fcbbdf34191b335df4e6 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 4 Jul 2024 06:04:19 +0100 Subject: [PATCH] tests: perform graceful rolling restarts in storcon scale test (#8173) ## Problem Scale test doesn't exercise drain & fill. ## Summary of changes Make scale test exercise drain & fill --- test_runner/fixtures/neon_fixtures.py | 47 +++++++ .../test_storage_controller_scale.py | 124 ++++++++++++++++-- .../regress/test_storage_controller.py | 59 ++------- 3 files changed, 171 insertions(+), 59 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 565aaba6e0..c002e11c1c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2113,6 +2113,21 @@ class NeonStorageController(MetricsGetter, LogUtils): self.running = False return self + @staticmethod + def retryable_node_operation(op, ps_id, max_attempts, backoff): + while max_attempts > 0: + try: + op(ps_id) + return + except StorageControllerApiException as e: + max_attempts -= 1 + log.info(f"Operation failed ({max_attempts} attempts left): {e}") + + if max_attempts == 0: + raise e + + time.sleep(backoff) + @staticmethod def raise_api_exception(res: requests.Response): try: @@ -2453,6 +2468,38 @@ class NeonStorageController(MetricsGetter, LogUtils): ) log.info("storage controller passed consistency check") + def poll_node_status( + self, node_id: int, desired_scheduling_policy: str, max_attempts: int, backoff: int + ): + """ + Poll the node status until it reaches 'desired_scheduling_policy' or 'max_attempts' have been exhausted + """ + log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy") + while max_attempts > 0: + try: + status = self.node_status(node_id) + policy = status["scheduling"] + if policy == desired_scheduling_policy: + return + else: + max_attempts -= 1 + log.info(f"Status call returned {policy=} ({max_attempts} attempts left)") + + if max_attempts == 0: + raise AssertionError( + f"Status for {node_id=} did not reach {desired_scheduling_policy=}" + ) + + time.sleep(backoff) + except StorageControllerApiException as e: + max_attempts -= 1 + log.info(f"Status call failed ({max_attempts} retries left): {e}") + + if max_attempts == 0: + raise e + + time.sleep(backoff) + def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]): if isinstance(config_strings, tuple): pairs = [config_strings] diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py index a4c8c8ac42..d65a66b010 100644 --- a/test_runner/performance/test_storage_controller_scale.py +++ b/test_runner/performance/test_storage_controller_scale.py @@ -1,18 +1,89 @@ import concurrent.futures import random import time +from collections import defaultdict +from typing import Any, Dict import pytest from fixtures.common_types import TenantId, TenantShardId, TimelineId from fixtures.compute_reconfigure import ComputeReconfigure from fixtures.log_helper import log -from fixtures.neon_fixtures import ( - NeonEnvBuilder, -) +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.pageserver.http import PageserverHttpClient from fixtures.pg_version import PgVersion +def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]: + """ + Get the number of shards attached to each node. + This function takes into account the intersection of the intent and the observed state. 
+ If they do not match, it asserts out. + """ + tenants = env.storage_controller.tenant_list() + + intent = dict() + observed = dict() + + tenant_placement: defaultdict[str, Dict[str, Any]] = defaultdict( + lambda: { + "observed": {"attached": None, "secondary": []}, + "intent": {"attached": None, "secondary": []}, + } + ) + + for t in tenants: + for node_id, loc_state in t["observed"]["locations"].items(): + if ( + loc_state is not None + and "conf" in loc_state + and loc_state["conf"] is not None + and loc_state["conf"]["mode"] + in set(["AttachedSingle", "AttachedMulti", "AttachedStale"]) + ): + observed[t["tenant_shard_id"]] = int(node_id) + tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id) + + if ( + loc_state is not None + and "conf" in loc_state + and loc_state["conf"] is not None + and loc_state["conf"]["mode"] == "Secondary" + ): + tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(int(node_id)) + + if "attached" in t["intent"]: + intent[t["tenant_shard_id"]] = t["intent"]["attached"] + tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"] + + if "secondary" in t["intent"]: + tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"][ + "secondary" + ] + + log.info(f"{tenant_placement=}") + + matching = { + tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid] + } + assert len(matching) == total_shards + + attached_per_node: defaultdict[str, int] = defaultdict(int) + for node_id in matching.values(): + attached_per_node[node_id] += 1 + + return attached_per_node + + +def assert_consistent_balanced_attachments(env: NeonEnv, total_shards): + attached_per_node = get_consistent_node_shard_counts(env, total_shards) + + min_shard_count = min(attached_per_node.values()) + max_shard_count = max(attached_per_node.values()) + + flake_factor = 5 / 100 + assert max_shard_count - min_shard_count <= int(total_shards * flake_factor) + + @pytest.mark.timeout(3600) # super long running test: should go down as we optimize def test_storage_controller_many_tenants( neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure @@ -44,7 +115,8 @@ def test_storage_controller_many_tenants( # A small sleep on each call into the notify hook, to simulate the latency of doing a database write compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01)) - env = neon_env_builder.init_start() + env = neon_env_builder.init_configs() + neon_env_builder.start() # We will intentionally stress reconciler concurrrency, which triggers a warning when lots # of shards are hitting the delayed path. 
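[Editorial aside, not part of the patch] The `assert_consistent_balanced_attachments` helper above tolerates a 5% spread between the most and least loaded pageservers. A minimal standalone sketch of that arithmetic, using made-up node IDs and shard counts (illustrative only, not taken from the test):

```python
# Illustrative numbers only: three hypothetical pageservers and their
# attached-shard counts, summing to 1000 shards in total.
attached_per_node = {"1": 340, "2": 330, "3": 330}

total_shards = sum(attached_per_node.values())
flake_factor = 5 / 100  # same 5% tolerance as the test helper

spread = max(attached_per_node.values()) - min(attached_per_node.values())
# Spread is 10, allowed slack is int(1000 * 0.05) == 50, so this passes.
assert spread <= int(total_shards * flake_factor)
```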
@@ -79,6 +151,8 @@ def test_storage_controller_many_tenants( shard_count = 2 stripe_size = 1024 + total_shards = tenant_count * shard_count + tenants = set(TenantId.generate() for _i in range(0, tenant_count)) virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True) @@ -195,10 +269,44 @@ def test_storage_controller_many_tenants( env.storage_controller.consistency_check() check_memory() - # Restart pageservers: this exercises the /re-attach API - for pageserver in env.pageservers: - pageserver.stop() - pageserver.start() + shard_counts = get_consistent_node_shard_counts(env, total_shards) + log.info(f"Shard counts before rolling restart: {shard_counts}") + + assert_consistent_balanced_attachments(env, total_shards) + + # Restart pageservers gracefully: this exercises the /re-attach pageserver API + # and the storage controller drain and fill API + for ps in env.pageservers: + env.storage_controller.retryable_node_operation( + lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2 + ) + + env.storage_controller.poll_node_status( + ps.id, "PauseForRestart", max_attempts=24, backoff=5 + ) + + shard_counts = get_consistent_node_shard_counts(env, total_shards) + log.info(f"Shard counts after draining node {ps.id}: {shard_counts}") + # Assert that we've drained the node + assert shard_counts[str(ps.id)] == 0 + # Assert that those shards actually went somewhere + assert sum(shard_counts.values()) == total_shards + + ps.restart() + env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=1) + + env.storage_controller.retryable_node_operation( + lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 + ) + env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=5) + + shard_counts = get_consistent_node_shard_counts(env, total_shards) + log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") + + assert_consistent_balanced_attachments(env, total_shards) + + env.storage_controller.reconcile_until_idle() + env.storage_controller.consistency_check() # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn, # as they were not offline long enough to trigger any scheduling changes. 
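[Editorial aside, not part of the patch] For readers skimming the rolling-restart loop above, this is the per-node sequence it performs, condensed into one helper. A sketch only, assuming the `NeonStorageController` methods added in this patch (`retryable_node_operation`, `poll_node_status`, `node_drain`, `node_fill`) and the existing pageserver `restart()` fixture:

```python
def graceful_restart_one_pageserver(env, ps):
    # Drain the node and wait for it to park in the PauseForRestart policy.
    env.storage_controller.retryable_node_operation(
        lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
    )
    env.storage_controller.poll_node_status(ps.id, "PauseForRestart", max_attempts=24, backoff=5)

    # Restart the pageserver; it re-attaches and becomes Active again.
    ps.restart()
    env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=1)

    # Fill the node back up, then wait for it to settle in Active.
    env.storage_controller.retryable_node_operation(
        lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
    )
    env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=24, backoff=5)
```

The test's shard-count assertions (the drained node holds zero attachments, the total is preserved, and balance is restored after the fill) sit between these steps in the loop above.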
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 1b294fb2d0..a78f566f0e 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -1518,49 +1518,6 @@ def test_tenant_import(neon_env_builder: NeonEnvBuilder, shard_count, remote_sto workload.validate() -def retryable_node_operation(op, ps_id, max_attempts, backoff): - while max_attempts > 0: - try: - op(ps_id) - return - except StorageControllerApiException as e: - max_attempts -= 1 - log.info(f"Operation failed ({max_attempts} attempts left): {e}") - - if max_attempts == 0: - raise e - - time.sleep(backoff) - - -def poll_node_status(env, node_id, desired_scheduling_policy, max_attempts, backoff): - log.info(f"Polling {node_id} for {desired_scheduling_policy} scheduling policy") - while max_attempts > 0: - try: - status = env.storage_controller.node_status(node_id) - policy = status["scheduling"] - if policy == desired_scheduling_policy: - return - else: - max_attempts -= 1 - log.info(f"Status call returned {policy=} ({max_attempts} attempts left)") - - if max_attempts == 0: - raise AssertionError( - f"Status for {node_id=} did not reach {desired_scheduling_policy=}" - ) - - time.sleep(backoff) - except StorageControllerApiException as e: - max_attempts -= 1 - log.info(f"Status call failed ({max_attempts} retries left): {e}") - - if max_attempts == 0: - raise e - - time.sleep(backoff) - - def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder): """ Graceful reststart of storage controller clusters use the drain and @@ -1601,10 +1558,10 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder): # Perform a graceful rolling restart for ps in env.pageservers: - retryable_node_operation( + env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2 ) - poll_node_status(env, ps.id, "PauseForRestart", max_attempts=6, backoff=5) + env.storage_controller.poll_node_status(ps.id, "PauseForRestart", max_attempts=6, backoff=5) shard_counts = get_node_shard_counts(env, tenant_ids) log.info(f"Shard counts after draining node {ps.id}: {shard_counts}") @@ -1614,12 +1571,12 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder): assert sum(shard_counts.values()) == total_shards ps.restart() - poll_node_status(env, ps.id, "Active", max_attempts=10, backoff=1) + env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=10, backoff=1) - retryable_node_operation( + env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 ) - poll_node_status(env, ps.id, "Active", max_attempts=6, backoff=5) + env.storage_controller.poll_node_status(ps.id, "Active", max_attempts=6, backoff=5) shard_counts = get_node_shard_counts(env, tenant_ids) log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") @@ -1657,15 +1614,15 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder): ps_id_to_drain = env.pageservers[0].id - retryable_node_operation( + env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_drain(ps_id), ps_id_to_drain, max_attempts=3, backoff=2, ) - poll_node_status(env, ps_id_to_drain, "Draining", max_attempts=6, backoff=2) + env.storage_controller.poll_node_status(ps_id_to_drain, "Draining", max_attempts=6, backoff=2) 
env.storage_controller.cancel_node_drain(ps_id_to_drain) - poll_node_status(env, ps_id_to_drain, "Active", max_attempts=6, backoff=2) + env.storage_controller.poll_node_status(ps_id_to_drain, "Active", max_attempts=6, backoff=2)
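[Editorial aside, not part of the patch] The regress test above also covers cancellation: once a drain is in flight and the node reports the Draining policy, cancelling it is expected to return the scheduling policy to Active. A condensed sketch of that path, assuming a running `NeonEnv` and the same fixture methods shown in this patch plus `cancel_node_drain`:

```python
node_id = env.pageservers[0].id

# Kick off a drain and wait until the controller reports the node as Draining.
env.storage_controller.retryable_node_operation(
    lambda ps_id: env.storage_controller.node_drain(ps_id), node_id, max_attempts=3, backoff=2
)
env.storage_controller.poll_node_status(node_id, "Draining", max_attempts=6, backoff=2)

# Cancel the background drain; the test only asserts that the node's
# scheduling policy returns to Active afterwards.
env.storage_controller.cancel_node_drain(node_id)
env.storage_controller.poll_node_status(node_id, "Active", max_attempts=6, backoff=2)
```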