diff --git a/libs/utils/src/http/error.rs b/libs/utils/src/http/error.rs
index 5e05e4e713..02fc9e3b99 100644
--- a/libs/utils/src/http/error.rs
+++ b/libs/utils/src/http/error.rs
@@ -28,6 +28,9 @@ pub enum ApiError {
     #[error("Resource temporarily unavailable: {0}")]
     ResourceUnavailable(Cow<'static, str>),
 
+    #[error("Too many requests: {0}")]
+    TooManyRequests(Cow<'static, str>),
+
     #[error("Shutting down")]
     ShuttingDown,
 
@@ -73,6 +76,10 @@ impl ApiError {
                 err.to_string(),
                 StatusCode::SERVICE_UNAVAILABLE,
             ),
+            ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
+                err.to_string(),
+                StatusCode::TOO_MANY_REQUESTS,
+            ),
             ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
                 err.to_string(),
                 StatusCode::REQUEST_TIMEOUT,
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 2985ab1efb..1079d8df29 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -715,6 +715,8 @@ async fn timeline_archival_config_handler(
             .tenant_manager
             .get_attached_tenant_shard(tenant_shard_id)?;
 
+        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
+
         tenant
             .apply_timeline_archival_config(timeline_id, request_data.state, ctx)
             .await?;
diff --git a/proxy/src/serverless/http_util.rs b/proxy/src/serverless/http_util.rs
index 87a72ec5f0..c1c5764d17 100644
--- a/proxy/src/serverless/http_util.rs
+++ b/proxy/src/serverless/http_util.rs
@@ -41,6 +41,10 @@ pub(crate) fn api_error_into_response(this: ApiError) -> Response
             err.to_string(),
             StatusCode::SERVICE_UNAVAILABLE,
         ),
+        ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
+            err.to_string(),
+            StatusCode::TOO_MANY_REQUESTS,
+        ),
         ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
             err.to_string(),
             StatusCode::REQUEST_TIMEOUT,
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index cc735dc27e..cedee54534 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -246,6 +246,11 @@ fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
             // storage controller's auth configuration.
             ApiError::InternalServerError(anyhow::anyhow!("{node} {status}: {msg}"))
         }
+        mgmt_api::Error::ApiError(status @ StatusCode::TOO_MANY_REQUESTS, msg) => {
+            // Pass through 429 errors: if pageserver is asking us to wait + retry, we in
+            // turn ask our clients to wait + retry
+            ApiError::Conflict(format!("{node} {status}: {status} {msg}"))
+        }
         mgmt_api::Error::ApiError(status, msg) => {
             // Presume general case of pageserver API errors is that we tried to do something
             // that can't be done right now.
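Taken together, the Rust changes above mean that pageserver and storage controller endpoints can now answer 429 (Too Many Requests), and that the controller maps a pageserver 429 into an error its own clients are expected to retry. A minimal client-side sketch of honoring that contract is below; the exception type and helper names are illustrative only and are not part of this diff:

import random
import time


class HttpStatusError(Exception):
    """Illustrative exception type carrying the HTTP status code of a failed API call."""

    def __init__(self, status_code: int, message: str):
        super().__init__(message)
        self.status_code = status_code


def call_with_429_retry(fn, *args, max_attempts=8, base_delay=0.1, max_delay=5.0):
    # Retry an API call for as long as it reports 429 (Too Many Requests), sleeping
    # with capped exponential backoff plus jitter; any other error propagates immediately.
    delay = base_delay
    for attempt in range(max_attempts):
        try:
            return fn(*args)
        except HttpStatusError as e:
            if e.status_code != 429 or attempt == max_attempts - 1:
                raise
        time.sleep(delay + random.uniform(0, delay / 2))
        delay = min(delay * 2, max_delay)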
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 059707c8ed..a313ac2ed3 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1986,11 +1986,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
         log.info(f"reconcile_all waited for {n} shards")
         return n
 
-    def reconcile_until_idle(self, timeout_secs=30):
+    def reconcile_until_idle(self, timeout_secs=30, max_interval=5):
         start_at = time.time()
         n = 1
-        delay_sec = 0.5
-        delay_max = 5
+        delay_sec = 0.1
+        delay_max = max_interval
         while n > 0:
             n = self.reconcile_all()
             if n == 0:
diff --git a/test_runner/performance/test_storage_controller_scale.py b/test_runner/performance/test_storage_controller_scale.py
index 452a856714..d2eba751f8 100644
--- a/test_runner/performance/test_storage_controller_scale.py
+++ b/test_runner/performance/test_storage_controller_scale.py
@@ -4,9 +4,10 @@ import concurrent.futures
 import random
 import time
 from collections import defaultdict
+from enum import Enum
 
 import pytest
-from fixtures.common_types import TenantId, TenantShardId, TimelineId
+from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId
 from fixtures.compute_reconfigure import ComputeReconfigure
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
@@ -34,6 +35,7 @@ def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[
         if tenant_placement[tid]["intent"]["attached"]
         == tenant_placement[tid]["observed"]["attached"]
     }
+    assert len(matching) == total_shards
 
     attached_per_node: defaultdict[str, int] = defaultdict(int)
 
@@ -107,15 +109,48 @@ def test_storage_controller_many_tenants(
         ps.allowed_errors.append(".*request was dropped before completing.*")
 
     # Total tenants
-    tenant_count = 4000
+    small_tenant_count = 7800
+    large_tenant_count = 200
+    tenant_count = small_tenant_count + large_tenant_count
+    large_tenant_shard_count = 8
+    total_shards = small_tenant_count + large_tenant_count * large_tenant_shard_count
 
-    # Shards per tenant
-    shard_count = 2
-    stripe_size = 1024
+    # A small stripe size to encourage all shards to get some data
+    stripe_size = 1
 
-    total_shards = tenant_count * shard_count
+    # We use a fixed seed to make the test somewhat reproducible: we want a randomly
+    # chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
+    rng = random.Random(1234)
 
-    tenants = set(TenantId.generate() for _i in range(0, tenant_count))
+    class Tenant:
+        def __init__(self):
+            # Tenants may optionally contain a timeline
+            self.timeline_id = None
+
+            # Tenants may be marked as 'large' to get multiple shards during the creation phase
+            self.large = False
+
+    tenant_ids = list(TenantId.generate() for _i in range(0, tenant_count))
+    tenants = dict((tid, Tenant()) for tid in tenant_ids)
+
+    # We will create timelines in only a subset of tenants, because creating timelines
+    # does many megabytes of IO, and we want to densely simulate huge tenant counts on
+    # a single test node.
+    tenant_timelines_count = 100
+
+    # These lists are maintained for use with rng.choice
+    tenants_with_timelines = list(rng.sample(tenants.keys(), tenant_timelines_count))
+    tenants_without_timelines = list(
+        tenant_id for tenant_id in tenants if tenant_id not in tenants_with_timelines
+    )
+
+    # For our sharded tenants, we will make half of them with timelines and half without
+    assert large_tenant_count >= tenant_timelines_count / 2
+    for tenant_id in tenants_with_timelines[0 : large_tenant_count // 2]:
+        tenants[tenant_id].large = True
+
+    for tenant_id in tenants_without_timelines[0 : large_tenant_count // 2]:
+        tenants[tenant_id].large = True
 
     virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
 
@@ -125,23 +160,39 @@ def test_storage_controller_many_tenants(
 
         rss = env.storage_controller.get_metric_value("process_resident_memory_bytes")
         assert rss is not None
-        log.info(f"Resident memory: {rss} ({ rss / (shard_count * tenant_count)} per shard)")
-        assert rss < expect_memory_per_shard * shard_count * tenant_count
-
-    # We use a fixed seed to make the test somewhat reproducible: we want a randomly
-    # chosen order in the sense that it's arbitrary, but not in the sense that it should change every run.
-    rng = random.Random(1234)
+        log.info(f"Resident memory: {rss} ({ rss / total_shards} per shard)")
+        assert rss < expect_memory_per_shard * total_shards
 
     # Issue more concurrent operations than the storage controller's reconciler concurrency semaphore
     # permits, to ensure that we are exercising stressing that.
     api_concurrency = 135
 
-    # We will create tenants directly via API, not via neon_local, to avoid any false
-    # serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
-    with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:
-        futs = []
+    # A different concurrency limit for bulk tenant+timeline creations: these do I/O and will
+    # start timing out on test nodes if we aren't a bit careful.
+    create_concurrency = 16
+
+    class Operation(str, Enum):
+        TIMELINE_OPS = "timeline_ops"
+        SHARD_MIGRATE = "shard_migrate"
+        TENANT_PASSTHROUGH = "tenant_passthrough"
+
+    run_ops = api_concurrency * 4
+    assert run_ops < len(tenants)
+
+    # Creation phase: make a lot of tenants, and create timelines in a subset of them
+    # This executor has concurrency set modestly, to avoid overloading pageservers with timeline creations.
+    with concurrent.futures.ThreadPoolExecutor(max_workers=create_concurrency) as executor:
+        tenant_create_futs = []
         t1 = time.time()
-        for tenant_id in tenants:
+
+        for tenant_id, tenant in tenants.items():
+            if tenant.large:
+                shard_count = large_tenant_shard_count
+            else:
+                shard_count = 1
+
+            # We will create tenants directly via API, not via neon_local, to avoid any false
+            # serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
             f = executor.submit(
                 env.storage_controller.tenant_create,
                 tenant_id,
@@ -152,44 +203,106 @@ def test_storage_controller_many_tenants(
                 tenant_config={"heatmap_period": "10s"},
                 placement_policy={"Attached": 1},
             )
-            futs.append(f)
+            tenant_create_futs.append(f)
 
-        # Wait for creations to finish
-        for f in futs:
+        # Wait for tenant creations to finish
+        for f in tenant_create_futs:
             f.result()
         log.info(
             f"Created {len(tenants)} tenants in {time.time() - t1}, {len(tenants) / (time.time() - t1)}/s"
         )
 
-        run_ops = api_concurrency * 4
-        assert run_ops < len(tenants)
-        op_tenants = list(tenants)[0:run_ops]
+        # Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior
+        # would be for original scheduling decisions to always match optimizer's preference)
+        # (workaround for https://github.com/neondatabase/neon/issues/8969)
+        env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
+
+        # Create timelines in those tenants which are going to get one
+        t1 = time.time()
+        timeline_create_futs = []
+        for tenant_id in tenants_with_timelines:
+            timeline_id = TimelineId.generate()
+            tenants[tenant_id].timeline_id = timeline_id
+            f = executor.submit(
+                env.storage_controller.pageserver_api().timeline_create,
+                PgVersion.NOT_SET,
+                tenant_id,
+                timeline_id,
+            )
+            timeline_create_futs.append(f)
+
+        for f in timeline_create_futs:
+            f.result()
+        log.info(
+            f"Created {len(tenants_with_timelines)} timelines in {time.time() - t1}, {len(tenants_with_timelines) / (time.time() - t1)}/s"
+        )
+
+    # Plan operations: ensure each tenant with a timeline gets at least
+    # one of each operation type. Then add other tenants to make up the
+    # numbers.
+    ops_plan = []
+    for tenant_id in tenants_with_timelines:
+        ops_plan.append((tenant_id, Operation.TIMELINE_OPS))
+        ops_plan.append((tenant_id, Operation.SHARD_MIGRATE))
+        ops_plan.append((tenant_id, Operation.TENANT_PASSTHROUGH))
+
+    # Fill up remaining run_ops with migrations of tenants without timelines
+    other_migrate_tenants = rng.sample(tenants_without_timelines, run_ops - len(ops_plan))
+
+    for tenant_id in other_migrate_tenants:
+        ops_plan.append(
+            (
+                tenant_id,
+                rng.choice([Operation.SHARD_MIGRATE, Operation.TENANT_PASSTHROUGH]),
+            )
+        )
+
+    # Exercise phase: pick pseudo-random operations to do on the tenants + timelines
+    # This executor has concurrency high enough to stress the storage controller API.
+    with concurrent.futures.ThreadPoolExecutor(max_workers=api_concurrency) as executor:
+
+        def exercise_timeline_ops(tenant_id, timeline_id):
+            # A read operation: this requires looking up shard zero and routing there
+            detail = virtual_ps_http.timeline_detail(tenant_id, timeline_id)
+            assert detail["timeline_id"] == str(timeline_id)
+
+            # A fan-out write operation to all shards in a tenant.
+            # - We use a metadata operation rather than something like a timeline create, because
+            #   timeline creations are I/O intensive and this test isn't meant to be a stress test for
+            #   doing lots of concurrent timeline creations.
+            archival_state = rng.choice(
+                [TimelineArchivalState.ARCHIVED, TimelineArchivalState.UNARCHIVED]
+            )
+            virtual_ps_http.timeline_archival_config(tenant_id, timeline_id, archival_state)
 
         # Generate a mixture of operations and dispatch them all concurrently
         futs = []
-        for tenant_id in op_tenants:
-            op = rng.choice([0, 1, 2])
-            if op == 0:
-                # A fan-out write operation to all shards in a tenant (timeline creation)
+        for tenant_id, op in ops_plan:
+            if op == Operation.TIMELINE_OPS:
+                op_timeline_id = tenants[tenant_id].timeline_id
+                assert op_timeline_id is not None
+
+                # Exercise operations that modify tenant scheduling state but require traversing
+                # the fan-out-to-all-shards functionality.
                 f = executor.submit(
-                    virtual_ps_http.timeline_create,
-                    PgVersion.NOT_SET,
+                    exercise_timeline_ops,
                     tenant_id,
-                    TimelineId.generate(),
+                    op_timeline_id,
                 )
-            elif op == 1:
+            elif op == Operation.SHARD_MIGRATE:
                 # A reconciler operation: migrate a shard.
-                shard_number = rng.randint(0, shard_count - 1)
-                tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
+                desc = env.storage_controller.tenant_describe(tenant_id)
+
+                shard_number = rng.randint(0, len(desc["shards"]) - 1)
+                tenant_shard_id = TenantShardId(tenant_id, shard_number, len(desc["shards"]))
 
                 # Migrate it to its secondary location
-                desc = env.storage_controller.tenant_describe(tenant_id)
                 dest_ps_id = desc["shards"][shard_number]["node_secondary"][0]
                 f = executor.submit(
                     env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
                 )
-            elif op == 2:
+            elif op == Operation.TENANT_PASSTHROUGH:
                 # A passthrough read to shard zero
                 f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
@@ -199,10 +312,18 @@ def test_storage_controller_many_tenants(
         for f in futs:
             f.result()
 
+    log.info("Completed mixed operations phase")
+
     # Some of the operations above (notably migrations) might leave the controller in a state where it has
     # some work to do, for example optimizing shard placement after we do a random migration. Wait for the system
     # to reach a quiescent state before doing following checks.
-    env.storage_controller.reconcile_until_idle()
+    #
+    # - Set max_interval low because we probably have a significant number of optimizations to complete and would like
+    #   the test to run quickly.
+    # - Set timeout high because we might be waiting for optimizations that require a secondary
+    #   to warm up, and if we just started a secondary in the previous step, it might wait some time
+    #   before downloading its heatmap
+    env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120)
     env.storage_controller.consistency_check()
 
     check_memory()
@@ -213,6 +334,7 @@ def test_storage_controller_many_tenants(
     #
    # We do not require that the system is quiescent already here, although at present in this point in the test
     # that may be the case.
+ log.info("Reconciling all & timing") while True: t1 = time.time() reconcilers = env.storage_controller.reconcile_all() @@ -225,6 +347,7 @@ def test_storage_controller_many_tenants( break # Restart the storage controller + log.info("Restarting controller") env.storage_controller.stop() env.storage_controller.start() @@ -246,7 +369,16 @@ def test_storage_controller_many_tenants( # Restart pageservers gracefully: this exercises the /re-attach pageserver API # and the storage controller drain and fill API + log.info("Restarting pageservers...") + + # Parameters for how long we expect it to take to migrate all of the tenants from/to + # a node during a drain/fill operation + DRAIN_FILL_TIMEOUT = 240 + DRAIN_FILL_BACKOFF = 5 + for ps in env.pageservers: + log.info(f"Draining pageserver {ps.id}") + t1 = time.time() env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2 ) @@ -255,9 +387,10 @@ def test_storage_controller_many_tenants( ps.id, PageserverAvailability.ACTIVE, PageserverSchedulingPolicy.PAUSE_FOR_RESTART, - max_attempts=24, - backoff=5, + max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF, + backoff=DRAIN_FILL_BACKOFF, ) + log.info(f"Drained pageserver {ps.id} in {time.time() - t1}s") shard_counts = get_consistent_node_shard_counts(env, total_shards) log.info(f"Shard counts after draining node {ps.id}: {shard_counts}") @@ -275,6 +408,7 @@ def test_storage_controller_many_tenants( backoff=1, ) + log.info(f"Filling pageserver {ps.id}") env.storage_controller.retryable_node_operation( lambda ps_id: env.storage_controller.node_fill(ps_id), ps.id, max_attempts=3, backoff=2 ) @@ -282,16 +416,23 @@ def test_storage_controller_many_tenants( ps.id, PageserverAvailability.ACTIVE, PageserverSchedulingPolicy.ACTIVE, - max_attempts=24, - backoff=5, + max_attempts=DRAIN_FILL_TIMEOUT // DRAIN_FILL_BACKOFF, + backoff=DRAIN_FILL_BACKOFF, ) + log.info(f"Filled pageserver {ps.id} in {time.time() - t1}s") + + # Waiting for optimizer to stabilize, if it disagrees with scheduling (the correct behavior + # would be for original scheduling decisions to always match optimizer's preference) + # (workaround for https://github.com/neondatabase/neon/issues/8969) + env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120) + shard_counts = get_consistent_node_shard_counts(env, total_shards) log.info(f"Shard counts after filling node {ps.id}: {shard_counts}") assert_consistent_balanced_attachments(env, total_shards) - env.storage_controller.reconcile_until_idle() + env.storage_controller.reconcile_until_idle(max_interval=0.1, timeout_secs=120) env.storage_controller.consistency_check() # Consistency check is safe here: restarting pageservers should not have caused any Reconcilers to spawn,