Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-07 21:42:56 +00:00
fix(storcon): correctly converts 404 for tenant passthrough requests (#12631)
## Problem

Follow-up of https://github.com/neondatabase/neon/pull/12620.
Discussions: https://databricks.slack.com/archives/C09254R641L/p1752677940697529

Both the original code and the code after the patch above convert 404s to 503s regardless of the type of 404. We should only do that for tenant-not-found errors. For other 404s, such as timeline-not-found, we should not prompt clients to retry.

## Summary of changes

- Inspect the response body to determine the type of 404. If it is a tenant-not-found error, return 503.
- Otherwise, fall through and return the 404 as-is.
- Add `tenant_shard_remote_mutation`, which operates on a single shard.
- Use `Service::tenant_shard_remote_mutation` for tenant shard passthrough requests. This protects us from another race where the attach state changes within the request. (This patch mainly addresses the case where the tenant is "not yet attached".)
- TODO: the lease API still uses the old code path. We should refactor it to use `tenant_remote_mutation`.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
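To make the retry semantics concrete, here is a minimal, self-contained sketch of the decision this patch makes (not the storage controller's actual types; `classify_response`, `PassthroughStatus`, and the plain-string shard id are illustrative assumptions): a pageserver 404 is only turned into a retryable 503 when the response body indicates the *tenant* shard was not found; any other 404, such as timeline-not-found, is forwarded unchanged.

```rust
/// Outcome of proxying a request to the pageserver, reduced to what matters here:
/// which status the storage controller should return to the client.
#[derive(Debug, PartialEq)]
enum PassthroughStatus {
    /// Forward the pageserver's status unchanged.
    Forward(u16),
    /// Return 503 so the client retries (shard not yet attached / mid-migration).
    RetryLater,
}

/// Hypothetical helper: decide what to do with a pageserver response body.
/// `tenant_shard_id` is the textual shard id the request was routed to.
fn classify_response(status: u16, body: &str, tenant_shard_id: &str) -> PassthroughStatus {
    if status == 404 && body.contains(&format!("tenant {tenant_shard_id}")) {
        // The *tenant* shard is unknown on that pageserver: most likely we raced with a
        // migration or the shard is not yet attached, so prompt the client to retry.
        PassthroughStatus::RetryLater
    } else {
        // Any other response, including a timeline-not-found 404, is forwarded as-is.
        PassthroughStatus::Forward(status)
    }
}

fn main() {
    let shard = "mytenant-0001"; // illustrative shard id format
    // Tenant not found -> prompt a retry with 503.
    assert_eq!(
        classify_response(404, &format!("tenant {shard} not found"), shard),
        PassthroughStatus::RetryLater
    );
    // Timeline not found -> forward the 404 unchanged.
    assert_eq!(
        classify_response(404, "timeline 0000 not found", shard),
        PassthroughStatus::Forward(404)
    );
    // Success responses are untouched.
    assert_eq!(classify_response(200, "", shard), PassthroughStatus::Forward(200));
    println!("all checks passed");
}
```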
@@ -48,7 +48,10 @@ use crate::metrics::{
};
use crate::persistence::SafekeeperUpsert;
use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};
use crate::service::{
LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service,
TenantMutationLocations,
};

/// State available to HTTP request handlers
pub struct HttpState {
@@ -734,77 +737,104 @@ async fn handle_tenant_timeline_passthrough(
path
);

// Find the node that holds shard zero
let (node, tenant_shard_id, consistent) = if tenant_or_shard_id.is_unsharded() {
service
let tenant_shard_id = if tenant_or_shard_id.is_unsharded() {
// If the request contains only tenant ID, find the node that holds shard zero
let (_, shard_id) = service
.tenant_shard0_node(tenant_or_shard_id.tenant_id)
.await?
.await?;
shard_id
} else {
let (node, consistent) = service.tenant_shard_node(tenant_or_shard_id).await?;
(node, tenant_or_shard_id, consistent)
tenant_or_shard_id
};

// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
let path = format!("{path}");
let tenant_str = tenant_or_shard_id.tenant_id.to_string();
let tenant_shard_str = format!("{tenant_shard_id}");
let path = path.replace(&tenant_str, &tenant_shard_str);
let service_inner = service.clone();

let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_latency;
service.tenant_shard_remote_mutation(tenant_shard_id, |locations| async move {
let TenantMutationLocations(locations) = locations;
if locations.is_empty() {
return Err(ApiError::NotFound(anyhow::anyhow!("Tenant {} not found", tenant_or_shard_id.tenant_id).into()));
}

let path_label = path_without_ids(&path)
.split('/')
.filter(|token| !token.is_empty())
.collect::<Vec<_>>()
.join("_");
let labels = PageserverRequestLabelGroup {
pageserver_id: &node.get_id().to_string(),
path: &path_label,
method: crate::metrics::Method::Get,
};
let (tenant_or_shard_id, locations) = locations.into_iter().next().unwrap();
let node = locations.latest.node;

let _timer = latency.start_timer(labels.clone());
// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
let path = format!("{path}");
let tenant_str = tenant_or_shard_id.tenant_id.to_string();
let tenant_shard_str = format!("{tenant_shard_id}");
let path = path.replace(&tenant_str, &tenant_shard_str);

let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
);
let resp = client.op_raw(method, path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;

if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_error;
error_counter.inc(labels);
}
.storage_controller_passthrough_request_latency;

// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND && !consistent {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404 due to ongoing migration, retry later").into(),
));
}
let path_label = path_without_ids(&path)
.split('/')
.filter(|token| !token.is_empty())
.collect::<Vec<_>>()
.join("_");
let labels = PageserverRequestLabelGroup {
pageserver_id: &node.get_id().to_string(),
path: &path_label,
method: crate::metrics::Method::Get,
};

// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
for (k, v) in resp.headers() {
builder = builder.header(k.as_str(), v.as_bytes());
}
let _timer = latency.start_timer(labels.clone());

let response = builder
.body(Body::wrap_stream(resp.bytes_stream()))
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let client = mgmt_api::Client::new(
service_inner.get_http_client().clone(),
node.base_url(),
service_inner.get_config().pageserver_jwt_token.as_deref(),
);
let resp = client.op_raw(method, path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;

Ok(response)
if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_error;
error_counter.inc(labels);
}
let resp_staus = resp.status();

// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp_staus)?);
for (k, v) in resp.headers() {
builder = builder.header(k.as_str(), v.as_bytes());
}
let resp_bytes = resp
.bytes()
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
// Inspect 404 errors: at this point, we know that the tenant exists, but the pageserver we route
// the request to might not yet be ready. Therefore, if it is a _tenant_ not found error, we can
// convert it into a 503. TODO: we should make this part of the check in `tenant_shard_remote_mutation`.
// However, `tenant_shard_remote_mutation` currently cannot inspect the HTTP error response body,
// so we have to do it here instead.
if resp_staus == reqwest::StatusCode::NOT_FOUND {
let resp_str = std::str::from_utf8(&resp_bytes)
.map_err(|e| ApiError::InternalServerError(e.into()))?;
// We only handle "tenant not found" errors; other 404s like timeline not found should
// be forwarded as-is.
if resp_str.contains(&format!("tenant {tenant_or_shard_id}")) {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!(
"Pageserver {node} returned tenant 404 due to ongoing migration, retry later"
)
.into(),
));
}
}
let response = builder
.body(Body::from(resp_bytes))
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Ok(response)
}).await?
}

async fn handle_tenant_locate(

@@ -719,19 +719,19 @@ pub(crate) enum ReconcileResultRequest {
}

#[derive(Clone)]
struct MutationLocation {
node: Node,
generation: Generation,
pub(crate) struct MutationLocation {
pub(crate) node: Node,
pub(crate) generation: Generation,
}

#[derive(Clone)]
struct ShardMutationLocations {
latest: MutationLocation,
other: Vec<MutationLocation>,
pub(crate) struct ShardMutationLocations {
pub(crate) latest: MutationLocation,
pub(crate) other: Vec<MutationLocation>,
}

#[derive(Default, Clone)]
struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);
pub(crate) struct TenantMutationLocations(pub BTreeMap<TenantShardId, ShardMutationLocations>);

struct ReconcileAllResult {
spawned_reconciles: usize,
@@ -763,6 +763,29 @@ impl ReconcileAllResult {
}
}

enum TenantIdOrShardId {
TenantId(TenantId),
TenantShardId(TenantShardId),
}

impl TenantIdOrShardId {
fn tenant_id(&self) -> TenantId {
match self {
TenantIdOrShardId::TenantId(tenant_id) => *tenant_id,
TenantIdOrShardId::TenantShardId(tenant_shard_id) => tenant_shard_id.tenant_id,
}
}

fn matches(&self, tenant_shard_id: &TenantShardId) -> bool {
match self {
TenantIdOrShardId::TenantId(tenant_id) => tenant_shard_id.tenant_id == *tenant_id,
TenantIdOrShardId::TenantShardId(this_tenant_shard_id) => {
this_tenant_shard_id == tenant_shard_id
}
}
}
}

impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -4814,6 +4837,12 @@ impl Service {
}
}

if targets.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
));
}

Ok(TenantShardAttachState {
targets,
by_node_id,
@@ -5040,11 +5069,37 @@ impl Service {
/// - Looks up the shards and the nodes where they were most recently attached
/// - Guarantees that after the inner function returns, the shards' generations haven't moved on: this
/// ensures that the remote operation acted on the most recent generation, and is therefore durable.
async fn tenant_remote_mutation<R, O, F>(
pub(crate) async fn tenant_remote_mutation<R, O, F>(
&self,
tenant_id: TenantId,
op: O,
) -> Result<R, ApiError>
where
O: FnOnce(TenantMutationLocations) -> F,
F: std::future::Future<Output = R>,
{
self.tenant_remote_mutation_inner(TenantIdOrShardId::TenantId(tenant_id), op)
.await
}

pub(crate) async fn tenant_shard_remote_mutation<R, O, F>(
&self,
tenant_shard_id: TenantShardId,
op: O,
) -> Result<R, ApiError>
where
O: FnOnce(TenantMutationLocations) -> F,
F: std::future::Future<Output = R>,
{
self.tenant_remote_mutation_inner(TenantIdOrShardId::TenantShardId(tenant_shard_id), op)
.await
}

async fn tenant_remote_mutation_inner<R, O, F>(
&self,
tenant_id_or_shard_id: TenantIdOrShardId,
op: O,
) -> Result<R, ApiError>
where
O: FnOnce(TenantMutationLocations) -> F,
F: std::future::Future<Output = R>,
@@ -5056,7 +5111,13 @@ impl Service {
// run concurrently with reconciliations, and it is not guaranteed that the node we find here
// will still be the latest when we're done: we will check generations again at the end of
// this function to handle that.
let generations = self.persistence.tenant_generations(tenant_id).await?;
let generations = self
.persistence
.tenant_generations(tenant_id_or_shard_id.tenant_id())
.await?
.into_iter()
.filter(|i| tenant_id_or_shard_id.matches(&i.tenant_shard_id))
.collect::<Vec<_>>();

if generations
.iter()
@@ -5070,9 +5131,14 @@ impl Service {
// One or more shards has not been attached to a pageserver. Check if this is because it's configured
// to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry)
let locked = self.inner.read().unwrap();
for (shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
let tenant_shards = locked
.tenants
.range(TenantShardId::tenant_range(
tenant_id_or_shard_id.tenant_id(),
))
.filter(|(shard_id, _)| tenant_id_or_shard_id.matches(shard_id))
.collect::<Vec<_>>();
for (shard_id, shard) in tenant_shards {
match shard.policy {
PlacementPolicy::Attached(_) => {
// This shard is meant to be attached: the caller is not wrong to try and
@@ -5182,7 +5248,14 @@ impl Service {
// Post-check: are all the generations of all the shards the same as they were initially? This proves that
// our remote operation executed on the latest generation and is therefore persistent.
{
let latest_generations = self.persistence.tenant_generations(tenant_id).await?;
let latest_generations = self
.persistence
.tenant_generations(tenant_id_or_shard_id.tenant_id())
.await?
.into_iter()
.filter(|i| tenant_id_or_shard_id.matches(&i.tenant_shard_id))
.collect::<Vec<_>>();

if latest_generations
.into_iter()
.map(
@@ -5316,7 +5389,7 @@ impl Service {
pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId, bool), ApiError> {
) -> Result<(Node, TenantShardId), ApiError> {
let tenant_shard_id = {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, _shard)) = locked
@@ -5334,7 +5407,7 @@ impl Service {

self.tenant_shard_node(tenant_shard_id)
.await
.map(|(node, consistent)| (node, tenant_shard_id, consistent))
.map(|node| (node, tenant_shard_id))
}

/// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this
@@ -5344,7 +5417,7 @@ impl Service {
pub(crate) async fn tenant_shard_node(
&self,
tenant_shard_id: TenantShardId,
) -> Result<(Node, bool), ApiError> {
) -> Result<Node, ApiError> {
// Look up in-memory state and maybe use the node from there.
{
let locked = self.inner.read().unwrap();
@@ -5374,8 +5447,7 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
let consistent = self.is_observed_consistent_with_intent(shard, *intent_node_id);
return Ok((node.clone(), consistent));
return Ok(node.clone());
}
};

@@ -5410,7 +5482,7 @@ impl Service {
)));
};
// As a reconciliation is in flight, we do not have the observed state yet, and therefore we assume it is always inconsistent.
Ok((node.clone(), false))
Ok(node.clone())
}

pub(crate) fn tenant_locate(
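The doc comment on `tenant_remote_mutation` above describes a pre/post generation check: shard locations and generations are looked up before the remote operation, and the generations are re-read afterwards to prove the operation acted on the most recent attachment. A rough, synchronous sketch of that idea follows; the plain-string shard ids, integer generations, and the `remote_mutation` helper are illustrative assumptions, not the storage controller's real (async, persistence-backed) types.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the storage controller's types.
type ShardId = String;
type Generation = u64;

#[derive(Debug)]
enum MutationError {
    /// A generation moved while the remote operation ran; the caller should retry.
    RacedWithReconciliation,
}

/// Sketch of the pre/post generation check: run `op` against the locations observed
/// up front, then verify that no shard's generation changed, which would mean the
/// operation may have acted on a stale attachment.
fn remote_mutation<R>(
    read_generations: impl Fn() -> BTreeMap<ShardId, Generation>,
    op: impl FnOnce(&BTreeMap<ShardId, Generation>) -> R,
) -> Result<R, MutationError> {
    let before = read_generations();
    let result = op(&before);
    let after = read_generations();
    if before == after {
        Ok(result)
    } else {
        Err(MutationError::RacedWithReconciliation)
    }
}

fn main() {
    // Stable generations: the mutation is accepted.
    let gens = BTreeMap::from([("shard-0000".to_string(), 7u64)]);
    let ok = remote_mutation(|| gens.clone(), |locs| locs.len());
    println!("{ok:?}"); // Ok(1)
}
```

The real implementation additionally distinguishes shards that are meant to be attached but are not yet (503, retry) from shards configured as detached (409, give up), as the hunk around `PlacementPolicy::Attached` shows.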