From 4b711caf5edb808a6bfbe69dc6a1cbe9a7ff70a6 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Wed, 25 Sep 2024 14:56:39 +0100
Subject: [PATCH] storage controller: make proxying of GETs to pageservers more robust (#9065)

## Problem

These commits are split off from https://github.com/neondatabase/neon/pull/8971/commits, where I was fixing these issues to make a larger scale test pass -- Vlad also independently recognized them with cloudbench in https://github.com/neondatabase/neon/issues/9062.

1. The storage controller proxies GET requests to pageservers based on their intent, not the ground truth of where they're really attached.
2. Proxied requests can race with changes to a tenant's scheduling, resulting in 404 responses if the request hits the wrong pageserver.

Closes: https://github.com/neondatabase/neon/issues/9062

## Summary of changes

1. If a shard has a running reconciler, then use the database's generation_pageserver to decide which node to proxy the request to.
2. If such a request gets a 404 response and the shard's scheduled node has changed since the request was dispatched, return 503 so that the client retries against the new node.

---
 storage_controller/src/http.rs                  | 23 ++++--
 storage_controller/src/reconciler.rs            |  4 +
 storage_controller/src/service.rs               | 76 ++++++++++++-----
 .../regress/test_storage_controller.py          | 82 +++++++++++++++++++
 4 files changed, 158 insertions(+), 27 deletions(-)

diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 95e4a469ac..4dd8badd03 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -515,7 +515,7 @@ async fn handle_tenant_timeline_passthrough(
     tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);

     // Find the node that holds shard zero
-    let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
+    let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;

     // Callers will always pass an unsharded tenant ID. Before proxying, we must
     // rewrite this to a shard-aware shard zero ID.
@@ -545,10 +545,10 @@
     let _timer = latency.start_timer(labels.clone());

     let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
-    let resp = client.get_raw(path).await.map_err(|_e|
-        // FIXME: give APiError a proper Unavailable variant. We return 503 here because
-        // if we can't successfully send a request to the pageserver, we aren't available.
-        ApiError::ShuttingDown)?;
+    let resp = client.get_raw(path).await.map_err(|e|
+        // We return 503 here because if we can't successfully send a request to the pageserver,
+        // either we aren't available or the pageserver is unavailable.
+        ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;

     if !resp.status().is_success() {
         let error_counter = &METRICS_REGISTRY
@@ -557,6 +557,19 @@
         error_counter.inc(labels);
     }

+    // Transform 404 into 503 if we raced with a migration
+    if resp.status() == reqwest::StatusCode::NOT_FOUND {
+        // Look up node again: if we migrated it will be different
+        let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
+        if new_node.get_id() != node.get_id() {
+            // Rather than retry here, send the client a 503 to prompt a retry: this matches
+            // the pageserver's use of 503, and all clients calling this API should retry on 503.
+            return Err(ApiError::ResourceUnavailable(
+                format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
+            ));
+        }
+    }
+
     // We have a reqwest::Response, would like a http::Response
     let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
     for (k, v) in resp.headers() {
diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 750bcd7c01..93b1c80566 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -541,6 +541,8 @@ impl Reconciler {
             }
         }

+        pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");
+
         // Increment generation before attaching to new pageserver
         self.generation = Some(
             self.persistence
@@ -617,6 +619,8 @@
             },
         );

+        pausable_failpoint!("reconciler-live-migrate-post-detach");
+
         tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
         let dest_final_conf = build_location_config(
             &self.shard,
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 6a11e9650c..a5e0129684 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -3508,34 +3508,66 @@ impl Service {
     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
     /// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound)
-    pub(crate) fn tenant_shard0_node(
+    pub(crate) async fn tenant_shard0_node(
         &self,
         tenant_id: TenantId,
     ) -> Result<(Node, TenantShardId), ApiError> {
-        let locked = self.inner.read().unwrap();
-        let Some((tenant_shard_id, shard)) = locked
-            .tenants
-            .range(TenantShardId::tenant_range(tenant_id))
-            .next()
+        // Look up in-memory state and maybe use the node from there.
+        {
+            let locked = self.inner.read().unwrap();
+            let Some((tenant_shard_id, shard)) = locked
+                .tenants
+                .range(TenantShardId::tenant_range(tenant_id))
+                .next()
+            else {
+                return Err(ApiError::NotFound(
+                    anyhow::anyhow!("Tenant {tenant_id} not found").into(),
+                ));
+            };
+
+            let Some(intent_node_id) = shard.intent.get_attached() else {
+                tracing::warn!(
+                    tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
+                    "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
+                    shard.policy
+                );
+                return Err(ApiError::Conflict(
+                    "Cannot call timeline API on non-attached tenant".to_string(),
+                ));
+            };
+
+            if shard.reconciler.is_none() {
+                // Optimization: while no reconcile is in flight, we may trust our in-memory state
+                // to tell us which pageserver to use. Otherwise we will fall through and hit the database.
+                let Some(node) = locked.nodes.get(intent_node_id) else {
+                    // This should never happen
+                    return Err(ApiError::InternalServerError(anyhow::anyhow!(
+                        "Shard refers to nonexistent node"
+                    )));
+                };
+                return Ok((node.clone(), *tenant_shard_id));
+            }
+        };
+
+        // Look up the latest attached pageserver location from the database
+        // generation state: this will reflect the progress of any ongoing migration.
+        // Note that it is not guaranteed to _stay_ here; our caller must still handle
+        // the case where they call through to the pageserver and get a 404.
+        let db_result = self.persistence.tenant_generations(tenant_id).await?;
+        let Some(ShardGenerationState {
+            tenant_shard_id,
+            generation: _,
+            generation_pageserver: Some(node_id),
+        }) = db_result.first()
         else {
-            return Err(ApiError::NotFound(
-                anyhow::anyhow!("Tenant {tenant_id} not found").into(),
+            // This can happen if we raced with a tenant deletion or a shard split. On a retry
+            // the caller will either succeed (shard split case), get a proper 404 (deletion case),
+            // or a conflict response (case where the tenant was detached in the background).
+            return Err(ApiError::ResourceUnavailable(
+                format!("Shard for tenant {tenant_id} not found in database, or is not attached").into(),
             ));
         };
-
-        // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
-        // point to somewhere we haven't attached yet.
-        let Some(node_id) = shard.intent.get_attached() else {
-            tracing::warn!(
-                tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
-                "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
-                shard.policy
-            );
-            return Err(ApiError::Conflict(
-                "Cannot call timeline API on non-attached tenant".to_string(),
-            ));
-        };
-
+        let locked = self.inner.read().unwrap();
         let Some(node) = locked.nodes.get(node_id) else {
             // This should never happen
             return Err(ApiError::InternalServerError(anyhow::anyhow!(
                 "Shard refers to nonexistent node"
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 4106efd4f9..3861f0b822 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -4,6 +4,7 @@ import threading
 import time
 from collections import defaultdict
 from datetime import datetime, timezone
+from enum import Enum
 from typing import Any, Dict, List, Optional, Set, Tuple, Union

 import pytest
@@ -2466,6 +2467,87 @@ def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvB
         raise


+class MigrationFailpoints(Enum):
+    # While only the origin is attached
+    PRE_GENERATION_INC = "reconciler-live-migrate-pre-generation-inc"
+    # While both locations are attached
+    POST_NOTIFY = "reconciler-live-migrate-post-notify"
+    # While only the destination is attached
+    POST_DETACH = "reconciler-live-migrate-post-detach"
+
+
+@pytest.mark.parametrize(
+    "migration_failpoint",
+    [
+        MigrationFailpoints.PRE_GENERATION_INC,
+        MigrationFailpoints.POST_NOTIFY,
+        MigrationFailpoints.POST_DETACH,
+    ],
+)
+def test_storage_controller_proxy_during_migration(
+    neon_env_builder: NeonEnvBuilder, migration_failpoint: MigrationFailpoints
+):
+    """
+    If we send a proxied GET request to the controller during a migration, it should route
+    the request to whichever pageserver was most recently issued a generation.
+
+    Reproducer for https://github.com/neondatabase/neon/issues/9062
+    """
+    neon_env_builder.num_pageservers = 2
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+    env.neon_cli.create_tenant(tenant_id, timeline_id)
+
+    # Activate a failpoint that will cause the live migration to get stuck partway through,
+    # so we can check where proxied requests are routed while the migration is in flight.
+    env.storage_controller.configure_failpoints((migration_failpoint.value, "pause"))
+
+    origin_pageserver = env.get_tenant_pageserver(tenant_id)
+    dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
+
+    try:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+            migrate_fut = executor.submit(
+                env.storage_controller.tenant_shard_migrate,
+                TenantShardId(tenant_id, 0, 0),
+                dest_ps_id,
+            )
+
+            def has_hit_migration_failpoint():
+                expr = f"at failpoint {migration_failpoint.value}"
+                log.info(expr)
+                assert env.storage_controller.log_contains(expr)
+
+            wait_until(10, 1, has_hit_migration_failpoint)
+
+            # This request should be routed to whichever pageserver holds the highest generation
+            tenant_info = env.storage_controller.pageserver_api().tenant_status(
+                tenant_id,
+            )
+
+            if migration_failpoint in (
+                MigrationFailpoints.POST_NOTIFY,
+                MigrationFailpoints.POST_DETACH,
+            ):
+                # We expect the request to land on the destination
+                assert tenant_info["generation"] == 2
+            elif migration_failpoint == MigrationFailpoints.PRE_GENERATION_INC:
+                # We expect the request to land on the origin
+                assert tenant_info["generation"] == 1
+
+            # Eventually the migration completes
+            env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
+            migrate_fut.result()
+    except:
+        # Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
+        env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
+        raise
+
+
 @run_only_on_default_postgres("this is like a 'unit test' against storcon db")
 def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
     env = neon_env_builder.init_configs()
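---

A note on the retry contract this patch leans on: the 404-to-503 transformation in `handle_tenant_timeline_passthrough` only helps if callers actually retry on 503. A minimal sketch of such a caller follows; the helper name, endpoint path, and backoff constants are illustrative assumptions, not part of this patch:

```python
# Sketch of a storage controller API client honouring the 503 retry contract.
# Assumes proxied pageserver GETs are served under /v1/tenant/...; adjust as needed.
import time

import requests


def proxied_get_with_retries(
    controller_url: str, tenant_id: str, path: str, attempts: int = 10
) -> dict:
    for attempt in range(attempts):
        resp = requests.get(f"{controller_url}/v1/tenant/{tenant_id}/{path}")
        if resp.status_code == 503:
            # The controller raced with a reconciler (or couldn't reach the
            # pageserver); the migration should settle shortly, so back off and retry.
            time.sleep(min(0.1 * 2**attempt, 5.0))
            continue
        resp.raise_for_status()  # propagate genuine errors, e.g. a real 404
        return resp.json()
    raise RuntimeError(f"GET {path} still unavailable after {attempts} attempts")
```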