diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 5f9a1124de..6b6d081dcd 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -48,7 +48,10 @@ use crate::metrics::{ }; use crate::persistence::SafekeeperUpsert; use crate::reconciler::ReconcileError; -use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service}; +use crate::service::{ + LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service, + TenantMutationLocations, +}; /// State available to HTTP request handlers pub struct HttpState { @@ -734,77 +737,104 @@ async fn handle_tenant_timeline_passthrough( path ); - // Find the node that holds shard zero - let (node, tenant_shard_id, consistent) = if tenant_or_shard_id.is_unsharded() { - service + let tenant_shard_id = if tenant_or_shard_id.is_unsharded() { + // If the request contains only tenant ID, find the node that holds shard zero + let (_, shard_id) = service .tenant_shard0_node(tenant_or_shard_id.tenant_id) - .await? + .await?; + shard_id } else { - let (node, consistent) = service.tenant_shard_node(tenant_or_shard_id).await?; - (node, tenant_or_shard_id, consistent) + tenant_or_shard_id }; - // Callers will always pass an unsharded tenant ID. Before proxying, we must - // rewrite this to a shard-aware shard zero ID. - let path = format!("{path}"); - let tenant_str = tenant_or_shard_id.tenant_id.to_string(); - let tenant_shard_str = format!("{tenant_shard_id}"); - let path = path.replace(&tenant_str, &tenant_shard_str); + let service_inner = service.clone(); - let latency = &METRICS_REGISTRY - .metrics_group - .storage_controller_passthrough_request_latency; + service.tenant_shard_remote_mutation(tenant_shard_id, |locations| async move { + let TenantMutationLocations(locations) = locations; + if locations.is_empty() { + return Err(ApiError::NotFound(anyhow::anyhow!("Tenant {} not found", tenant_or_shard_id.tenant_id).into())); + } - let path_label = path_without_ids(&path) - .split('/') - .filter(|token| !token.is_empty()) - .collect::>() - .join("_"); - let labels = PageserverRequestLabelGroup { - pageserver_id: &node.get_id().to_string(), - path: &path_label, - method: crate::metrics::Method::Get, - }; + let (tenant_or_shard_id, locations) = locations.into_iter().next().unwrap(); + let node = locations.latest.node; - let _timer = latency.start_timer(labels.clone()); + // Callers will always pass an unsharded tenant ID. Before proxying, we must + // rewrite this to a shard-aware shard zero ID. + let path = format!("{path}"); + let tenant_str = tenant_or_shard_id.tenant_id.to_string(); + let tenant_shard_str = format!("{tenant_shard_id}"); + let path = path.replace(&tenant_str, &tenant_shard_str); - let client = mgmt_api::Client::new( - service.get_http_client().clone(), - node.base_url(), - service.get_config().pageserver_jwt_token.as_deref(), - ); - let resp = client.op_raw(method, path).await.map_err(|e| - // We return 503 here because if we can't successfully send a request to the pageserver, - // either we aren't available or the pageserver is unavailable. - ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?; - - if !resp.status().is_success() { - let error_counter = &METRICS_REGISTRY + let latency = &METRICS_REGISTRY .metrics_group - .storage_controller_passthrough_request_error; - error_counter.inc(labels); - } + .storage_controller_passthrough_request_latency; - // Transform 404 into 503 if we raced with a migration - if resp.status() == reqwest::StatusCode::NOT_FOUND && !consistent { - // Rather than retry here, send the client a 503 to prompt a retry: this matches - // the pageserver's use of 503, and all clients calling this API should retry on 503. - return Err(ApiError::ResourceUnavailable( - format!("Pageserver {node} returned 404 due to ongoing migration, retry later").into(), - )); - } + let path_label = path_without_ids(&path) + .split('/') + .filter(|token| !token.is_empty()) + .collect::>() + .join("_"); + let labels = PageserverRequestLabelGroup { + pageserver_id: &node.get_id().to_string(), + path: &path_label, + method: crate::metrics::Method::Get, + }; - // We have a reqest::Response, would like a http::Response - let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?); - for (k, v) in resp.headers() { - builder = builder.header(k.as_str(), v.as_bytes()); - } + let _timer = latency.start_timer(labels.clone()); - let response = builder - .body(Body::wrap_stream(resp.bytes_stream())) - .map_err(|e| ApiError::InternalServerError(e.into()))?; + let client = mgmt_api::Client::new( + service_inner.get_http_client().clone(), + node.base_url(), + service_inner.get_config().pageserver_jwt_token.as_deref(), + ); + let resp = client.op_raw(method, path).await.map_err(|e| + // We return 503 here because if we can't successfully send a request to the pageserver, + // either we aren't available or the pageserver is unavailable. + ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?; - Ok(response) + if !resp.status().is_success() { + let error_counter = &METRICS_REGISTRY + .metrics_group + .storage_controller_passthrough_request_error; + error_counter.inc(labels); + } + let resp_staus = resp.status(); + + // We have a reqest::Response, would like a http::Response + let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp_staus)?); + for (k, v) in resp.headers() { + builder = builder.header(k.as_str(), v.as_bytes()); + } + let resp_bytes = resp + .bytes() + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + // Inspect 404 errors: at this point, we know that the tenant exists, but the pageserver we route + // the request to might not yet be ready. Therefore, if it is a _tenant_ not found error, we can + // convert it into a 503. TODO: we should make this part of the check in `tenant_shard_remote_mutation`. + // However, `tenant_shard_remote_mutation` currently cannot inspect the HTTP error response body, + // so we have to do it here instead. + if resp_staus == reqwest::StatusCode::NOT_FOUND { + let resp_str = std::str::from_utf8(&resp_bytes) + .map_err(|e| ApiError::InternalServerError(e.into()))?; + // We only handle "tenant not found" errors; other 404s like timeline not found should + // be forwarded as-is. + if resp_str.contains(&format!("tenant {tenant_or_shard_id}")) { + // Rather than retry here, send the client a 503 to prompt a retry: this matches + // the pageserver's use of 503, and all clients calling this API should retry on 503. + return Err(ApiError::ResourceUnavailable( + format!( + "Pageserver {node} returned tenant 404 due to ongoing migration, retry later" + ) + .into(), + )); + } + } + let response = builder + .body(Body::from(resp_bytes)) + .map_err(|e| ApiError::InternalServerError(e.into()))?; + Ok(response) + }).await? } async fn handle_tenant_locate( diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ec3b419437..a1ff9b3c61 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -719,19 +719,19 @@ pub(crate) enum ReconcileResultRequest { } #[derive(Clone)] -struct MutationLocation { - node: Node, - generation: Generation, +pub(crate) struct MutationLocation { + pub(crate) node: Node, + pub(crate) generation: Generation, } #[derive(Clone)] -struct ShardMutationLocations { - latest: MutationLocation, - other: Vec, +pub(crate) struct ShardMutationLocations { + pub(crate) latest: MutationLocation, + pub(crate) other: Vec, } #[derive(Default, Clone)] -struct TenantMutationLocations(BTreeMap); +pub(crate) struct TenantMutationLocations(pub BTreeMap); struct ReconcileAllResult { spawned_reconciles: usize, @@ -763,6 +763,29 @@ impl ReconcileAllResult { } } +enum TenantIdOrShardId { + TenantId(TenantId), + TenantShardId(TenantShardId), +} + +impl TenantIdOrShardId { + fn tenant_id(&self) -> TenantId { + match self { + TenantIdOrShardId::TenantId(tenant_id) => *tenant_id, + TenantIdOrShardId::TenantShardId(tenant_shard_id) => tenant_shard_id.tenant_id, + } + } + + fn matches(&self, tenant_shard_id: &TenantShardId) -> bool { + match self { + TenantIdOrShardId::TenantId(tenant_id) => tenant_shard_id.tenant_id == *tenant_id, + TenantIdOrShardId::TenantShardId(this_tenant_shard_id) => { + this_tenant_shard_id == tenant_shard_id + } + } + } +} + impl Service { pub fn get_config(&self) -> &Config { &self.config @@ -4814,6 +4837,12 @@ impl Service { } } + if targets.is_empty() { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {tenant_id} not found").into(), + )); + } + Ok(TenantShardAttachState { targets, by_node_id, @@ -5040,11 +5069,37 @@ impl Service { /// - Looks up the shards and the nodes where they were most recently attached /// - Guarantees that after the inner function returns, the shards' generations haven't moved on: this /// ensures that the remote operation acted on the most recent generation, and is therefore durable. - async fn tenant_remote_mutation( + pub(crate) async fn tenant_remote_mutation( &self, tenant_id: TenantId, op: O, ) -> Result + where + O: FnOnce(TenantMutationLocations) -> F, + F: std::future::Future, + { + self.tenant_remote_mutation_inner(TenantIdOrShardId::TenantId(tenant_id), op) + .await + } + + pub(crate) async fn tenant_shard_remote_mutation( + &self, + tenant_shard_id: TenantShardId, + op: O, + ) -> Result + where + O: FnOnce(TenantMutationLocations) -> F, + F: std::future::Future, + { + self.tenant_remote_mutation_inner(TenantIdOrShardId::TenantShardId(tenant_shard_id), op) + .await + } + + async fn tenant_remote_mutation_inner( + &self, + tenant_id_or_shard_id: TenantIdOrShardId, + op: O, + ) -> Result where O: FnOnce(TenantMutationLocations) -> F, F: std::future::Future, @@ -5056,7 +5111,13 @@ impl Service { // run concurrently with reconciliations, and it is not guaranteed that the node we find here // will still be the latest when we're done: we will check generations again at the end of // this function to handle that. - let generations = self.persistence.tenant_generations(tenant_id).await?; + let generations = self + .persistence + .tenant_generations(tenant_id_or_shard_id.tenant_id()) + .await? + .into_iter() + .filter(|i| tenant_id_or_shard_id.matches(&i.tenant_shard_id)) + .collect::>(); if generations .iter() @@ -5070,9 +5131,14 @@ impl Service { // One or more shards has not been attached to a pageserver. Check if this is because it's configured // to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry) let locked = self.inner.read().unwrap(); - for (shard_id, shard) in - locked.tenants.range(TenantShardId::tenant_range(tenant_id)) - { + let tenant_shards = locked + .tenants + .range(TenantShardId::tenant_range( + tenant_id_or_shard_id.tenant_id(), + )) + .filter(|(shard_id, _)| tenant_id_or_shard_id.matches(shard_id)) + .collect::>(); + for (shard_id, shard) in tenant_shards { match shard.policy { PlacementPolicy::Attached(_) => { // This shard is meant to be attached: the caller is not wrong to try and @@ -5182,7 +5248,14 @@ impl Service { // Post-check: are all the generations of all the shards the same as they were initially? This proves that // our remote operation executed on the latest generation and is therefore persistent. { - let latest_generations = self.persistence.tenant_generations(tenant_id).await?; + let latest_generations = self + .persistence + .tenant_generations(tenant_id_or_shard_id.tenant_id()) + .await? + .into_iter() + .filter(|i| tenant_id_or_shard_id.matches(&i.tenant_shard_id)) + .collect::>(); + if latest_generations .into_iter() .map( @@ -5316,7 +5389,7 @@ impl Service { pub(crate) async fn tenant_shard0_node( &self, tenant_id: TenantId, - ) -> Result<(Node, TenantShardId, bool), ApiError> { + ) -> Result<(Node, TenantShardId), ApiError> { let tenant_shard_id = { let locked = self.inner.read().unwrap(); let Some((tenant_shard_id, _shard)) = locked @@ -5334,7 +5407,7 @@ impl Service { self.tenant_shard_node(tenant_shard_id) .await - .map(|(node, consistent)| (node, tenant_shard_id, consistent)) + .map(|node| (node, tenant_shard_id)) } /// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this @@ -5344,7 +5417,7 @@ impl Service { pub(crate) async fn tenant_shard_node( &self, tenant_shard_id: TenantShardId, - ) -> Result<(Node, bool), ApiError> { + ) -> Result { // Look up in-memory state and maybe use the node from there. { let locked = self.inner.read().unwrap(); @@ -5374,8 +5447,7 @@ impl Service { "Shard refers to nonexistent node" ))); }; - let consistent = self.is_observed_consistent_with_intent(shard, *intent_node_id); - return Ok((node.clone(), consistent)); + return Ok(node.clone()); } }; @@ -5410,7 +5482,7 @@ impl Service { ))); }; // As a reconciliation is in flight, we do not have the observed state yet, and therefore we assume it is always inconsistent. - Ok((node.clone(), false)) + Ok(node.clone()) } pub(crate) fn tenant_locate( diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 23b9d1c8c9..f95b0ee4d1 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -847,7 +847,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): return res_json def timeline_lsn_lease( - self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn + self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, lsn: Lsn, **kwargs ): data = { "lsn": str(lsn), @@ -857,6 +857,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): res = self.post( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/lsn_lease", json=data, + **kwargs, ) self.verbose_error(res) res_json = res.json() diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index fbdb14b6bb..9986c1f24a 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -12,7 +12,7 @@ from typing import TYPE_CHECKING import fixtures.utils import pytest from fixtures.auth_tokens import TokenScope -from fixtures.common_types import TenantId, TenantShardId, TimelineId +from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( DEFAULT_AZ_ID, @@ -47,6 +47,7 @@ from fixtures.utils import ( wait_until, ) from fixtures.workload import Workload +from requests.adapters import HTTPAdapter from urllib3 import Retry from werkzeug.wrappers.response import Response @@ -4858,3 +4859,103 @@ def test_storage_controller_migrate_with_pageserver_restart( "shards": [{"node_id": int(secondary.id), "shard_number": 0}], "preferred_az": DEFAULT_AZ_ID, } + + +@run_only_on_default_postgres("PG version is not important for this test") +def test_storage_controller_forward_404(neon_env_builder: NeonEnvBuilder): + """ + Ensures that the storage controller correctly forwards 404s and converts some of them + into 503s before forwarding to the client. + """ + neon_env_builder.num_pageservers = 2 + neon_env_builder.num_azs = 2 + + env = neon_env_builder.init_start() + env.storage_controller.allowed_errors.append(".*Reconcile error.*") + env.storage_controller.allowed_errors.append(".*Timed out.*") + + env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}}) + env.storage_controller.reconcile_until_idle() + + # 404s on tenants and timelines are forwarded as-is when reconciler is not running. + + # Access a non-existing timeline -> 404 + with pytest.raises(PageserverApiException) as e: + env.storage_controller.pageserver_api().timeline_detail( + env.initial_tenant, TimelineId.generate() + ) + assert e.value.status_code == 404 + with pytest.raises(PageserverApiException) as e: + env.storage_controller.pageserver_api().timeline_lsn_lease( + env.initial_tenant, TimelineId.generate(), Lsn(0) + ) + assert e.value.status_code == 404 + + # Access a non-existing tenant when reconciler is not running -> 404 + with pytest.raises(PageserverApiException) as e: + env.storage_controller.pageserver_api().timeline_detail( + TenantId.generate(), env.initial_timeline + ) + assert e.value.status_code == 404 + with pytest.raises(PageserverApiException) as e: + env.storage_controller.pageserver_api().timeline_lsn_lease( + TenantId.generate(), env.initial_timeline, Lsn(0) + ) + assert e.value.status_code == 404 + + # Normal requests should succeed + detail = env.storage_controller.pageserver_api().timeline_detail( + env.initial_tenant, env.initial_timeline + ) + last_record_lsn = Lsn(detail["last_record_lsn"]) + env.storage_controller.pageserver_api().timeline_lsn_lease( + env.initial_tenant, env.initial_timeline, last_record_lsn + ) + + # Get into a situation where the intent state is not the same as the observed state. + describe = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0] + current_primary = describe["node_attached"] + current_secondary = describe["node_secondary"][0] + assert current_primary != current_secondary + + # Pause the reconciler so that the generation number won't be updated. + env.storage_controller.configure_failpoints( + ("reconciler-live-migrate-post-generation-inc", "pause") + ) + + # Do the migration in another thread; the request will be dropped as we don't wait. + shard_zero = TenantShardId(env.initial_tenant, 0, 0) + concurrent.futures.ThreadPoolExecutor(max_workers=1).submit( + env.storage_controller.tenant_shard_migrate, + shard_zero, + current_secondary, + StorageControllerMigrationConfig(override_scheduler=True), + ) + # Not the best way to do this, we should wait until the migration gets started. + time.sleep(1) + placement = env.storage_controller.get_tenants_placement()[str(shard_zero)] + assert placement["observed"] != placement["intent"] + assert placement["observed"]["attached"] == current_primary + assert placement["intent"]["attached"] == current_secondary + + # Now we issue requests that would cause 404 again + retry_strategy = Retry(total=0) + adapter = HTTPAdapter(max_retries=retry_strategy) + + no_retry_api = env.storage_controller.pageserver_api() + no_retry_api.mount("http://", adapter) + no_retry_api.mount("https://", adapter) + + # As intent state != observed state, tenant not found error should return 503, + # so that the client can retry once we've successfully migrated. + with pytest.raises(PageserverApiException) as e: + no_retry_api.timeline_detail(env.initial_tenant, TimelineId.generate()) + assert e.value.status_code == 503, f"unexpected status code and error: {e.value}" + with pytest.raises(PageserverApiException) as e: + no_retry_api.timeline_lsn_lease(env.initial_tenant, TimelineId.generate(), Lsn(0)) + assert e.value.status_code == 503, f"unexpected status code and error: {e.value}" + + # Unblock reconcile operations + env.storage_controller.configure_failpoints( + ("reconciler-live-migrate-post-generation-inc", "off") + )