fix(storcon): passthrough 404 as 503 during migrations (#12620)

## Problem

close LKB-270, close LKB-253

We periodically saw pageserver returns 404 -> storcon converts it to 500
to cplane, and causing branch operations fail. This is due to storcon is
migrating tenants across pageservers and the request was forwarded from
the storcon to pageservers while the tenant was not attached yet. Such
operations should be retried from cplane and storcon should return 503
in such cases.

## Summary of changes

- Refactor `tenant_timeline_lsn_lease` to have a single function process
and passthrough such requests: `collect_tenant_shards` for collecting
all shards and checking if they're consistent with the observed state,
`process_result_and_passthrough_errors` to convert 404 into 503 if
necessary.
- `tenant_shard_node` also checks observed state now.

Note that for passthrough shard0, we originally had a check to convert
404 to 503:

```
    // Transform 404 into 503 if we raced with a migration
    if resp.status() == reqwest::StatusCode::NOT_FOUND {
        // Look up node again: if we migrated it will be different
        let new_node = service.tenant_shard_node(tenant_shard_id).await?;
        if new_node.get_id() != node.get_id() {
            // Rather than retry here, send the client a 503 to prompt a retry: this matches
            // the pageserver's use of 503, and all clients calling this API should retry on 503.
            return Err(ApiError::ResourceUnavailable(
                format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
            ));
        }
    }
```

However, this only checks the intent state. It is possible that the
migration is in progress before/after the request is processed and
intent state is always the same throughout the API call, therefore 404
not being processed by this branch.

Also, not sure about if this new code is correct or not, need second
eyes on that:

```
// As a reconciliation is in flight, we do not have the observed state yet, and therefore we assume it is always inconsistent.
Ok((node.clone(), false))
```

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-07-16 11:51:20 -04:00
committed by GitHub
parent 1178f6fe7c
commit 80e5771c67
2 changed files with 141 additions and 97 deletions

View File

@@ -735,15 +735,13 @@ async fn handle_tenant_timeline_passthrough(
);
// Find the node that holds shard zero
let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
let (node, tenant_shard_id, consistent) = if tenant_or_shard_id.is_unsharded() {
service
.tenant_shard0_node(tenant_or_shard_id.tenant_id)
.await?
} else {
(
service.tenant_shard_node(tenant_or_shard_id).await?,
tenant_or_shard_id,
)
let (node, consistent) = service.tenant_shard_node(tenant_or_shard_id).await?;
(node, tenant_or_shard_id, consistent)
};
// Callers will always pass an unsharded tenant ID. Before proxying, we must
@@ -788,16 +786,12 @@ async fn handle_tenant_timeline_passthrough(
}
// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND {
// Look up node again: if we migrated it will be different
let new_node = service.tenant_shard_node(tenant_shard_id).await?;
if new_node.get_id() != node.get_id() {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404, was migrated to {new_node}").into(),
));
}
if resp.status() == reqwest::StatusCode::NOT_FOUND && !consistent {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404 due to ongoing migration, retry later").into(),
));
}
// We have a reqest::Response, would like a http::Response
@@ -2597,6 +2591,17 @@ pub fn make_router(
)
},
)
// Tenant timeline mark_invisible passthrough to shard zero
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
)
},
)
// Tenant detail GET passthrough to shard zero:
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(
@@ -2615,17 +2620,6 @@ pub fn make_router(
RequestName("v1_tenant_passthrough"),
)
})
// Tenant timeline mark_invisible passthrough to shard zero
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/mark_invisible",
|r| {
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
RequestName("v1_tenant_timeline_mark_invisible_passthrough"),
)
},
)
}
#[cfg(test)]

View File

@@ -207,6 +207,27 @@ enum ShardGenerationValidity {
},
}
/// We collect the state of attachments for some operations to determine if the operation
/// needs to be retried when it fails.
struct TenantShardAttachState {
/// The targets of the operation.
///
/// Tenant shard ID, node ID, node, is intent node observed primary.
targets: Vec<(TenantShardId, NodeId, Node, bool)>,
/// The targets grouped by node ID.
by_node_id: HashMap<NodeId, (TenantShardId, Node, bool)>,
}
impl TenantShardAttachState {
fn for_api_call(&self) -> Vec<(TenantShardId, Node)> {
self.targets
.iter()
.map(|(tenant_shard_id, _, node, _)| (*tenant_shard_id, node.clone()))
.collect()
}
}
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
@@ -4752,6 +4773,86 @@ impl Service {
Ok(())
}
fn is_observed_consistent_with_intent(
&self,
shard: &TenantShard,
intent_node_id: NodeId,
) -> bool {
if let Some(location) = shard.observed.locations.get(&intent_node_id)
&& let Some(ref conf) = location.conf
&& (conf.mode == LocationConfigMode::AttachedSingle
|| conf.mode == LocationConfigMode::AttachedMulti)
{
true
} else {
false
}
}
fn collect_tenant_shards(
&self,
tenant_id: TenantId,
) -> Result<TenantShardAttachState, ApiError> {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
let mut by_node_id = HashMap::new();
// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);
for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
let consistent = self.is_observed_consistent_with_intent(shard, *node_id);
targets.push((*tenant_shard_id, *node_id, node.clone(), consistent));
by_node_id.insert(*node_id, (*tenant_shard_id, node.clone(), consistent));
}
}
Ok(TenantShardAttachState {
targets,
by_node_id,
})
}
fn process_result_and_passthrough_errors<T>(
&self,
results: Vec<(Node, Result<T, mgmt_api::Error>)>,
attach_state: TenantShardAttachState,
) -> Result<Vec<(Node, T)>, ApiError> {
let mut processed_results: Vec<(Node, T)> = Vec::with_capacity(results.len());
debug_assert_eq!(results.len(), attach_state.targets.len());
for (node, res) in results {
let is_consistent = attach_state
.by_node_id
.get(&node.get_id())
.map(|(_, _, consistent)| *consistent);
match res {
Ok(res) => processed_results.push((node, res)),
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
if is_consistent == Some(false) =>
{
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
return Err(ApiError::ResourceUnavailable(
format!(
"Timeline is not attached to the pageserver {} yet, please retry",
node.get_id()
)
.into(),
));
}
Err(e) => return Err(passthrough_api_error(&node, e)),
}
}
Ok(processed_results)
}
pub(crate) async fn tenant_timeline_lsn_lease(
&self,
tenant_id: TenantId,
@@ -4765,49 +4866,11 @@ impl Service {
)
.await;
let mut retry_if_not_attached = false;
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
let attach_state = self.collect_tenant_shards(tenant_id)?;
// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);
for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
if let Some(location) = shard.observed.locations.get(node_id) {
if let Some(ref conf) = location.conf {
if conf.mode != LocationConfigMode::AttachedSingle
&& conf.mode != LocationConfigMode::AttachedMulti
{
// If the shard is attached as secondary, we need to retry if 404.
retry_if_not_attached = true;
}
// If the shard is attached as primary, we should succeed.
} else {
// Location conf is not available yet, retry if 404.
retry_if_not_attached = true;
}
} else {
// The shard is not attached to the intended pageserver yet, retry if 404.
retry_if_not_attached = true;
}
}
}
targets
};
let res = self
let results = self
.tenant_for_shards_api(
targets,
attach_state.for_api_call(),
|tenant_shard_id, client| async move {
client
.timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
@@ -4820,31 +4883,13 @@ impl Service {
)
.await;
let leases = self.process_result_and_passthrough_errors(results, attach_state)?;
let mut valid_until = None;
for (node, r) in res {
match r {
Ok(lease) => {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
if retry_if_not_attached =>
{
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
return Err(ApiError::ResourceUnavailable(
format!(
"Timeline is not attached to the pageserver {} yet, please retry",
node.get_id()
)
.into(),
));
}
Err(e) => {
return Err(passthrough_api_error(&node, e));
}
for (_, lease) in leases {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Ok(LsnLease {
@@ -5267,10 +5312,12 @@ impl Service {
status_code
}
/// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0.
///
/// Returns the node, tenant shard id, and whether it is consistent with the observed state.
pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId), ApiError> {
) -> Result<(Node, TenantShardId, bool), ApiError> {
let tenant_shard_id = {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, _shard)) = locked
@@ -5288,15 +5335,17 @@ impl Service {
self.tenant_shard_node(tenant_shard_id)
.await
.map(|node| (node, tenant_shard_id))
.map(|(node, consistent)| (node, tenant_shard_id, consistent))
}
/// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this
/// function looks up and returns node. If the shard isn't found, returns Err(ApiError::NotFound)
///
/// Returns the intent node and whether it is consistent with the observed state.
pub(crate) async fn tenant_shard_node(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Node, ApiError> {
) -> Result<(Node, bool), ApiError> {
// Look up in-memory state and maybe use the node from there.
{
let locked = self.inner.read().unwrap();
@@ -5326,7 +5375,8 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
return Ok(node.clone());
let consistent = self.is_observed_consistent_with_intent(shard, *intent_node_id);
return Ok((node.clone(), consistent));
}
};
@@ -5360,8 +5410,8 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
Ok(node.clone())
// As a reconciliation is in flight, we do not have the observed state yet, and therefore we assume it is always inconsistent.
Ok((node.clone(), false))
}
pub(crate) fn tenant_locate(