diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 62fc212e12..c8227f0219 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -735,15 +735,13 @@ async fn handle_tenant_timeline_passthrough(
     );
 
     // Find the node that holds shard zero
-    let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
+    let (node, tenant_shard_id, consistent) = if tenant_or_shard_id.is_unsharded() {
         service
             .tenant_shard0_node(tenant_or_shard_id.tenant_id)
             .await?
     } else {
-        (
-            service.tenant_shard_node(tenant_or_shard_id).await?,
-            tenant_or_shard_id,
-        )
+        let (node, consistent) = service.tenant_shard_node(tenant_or_shard_id).await?;
+        (node, tenant_or_shard_id, consistent)
     };
 
     // Callers will always pass an unsharded tenant ID. Before proxying, we must
@@ -788,16 +786,12 @@ async fn handle_tenant_timeline_passthrough(
     }
 
     // Transform 404 into 503 if we raced with a migration
-    if resp.status() == reqwest::StatusCode::NOT_FOUND {
-        // Look up node again: if we migrated it will be different
-        let new_node = service.tenant_shard_node(tenant_shard_id).await?;
-        if new_node.get_id() != node.get_id() {
-            // Rather than retry here, send the client a 503 to prompt a retry: this matches
-            // the pageserver's use of 503, and all clients calling this API should retry on 503.
-            return Err(ApiError::ResourceUnavailable(
-                format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
-            ));
-        }
+    if resp.status() == reqwest::StatusCode::NOT_FOUND && !consistent {
+        // Rather than retry here, send the client a 503 to prompt a retry: this matches
+        // the pageserver's use of 503, and all clients calling this API should retry on 503.
+        return Err(ApiError::ResourceUnavailable(
+            format!("Pageserver {node} returned 404 due to ongoing migration, retry later").into(),
+        ));
     }
 
     // We have a reqest::Response, would like a http::Response
@@ -2597,6 +2591,17 @@ pub fn make_router(
                 )
             },
         )
+        // Tenant timeline mark_invisible passthrough to shard zero
+        .put(
+            "/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
+            |r| {
+                tenant_service_handler(
+                    r,
+                    handle_tenant_timeline_passthrough,
+                    RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
+                )
+            },
+        )
         // Tenant detail GET passthrough to shard zero:
         .get("/v1/tenant/:tenant_id", |r| {
             tenant_service_handler(
@@ -2615,17 +2620,6 @@ pub fn make_router(
                 RequestName("v1_tenant_passthrough"),
             )
         })
-        // Tenant timeline mark_invisible passthrough to shard zero
-        .put(
-            "/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
-            |r| {
-                tenant_service_handler(
-                    r,
-                    handle_tenant_timeline_passthrough,
-                    RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
-                )
-            },
-        )
 }
 
 #[cfg(test)]
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 638cb410fa..0c5d7f44d4 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -207,6 +207,27 @@ enum ShardGenerationValidity {
     },
 }
 
+/// Attachment state collected for some operations, used to determine whether the
+/// operation needs to be retried when it fails.
+struct TenantShardAttachState {
+    /// The targets of the operation.
+    ///
+    /// Tenant shard ID, node ID, node, and whether the intent node is the observed attached primary.
+    targets: Vec<(TenantShardId, NodeId, Node, bool)>,
+
+    /// The targets grouped by node ID.
+    by_node_id: HashMap<NodeId, (TenantShardId, Node, bool)>,
+}
+
+impl TenantShardAttachState {
+    fn for_api_call(&self) -> Vec<(TenantShardId, Node)> {
+        self.targets
+            .iter()
+            .map(|(tenant_shard_id, _, node, _)| (*tenant_shard_id, node.clone()))
+            .collect()
+    }
+}
+
 pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
 pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
 pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
@@ -4752,6 +4773,86 @@ impl Service {
         Ok(())
     }
 
+    fn is_observed_consistent_with_intent(
+        &self,
+        shard: &TenantShard,
+        intent_node_id: NodeId,
+    ) -> bool {
+        if let Some(location) = shard.observed.locations.get(&intent_node_id)
+            && let Some(ref conf) = location.conf
+            && (conf.mode == LocationConfigMode::AttachedSingle
+                || conf.mode == LocationConfigMode::AttachedMulti)
+        {
+            true
+        } else {
+            false
+        }
+    }
+
+    fn collect_tenant_shards(
+        &self,
+        tenant_id: TenantId,
+    ) -> Result<TenantShardAttachState, ApiError> {
+        let locked = self.inner.read().unwrap();
+        let mut targets = Vec::new();
+        let mut by_node_id = HashMap::new();
+
+        // If the request got an unsharded tenant id, then apply
+        // the operation to all shards. Otherwise, apply it to a specific shard.
+        let shards_range = TenantShardId::tenant_range(tenant_id);
+
+        for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
+            if let Some(node_id) = shard.intent.get_attached() {
+                let node = locked
+                    .nodes
+                    .get(node_id)
+                    .expect("Pageservers may not be deleted while referenced");
+
+                let consistent = self.is_observed_consistent_with_intent(shard, *node_id);
+
+                targets.push((*tenant_shard_id, *node_id, node.clone(), consistent));
+                by_node_id.insert(*node_id, (*tenant_shard_id, node.clone(), consistent));
+            }
+        }
+
+        Ok(TenantShardAttachState {
+            targets,
+            by_node_id,
+        })
+    }
+
+    fn process_result_and_passthrough_errors<T>(
+        &self,
+        results: Vec<(Node, Result<T, mgmt_api::Error>)>,
+        attach_state: TenantShardAttachState,
+    ) -> Result<Vec<(Node, T)>, ApiError> {
+        let mut processed_results: Vec<(Node, T)> = Vec::with_capacity(results.len());
+        debug_assert_eq!(results.len(), attach_state.targets.len());
+        for (node, res) in results {
+            let is_consistent = attach_state
+                .by_node_id
+                .get(&node.get_id())
+                .map(|(_, _, consistent)| *consistent);
+            match res {
+                Ok(res) => processed_results.push((node, res)),
+                Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
+                    if is_consistent == Some(false) =>
+                {
+                    // This is expected if the attach is not finished yet. Return 503 so that the client can retry.
+                    return Err(ApiError::ResourceUnavailable(
+                        format!(
+                            "Timeline is not attached to the pageserver {} yet, please retry",
+                            node.get_id()
+                        )
+                        .into(),
+                    ));
+                }
+                Err(e) => return Err(passthrough_api_error(&node, e)),
+            }
+        }
+        Ok(processed_results)
+    }
+
     pub(crate) async fn tenant_timeline_lsn_lease(
         &self,
         tenant_id: TenantId,
@@ -4765,49 +4866,11 @@ impl Service {
         )
         .await;
 
-        let mut retry_if_not_attached = false;
-        let targets = {
-            let locked = self.inner.read().unwrap();
-            let mut targets = Vec::new();
+        let attach_state = self.collect_tenant_shards(tenant_id)?;
 
-            // If the request got an unsharded tenant id, then apply
-            // the operation to all shards. Otherwise, apply it to a specific shard.
-            let shards_range = TenantShardId::tenant_range(tenant_id);
-
-            for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
-                if let Some(node_id) = shard.intent.get_attached() {
-                    let node = locked
-                        .nodes
-                        .get(node_id)
-                        .expect("Pageservers may not be deleted while referenced");
-
-                    targets.push((*tenant_shard_id, node.clone()));
-
-                    if let Some(location) = shard.observed.locations.get(node_id) {
-                        if let Some(ref conf) = location.conf {
-                            if conf.mode != LocationConfigMode::AttachedSingle
-                                && conf.mode != LocationConfigMode::AttachedMulti
-                            {
-                                // If the shard is attached as secondary, we need to retry if 404.
-                                retry_if_not_attached = true;
-                            }
-                            // If the shard is attached as primary, we should succeed.
-                        } else {
-                            // Location conf is not available yet, retry if 404.
-                            retry_if_not_attached = true;
-                        }
-                    } else {
-                        // The shard is not attached to the intended pageserver yet, retry if 404.
-                        retry_if_not_attached = true;
-                    }
-                }
-            }
-            targets
-        };
-
-        let res = self
+        let results = self
             .tenant_for_shards_api(
-                targets,
+                attach_state.for_api_call(),
                 |tenant_shard_id, client| async move {
                     client
                         .timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
                         .await
                 },
@@ -4820,31 +4883,13 @@ impl Service {
             )
             .await;
 
+        let leases = self.process_result_and_passthrough_errors(results, attach_state)?;
         let mut valid_until = None;
-        for (node, r) in res {
-            match r {
-                Ok(lease) => {
-                    if let Some(ref mut valid_until) = valid_until {
-                        *valid_until = std::cmp::min(*valid_until, lease.valid_until);
-                    } else {
-                        valid_until = Some(lease.valid_until);
-                    }
-                }
-                Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
-                    if retry_if_not_attached =>
-                {
-                    // This is expected if the attach is not finished yet. Return 503 so that the client can retry.
-                    return Err(ApiError::ResourceUnavailable(
-                        format!(
-                            "Timeline is not attached to the pageserver {} yet, please retry",
-                            node.get_id()
-                        )
-                        .into(),
-                    ));
-                }
-                Err(e) => {
-                    return Err(passthrough_api_error(&node, e));
-                }
+        for (_, lease) in leases {
+            if let Some(ref mut valid_until) = valid_until {
+                *valid_until = std::cmp::min(*valid_until, lease.valid_until);
+            } else {
+                valid_until = Some(lease.valid_until);
             }
         }
         Ok(LsnLease {
@@ -5267,10 +5312,12 @@ impl Service {
         status_code
     }
 
     /// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0.
+    ///
+    /// Returns the node, the tenant shard ID, and whether the node is consistent with the observed state.
    pub(crate) async fn tenant_shard0_node(
         &self,
         tenant_id: TenantId,
-    ) -> Result<(Node, TenantShardId), ApiError> {
+    ) -> Result<(Node, TenantShardId, bool), ApiError> {
         let tenant_shard_id = {
             let locked = self.inner.read().unwrap();
             let Some((tenant_shard_id, _shard)) = locked
@@ -5288,15 +5335,17 @@ impl Service {
 
         self.tenant_shard_node(tenant_shard_id)
             .await
-            .map(|node| (node, tenant_shard_id))
+            .map(|(node, consistent)| (node, tenant_shard_id, consistent))
     }
 
     /// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this
     /// function looks up and returns node. If the shard isn't found, returns Err(ApiError::NotFound)
+    ///
+    /// Returns the intent node and whether it is consistent with the observed state.
     pub(crate) async fn tenant_shard_node(
         &self,
         tenant_shard_id: TenantShardId,
-    ) -> Result<Node, ApiError> {
+    ) -> Result<(Node, bool), ApiError> {
         // Look up in-memory state and maybe use the node from there.
         {
             let locked = self.inner.read().unwrap();
@@ -5326,7 +5375,8 @@ impl Service {
                     "Shard refers to nonexistent node"
                 )));
             };
-            return Ok(node.clone());
+            let consistent = self.is_observed_consistent_with_intent(shard, *intent_node_id);
+            return Ok((node.clone(), consistent));
         }
     };
@@ -5360,8 +5410,8 @@ impl Service {
                 "Shard refers to nonexistent node"
             )));
         };
-
-        Ok(node.clone())
+        // A reconciliation is in flight, so we do not have the observed state yet; assume it is inconsistent.
+        Ok((node.clone(), false))
     }
 
     pub(crate) fn tenant_locate(
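
Reviewer note: the behavioral core of this patch is that a pageserver 404 becomes a 503 only when the shard's observed state still disagrees with its intent (attach or migration in flight); a 404 from the node that is the observed attached primary passes through unchanged. A minimal sketch of that decision, for illustration only (`status_to_client` is not part of this patch):

```rust
/// Illustrative sketch, not code from this patch: given the pageserver's
/// response status and the `consistent` flag captured when the request was
/// dispatched, decide what the storage controller returns to the client.
fn status_to_client(status: u16, consistent: bool) -> u16 {
    match (status, consistent) {
        // 404 while the observed state disagrees with the intent: the shard is
        // likely mid-attach or mid-migration, so return 503 to prompt a retry.
        (404, false) => 503,
        // Everything else passes through unchanged, including a genuine 404
        // from the node that is the observed attached primary.
        (s, _) => s,
    }
}
```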
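
Callers are expected to treat that 503 as a retry signal, matching the pageserver's existing use of 503. A hypothetical client loop might look like the following (the helper name, endpoint, and backoff policy are illustrative assumptions, not part of this patch):

```rust
use std::time::Duration;

/// Hypothetical client helper: retry a passthrough GET while the storage
/// controller reports 503 (attach/migration still in flight). The name,
/// endpoint, and retry bounds are assumptions for illustration.
async fn fetch_timeline(
    client: &reqwest::Client,
    base_url: &str,
    tenant_id: &str,
    timeline_id: &str,
) -> Result<reqwest::Response, reqwest::Error> {
    let url = format!("{base_url}/v1/tenant/{tenant_id}/timeline/{timeline_id}");
    let mut attempts: u64 = 0;
    loop {
        let resp = client.get(&url).send().await?;
        if resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE && attempts < 10 {
            // 503 is the controller's "retry later" signal; back off briefly.
            attempts += 1;
            tokio::time::sleep(Duration::from_millis(200 * attempts)).await;
            continue;
        }
        return Ok(resp);
    }
}
```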