Files
neon/test_runner/performance/test_storage_controller_scale.py
Alexander Bayandin 30a7dd630c ruff: enable TC — flake8-type-checking (#11368)
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes
- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking):
https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
2025-03-30 18:58:33 +00:00

580 lines
24 KiB
Python

from __future__ import annotations
import concurrent.futures
import random
import time
from collections import defaultdict
from enum import StrEnum
from typing import TYPE_CHECKING
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PageserverAvailability,
PageserverSchedulingPolicy,
StorageControllerMigrationConfig,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pg_version import PgVersion
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.compute_reconfigure import ComputeReconfigure
def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[str, int]:
    """
    Count attached shards per node, using only shards whose intent matches reality.

    A shard is counted when the storage controller's intended attached location
    equals its observed attached location. The number of such shards must equal
    `total_shards`; otherwise this function fails with an assertion.

    Nodes with no attached shards do not get an entry in the returned mapping.
    """
    tenant_placement = env.storage_controller.get_tenants_placement()
    log.info(f"{tenant_placement=}")

    attached_per_node: defaultdict[str, int] = defaultdict(int)
    consistent_shards = 0
    for placement in tenant_placement.values():
        intended_node = placement["intent"]["attached"]
        if intended_node != placement["observed"]["attached"]:
            # Intent and observed state disagree: not counted, and will trip the assert below
            continue
        consistent_shards += 1
        attached_per_node[intended_node] += 1

    assert consistent_shards == total_shards

    return attached_per_node
def assert_consistent_balanced_attachments(env: NeonEnv, total_shards):
    """
    Assert that attachments are consistent and roughly evenly spread across nodes.

    The gap between the busiest and the least busy node (among nodes that have at
    least one attached shard) may not exceed 5% of `total_shards`.
    """
    per_node = get_consistent_node_shard_counts(env, total_shards)

    fewest, most = min(per_node.values()), max(per_node.values())

    # Tolerate a small imbalance, expressed as a fraction of all shards
    tolerated_gap = int(total_shards * (5 / 100))
    assert most - fewest <= tolerated_gap
@pytest.mark.timeout(3600)  # super long running test: should go down as we optimize
def test_storage_controller_many_tenants(
    neon_env_builder: NeonEnvBuilder, compute_reconfigure_listener: ComputeReconfigure
):
    """
    Check that we cope well with a not-totally-trivial number of tenants.

    This is checking for:
    - Obvious concurrency bugs from issuing many tenant creations/modifications
      concurrently.
    - Obvious scaling bugs like O(N^2) scaling that would be so slow that even
      a basic test starts failing from slowness.

    This is _not_ a comprehensive scale test: just a basic sanity check that
    we don't fall over for several thousand shards.

    Phases, in order:
    1. Create all tenants, then timelines in a subset of them.
    2. Run a mix of timeline ops, shard migrations and passthrough reads concurrently.
    3. Time an idle `reconcile_all`, then restart the storage controller.
    4. Drain, restart and fill each pageserver in turn.
    5. Kill every pageserver in one AZ, check shards move away, then bring the AZ back.
    """

    neon_env_builder.num_pageservers = 6
    neon_env_builder.storage_controller_config = {
        # Default neon_local uses a small timeout: use a longer one to tolerate longer pageserver restarts.
        # TODO: tune this down as restarts get faster (https://github.com/neondatabase/neon/pull/7553), to
        # guard against regressions in restart time.
        "max_offline": "30s",
        "max_warming_up": "300s",
        "use_local_compute_notifications": False,
    }
    neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api

    AZS = ["alpha", "bravo", "charlie"]

    def az_selector(node_id):
        # Round-robin assignment of node ids to AZs
        return f"az-{AZS[(node_id - 1) % len(AZS)]}"

    neon_env_builder.pageserver_config_override = lambda ps_cfg: ps_cfg.update(
        {"availability_zone": az_selector(ps_cfg["id"])}
    )

    # A small sleep on each call into the notify hook, to simulate the latency of doing a database write
    compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))

    env = neon_env_builder.init_configs()
    env.start()

    env.storage_controller.allowed_errors.extend(
        [
            # We will intentionally stress reconciler concurrency, which triggers a warning when lots
            # of shards are hitting the delayed path.
            ".*Many shards are waiting to reconcile",
            # We will create many timelines concurrently, so they might get slow enough to trip the warning
            # that timeline creation is holding a lock too long.
            ".*Shared lock by TimelineCreate.*was held.*",
        ]
    )

    for ps in env.pageservers:
        # Storage controller is allowed to drop pageserver requests when the cancellation token
        # for a Reconciler fires.
        ps.allowed_errors.append(".*request was dropped before completing.*")

    # Total tenants
    small_tenant_count = 7800
    large_tenant_count = 200
    tenant_count = small_tenant_count + large_tenant_count
    large_tenant_shard_count = 8
    total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count

    # A small stripe size to encourage all shards to get some data
    stripe_size = 1

    # We use a fixed seed to make the test somewhat reproducible: we want a randomly
    # chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
    rng = random.Random(1234)

    class Tenant:
        def __init__(self):
            # Tenants may optionally contain a timeline
            self.timeline_id = None

            # Tenants may be marked as 'large' to get multiple shards during creation phase
            self.large = False

    tenant_ids = [TenantId.generate() for _ in range(tenant_count)]
    tenants = {tid: Tenant() for tid in tenant_ids}

    # We will create timelines in only a subset of tenants, because creating timelines
    # does many megabytes of IO, and we want to densely simulate huge tenant counts on
    # a single test node.
    tenant_timelines_count = 100

    # These lists are maintained for use with rng.choice / rng.sample
    tenants_with_timelines = rng.sample(tenant_ids, tenant_timelines_count)
    with_timelines_lookup = set(tenants_with_timelines)
    tenants_without_timelines = [
        tenant_id for tenant_id in tenant_ids if tenant_id not in with_timelines_lookup
    ]

    # For our sharded tenants, we will make half of them with timelines and half without.
    # Both pools must be able to supply their half: a short slice would silently create
    # fewer shards than `total_shards` accounts for.
    half_large = large_tenant_count // 2
    assert len(tenants_with_timelines) >= half_large
    assert len(tenants_without_timelines) >= half_large
    for tenant_id in tenants_with_timelines[:half_large] + tenants_without_timelines[:half_large]:
        tenants[tenant_id].large = True
    assert sum(t.large for t in tenants.values()) == large_tenant_count

    virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)

    def check_memory():
        # Shards should be cheap in memory, as we will have very many of them
        expect_memory_per_shard = 128 * 1024

        rss = env.storage_controller.get_metric_value("process_resident_memory_bytes")
        assert rss is not None
        log.info(f"Resident memory: {rss} ({rss / total_shards} per shard)")
        assert rss < expect_memory_per_shard * total_shards

    def assert_all_tenants_scheduled_in_home_az():
        for tenant_id in tenant_ids:
            desc = env.storage_controller.tenant_describe(tenant_id)
            preferred_az = None
            for shard in desc["shards"]:
                # All shards in a tenant should have the same preferred AZ
                if preferred_az is None:
                    preferred_az = shard["preferred_az_id"]
                else:
                    assert preferred_az == shard["preferred_az_id"]

                # Attachment should be in the preferred AZ
                assert shard["preferred_az_id"] == az_selector(shard["node_attached"]), (
                    f"Shard {shard['tenant_shard_id']} not in {shard['preferred_az_id']}"
                )

                # Secondary locations should not be in the preferred AZ
                for node_secondary in shard["node_secondary"]:
                    assert shard["preferred_az_id"] != az_selector(node_secondary), (
                        f"Shard {shard['tenant_shard_id']} secondary should not be in {shard['preferred_az_id']}"
                    )

                # There should only be one secondary location (i.e. no migrations in flight)
                assert len(shard["node_secondary"]) == 1

    # Issue more concurrent operations than the storage controller's reconciler concurrency semaphore
    # permits, to ensure that we are stressing that.
    api_concurrency = 135

    # A different concurrency limit for bulk tenant+timeline creations: these do I/O and will
    # start timing out on test nodes if we aren't a bit careful.
    create_concurrency = 16

    class Operation(StrEnum):
        TIMELINE_OPS = "timeline_ops"
        SHARD_MIGRATE = "shard_migrate"
        TENANT_PASSTHROUGH = "tenant_passthrough"

    run_ops = api_concurrency * 4
    assert run_ops < len(tenants)

    # Creation phase: make a lot of tenants, and create timelines in a subset of them
    # This executor has concurrency set modestly, to avoid overloading pageservers with timeline creations.
    with concurrent.futures.ThreadPoolExecutor(max_workers=create_concurrency) as executor:
        tenant_create_futs = []
        t1 = time.time()

        for tenant_id, tenant in tenants.items():
            shard_count = large_tenant_shard_count if tenant.large else 1

            # We will create tenants directly via API, not via neon_local, to avoid any false
            # serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
            f = executor.submit(
                env.storage_controller.tenant_create,
                tenant_id,
                shard_count,
                stripe_size,
                # Upload heatmaps fast, so that secondary downloads happen promptly, enabling
                # the controller's optimization migrations to proceed promptly.
                tenant_config={"heatmap_period": "10s"},
                placement_policy={"Attached": 1},
            )
            tenant_create_futs.append(f)

        # Wait for tenant creations to finish
        for f in tenant_create_futs:
            f.result()
        elapsed = time.time() - t1
        log.info(f"Created {len(tenants)} tenants in {elapsed}, {len(tenants) / elapsed}/s")

        # Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
        # would be for original scheduling decisions to always match optimizer's preference)
        # (workaround for https://github.com/neondatabase/neon/issues/8969)
        env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)

        # Create timelines in those tenants which are going to get one
        t1 = time.time()
        timeline_create_futs = []
        for tenant_id in tenants_with_timelines:
            timeline_id = TimelineId.generate()
            tenants[tenant_id].timeline_id = timeline_id
            f = executor.submit(
                env.storage_controller.pageserver_api().timeline_create,
                PgVersion.NOT_SET,
                tenant_id,
                timeline_id,
            )
            timeline_create_futs.append(f)

        for f in timeline_create_futs:
            f.result()
        elapsed = time.time() - t1
        log.info(
            f"Created {len(tenants_with_timelines)} timelines in {elapsed}, {len(tenants_with_timelines) / elapsed}/s"
        )

    # Check initial scheduling
    assert_all_tenants_scheduled_in_home_az()
    az_attached_counts: defaultdict[str, int] = defaultdict(int)
    az_secondary_counts: defaultdict[str, int] = defaultdict(int)
    node_attached_counts: defaultdict[str, int] = defaultdict(int)
    for tenant_id in tenants:
        desc = env.storage_controller.tenant_describe(tenant_id)
        for shard in desc["shards"]:
            az_attached_counts[az_selector(shard["node_attached"])] += 1
            node_attached_counts[shard["node_attached"]] += 1
            for node_secondary in shard["node_secondary"]:
                az_secondary_counts[az_selector(node_secondary)] += 1

    log.info(f"Initial node attached counts: {node_attached_counts}")
    log.info(f"Initial AZ shard counts: {az_attached_counts}, {az_secondary_counts}")

    # Plan operations: ensure each tenant with a timeline gets at least
    # one of each operation type. Then add other tenants to make up the
    # numbers.
    ops_plan = []
    for tenant_id in tenants_with_timelines:
        ops_plan.append((tenant_id, Operation.TIMELINE_OPS))
        ops_plan.append((tenant_id, Operation.SHARD_MIGRATE))
        ops_plan.append((tenant_id, Operation.TENANT_PASSTHROUGH))

    # Fill up remaining run_ops with migrations/passthroughs of tenants without timelines
    other_migrate_tenants = rng.sample(tenants_without_timelines, run_ops - len(ops_plan))

    for tenant_id in other_migrate_tenants:
        ops_plan.append(
            (
                tenant_id,
                rng.choice([Operation.SHARD_MIGRATE, Operation.TENANT_PASSTHROUGH]),
            )
        )

    # Exercise phase: pick pseudo-random operations to do on the tenants + timelines
    # This executor has concurrency high enough to stress the storage controller API.
    with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:

        def exercise_timeline_ops(tenant_id, timeline_id):
            # A read operation: this requires looking up shard zero and routing there
            detail = virtual_ps_http.timeline_detail(tenant_id, timeline_id)
            assert detail["timeline_id"] == str(timeline_id)

            # A fan-out write operation to all shards in a tenant.
            # - We use a metadata operation rather than something like a timeline create, because
            #   timeline creations are I/O intensive and this test isn't meant to be a stress test for
            #   doing lots of concurrent timeline creations.
            archival_state = rng.choice(
                [TimelineArchivalState.ARCHIVED, TimelineArchivalState.UNARCHIVED]
            )
            try:
                virtual_ps_http.timeline_archival_config(tenant_id, timeline_id, archival_state)
            except PageserverApiException as e:
                # FIXME: there is an edge case where timeline ops can encounter a 404 during
                # a very short time window between generating a new generation number and
                # attaching this tenant to its new pageserver.
                # See https://github.com/neondatabase/neon/issues/9471
                if e.status_code != 404:
                    raise

        # Generate a mixture of operations and dispatch them all concurrently
        futs = []
        for tenant_id, op in ops_plan:
            if op == Operation.TIMELINE_OPS:
                op_timeline_id = tenants[tenant_id].timeline_id
                assert op_timeline_id is not None

                # A timeline read followed by an archival-config write through the
                # controller; the write fans out to all shards of the tenant.
                f = executor.submit(
                    exercise_timeline_ops,
                    tenant_id,
                    op_timeline_id,
                )
            elif op == Operation.SHARD_MIGRATE:
                # A reconciler operation: migrate a shard.
                desc = env.storage_controller.tenant_describe(tenant_id)

                shard_number = rng.randint(0, len(desc["shards"]) - 1)
                tenant_shard_id = TenantShardId(tenant_id, shard_number, len(desc["shards"]))

                # Migrate it to its secondary location
                dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]

                f = executor.submit(
                    env.storage_controller.tenant_shard_migrate,
                    tenant_shard_id,
                    dest_ps_id,
                    StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
                )
            elif op == Operation.TENANT_PASSTHROUGH:
                # A passthrough read to shard zero
                f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
            else:
                # Guard against a new Operation variant being planned without a handler here:
                # without this, the previous iteration's future would be appended again.
                raise ValueError(f"Unhandled operation: {op}")

            futs.append(f)

        # Wait for mixed ops to finish
        for f in futs:
            f.result()

    log.info("Completed mixed operations phase")

    # Some of the operations above (notably migrations) might leave the controller in a state where it has
    # some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
    # to reach a quiescent state before doing following checks.
    #
    # - Set max_interval low because we probably have a significant number of optimizations to complete and would like
    #   the test to run quickly.
    # - Set timeout high because we might be waiting for optimizations that require a secondary
    #   to warm up, and if we just started a secondary in the previous step, it might wait some time
    #   before downloading its heatmap
    env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)

    env.storage_controller.consistency_check()
    check_memory()

    # This loop waits for reconcile_all to indicate no pending work, and then calls it once more to time
    # how long the call takes when idle: this iterates over shards while doing no I/O and should be reliably fast: if
    # it isn't, that's a sign that we have made some algorithmic mistake (e.g. O(N**2) scheduling)
    #
    # We do not require that the system is quiescent already here, although at present in this point in the test
    # that may be the case.
    log.info("Reconciling all & timing")
    while True:
        t1 = time.time()
        reconcilers = env.storage_controller.reconcile_all()
        if reconcilers == 0:
            # Time how long a no-op background reconcile takes: this measures how long it takes to
            # loop over all the shards looking for work to do.
            runtime = time.time() - t1
            log.info(f"No-op call to reconcile_all took {runtime}s")
            assert runtime < 1
            break

    # Restart the storage controller
    log.info("Restarting controller")
    env.storage_controller.stop()
    env.storage_controller.start()

    # See how long the controller takes to pass its readiness check.  This should be fast because
    # all the nodes are online: offline pageservers are the only thing that's allowed to delay
    # startup.
    readiness_period = env.storage_controller.wait_until_ready()
    assert readiness_period < 5

    # Consistency check is safe here: the storage controller's restart should not have caused any reconcilers
    # to run, as it was in a stable state before restart.  If it did, that's a bug.
    env.storage_controller.consistency_check()
    check_memory()

    shard_counts = get_consistent_node_shard_counts(env, total_shards)
    log.info(f"Shard counts before rolling restart: {shard_counts}")

    assert_consistent_balanced_attachments(env, total_shards)

    # Restart pageservers gracefully: this exercises the /re-attach pageserver API
    # and the storage controller drain and fill API
    log.info("Restarting pageservers...")

    # Parameters for how long we expect it to take to migrate all of the tenants from/to
    # a node during a drain/fill operation
    DRAIN_FILL_TIMEOUT = 240
    DRAIN_FILL_BACKOFF = 5

    for ps in env.pageservers:
        log.info(f"Draining pageserver {ps.id}")
        drain_started = time.time()
        env.storage_controller.retryable_node_operation(
            lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
        )

        env.storage_controller.poll_node_status(
            ps.id,
            PageserverAvailability.ACTIVE,
            PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
            max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF,
            backoff=DRAIN_FILL_BACKOFF,
        )
        log.info(f"Drained pageserver {ps.id} in {time.time() - drain_started}s")

        shard_counts = get_consistent_node_shard_counts(env, total_shards)
        log.info(f"Shard counts after draining node {ps.id}: {shard_counts}")
        # Assert that we've drained the node
        assert shard_counts[str(ps.id)] == 0
        # Assert that those shards actually went somewhere
        assert sum(shard_counts.values()) == total_shards

        ps.restart()
        env.storage_controller.poll_node_status(
            ps.id,
            PageserverAvailability.ACTIVE,
            PageserverSchedulingPolicy.ACTIVE,
            max_attempts=24,
            backoff=1,
        )

        log.info(f"Filling pageserver {ps.id}")
        # Time the fill on its own clock, so the log below does not include drain + restart time
        fill_started = time.time()
        env.storage_controller.retryable_node_operation(
            lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2
        )
        env.storage_controller.poll_node_status(
            ps.id,
            PageserverAvailability.ACTIVE,
            PageserverSchedulingPolicy.ACTIVE,
            max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF,
            backoff=DRAIN_FILL_BACKOFF,
        )

        log.info(f"Filled pageserver {ps.id} in {time.time() - fill_started}s")

        # Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
        # would be for original scheduling decisions to always match optimizer's preference)
        # (workaround for https://github.com/neondatabase/neon/issues/8969)
        env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)

        shard_counts = get_consistent_node_shard_counts(env, total_shards)
        log.info(f"Shard counts after filling node {ps.id}: {shard_counts}")

        assert_consistent_balanced_attachments(env, total_shards)

    env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
    env.storage_controller.consistency_check()

    # Since we did `reconcile_until_idle` during the above loop, the system should be left in
    # an optimally scheduled state.  Validate that this includes all the tenants being scheduled
    # in their home AZ.
    assert_all_tenants_scheduled_in_home_az()

    # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,
    # as they were not offline long enough to trigger any scheduling changes.
    env.storage_controller.consistency_check()
    check_memory()

    # Simulate loss of an AZ
    victim_az = "az-alpha"
    killed_pageservers = []
    for ps in env.pageservers:
        if az_selector(ps.id) == victim_az:
            ps.stop(immediate=True)
            killed_pageservers.append(ps)
            log.info(f"Killed pageserver {ps.id}")

    assert killed_pageservers

    # Wait for the controller to notice the pageservers are dead
    def assert_pageservers_availability(
        pageservers: list[NeonPageserver], expected_availability: PageserverAvailability
    ):
        nodes = env.storage_controller.nodes()
        checked_any = False
        node_ids = [ps.id for ps in pageservers]
        for node in nodes:
            if node["id"] in node_ids:
                checked_any = True
                assert node["availability"] == expected_availability, (
                    f"Node {node['id']} is not {expected_availability} yet: {node['availability']}"
                )

        # Make sure the loop above actually matched at least one of the requested nodes
        assert checked_any

    wait_until(
        lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.OFFLINE),
        timeout=60,
    )

    # Let the controller finish all its rescheduling
    env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)

    # Check that all the tenants are rescheduled to the remaining pageservers
    for tenant_id in tenant_ids:
        desc = env.storage_controller.tenant_describe(tenant_id)
        for shard in desc["shards"]:
            # Attachment should be outside the AZ where we killed the pageservers
            assert az_selector(shard["node_attached"]) != victim_az, (
                f"Shard {shard['tenant_shard_id']} still in {victim_az} (node {shard['node_attached']})"
            )

    # Bring back the pageservers
    for ps in killed_pageservers:
        ps.start()

    wait_until(
        lambda: assert_pageservers_availability(killed_pageservers, PageserverAvailability.ACTIVE),
        timeout=60,
    )

    # A very long timeout is required: we will be migrating all the tenants on all the pageservers
    # in the region that we just restored.  Allow up to four times as long as we allow for
    # draining/filling a single node.
    env.storage_controller.reconcile_until_idle(
        max_interval=0.1, timeout_secs=DRAIN_FILL_TIMEOUT * 4
    )
    assert_all_tenants_scheduled_in_home_az()

    # Stop the storage controller before tearing down fixtures, because it otherwise might log
    # errors trying to call our `ComputeReconfigure`.
    env.storage_controller.stop()