Compare commits

...

4 Commits

Author SHA1 Message Date
Jan Christian Grünhage
f5c1f749db nomerge: simulate run-kind=storage-rc-pr 2025-02-28 11:24:45 +01:00
Alexey Masterov
04a33e8af0 Change TEST_EXTENSIONS_TAG variable 2025-02-28 11:13:15 +01:00
Alexey Masterov
71f0235600 TEst extensions upgrade should work correctly if some neon images of compute tag are not accessible 2025-02-28 11:09:06 +01:00
John Spray
55633ebe3a storcon: enable API passthrough to nonzero shards (#11026)
## Problem

Storage controller will proxy GETs to pageserver-like tenant/timeline
paths through to the pageserver.

Usually GET passthroughs make sense to go to shard 0, e.g. if you want
to list timelines.

But sometimes you really want to know about a particular shard, e.g.
reading its cache state or similar.

## Summary of changes

- Accept shard IDs as well as tenant IDs in the passthrough route
- Refactor node lookup to take a shard ID and make the tenant ID case a
layer on top of that. This is one more lock take-drop during these
requests, but it's not particularly expensive and these requests
shouldn't be terribly frequent

This is not immediately used by anything, but will be there any time we
want to e.g. do a pass-through query to check the warmth of a tenant
cache on a particular shard or somesuch.
2025-02-28 08:42:08 +00:00
5 changed files with 61 additions and 22 deletions

View File

@@ -46,7 +46,7 @@ jobs:
env:
RUN_KIND: >-
${{
false
'storage-rc-pr'
|| (inputs.github-event-name == 'push' && github.ref_name == 'main') && 'push-main'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release') && 'storage-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-compute') && 'compute-release'

View File

@@ -831,7 +831,7 @@ jobs:
|| needs.meta.outputs.run-kind == 'pr' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'compute-rc-pr' && needs.meta.outputs.previous-storage-release
}}
TEST_EXTENSIONS_TAG: latest
TEST_EXTENSIONS_TAG: ${{ needs.meta.outputs.previous-compute-release }}
NEW_COMPUTE_TAG: ${{ needs.meta.outputs.build-tag }}
OLD_COMPUTE_TAG: ${{ needs.meta.outputs.previous-compute-release }}
run: ./docker-compose/test_extensions_upgrade.sh

View File

@@ -58,7 +58,7 @@ function check_timeline() {
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TAG=${OLD_COMPUTE_TAG} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}
@@ -82,7 +82,7 @@ EXTENSIONS='[
{"extname": "pg_repack", "extdir": "pg_repack-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
COMPUTE_TAG=${NEW_COMPUTE_TAG} TEST_EXTENSIONS_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
@@ -90,7 +90,7 @@ create_extensions "${EXTNAMES}"
query="select json_object_agg(extname,extversion) from pg_extension where extname in ('${EXTNAMES// /\',\'}')"
new_vers=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
docker compose --profile test-extensions down
TAG=${OLD_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
COMPUTE_TAG=${OLD_COMPUTE_TAG} TEST_EXTENSIONS_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"

View File

@@ -547,7 +547,7 @@ async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
let req = match maybe_forward(req).await {
@@ -562,15 +562,28 @@ async fn handle_tenant_timeline_passthrough(
return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
};
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
tracing::info!(
"Proxying request for tenant {} ({})",
tenant_or_shard_id.tenant_id,
path
);
// Find the node that holds shard zero
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
let (node, tenant_shard_id) = if tenant_or_shard_id.is_unsharded() {
service
.tenant_shard0_node(tenant_or_shard_id.tenant_id)
.await?
} else {
(
service.tenant_shard_node(tenant_or_shard_id).await?,
tenant_or_shard_id,
)
};
// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
let path = format!("{}", path);
let tenant_str = tenant_id.to_string();
let tenant_str = tenant_or_shard_id.tenant_id.to_string();
let tenant_shard_str = format!("{}", tenant_shard_id);
let path = path.replace(&tenant_str, &tenant_shard_str);
@@ -610,7 +623,7 @@ async fn handle_tenant_timeline_passthrough(
// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND {
// Look up node again: if we migrated it will be different
let (new_node, _tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
let new_node = service.tenant_shard_node(tenant_shard_id).await?;
if new_node.get_id() != node.get_id() {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.

View File

@@ -4158,16 +4158,14 @@ impl Service {
}).await?
}
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
/// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound)
/// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0.
pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId), ApiError> {
// Look up in-memory state and maybe use the node from there.
{
let tenant_shard_id = {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, shard)) = locked
let Some((tenant_shard_id, _shard)) = locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
@@ -4177,6 +4175,29 @@ impl Service {
));
};
*tenant_shard_id
};
self.tenant_shard_node(tenant_shard_id)
.await
.map(|node| (node, tenant_shard_id))
}
/// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this
/// function looks up and returns node. If the shard isn't found, returns Err(ApiError::NotFound)
pub(crate) async fn tenant_shard_node(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Node, ApiError> {
// Look up in-memory state and maybe use the node from there.
{
let locked = self.inner.read().unwrap();
let Some(shard) = locked.tenants.get(&tenant_shard_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard {tenant_shard_id} not found").into(),
));
};
let Some(intent_node_id) = shard.intent.get_attached() else {
tracing::warn!(
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
@@ -4197,7 +4218,7 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
return Ok((node.clone(), *tenant_shard_id));
return Ok(node.clone());
}
};
@@ -4205,29 +4226,34 @@ impl Service {
// generation state: this will reflect the progress of any ongoing migration.
// Note that it is not guaranteed to _stay_ here, our caller must still handle
// the case where they call through to the pageserver and get a 404.
let db_result = self.persistence.tenant_generations(tenant_id).await?;
let db_result = self
.persistence
.tenant_generations(tenant_shard_id.tenant_id)
.await?;
let Some(ShardGenerationState {
tenant_shard_id,
tenant_shard_id: _,
generation: _,
generation_pageserver: Some(node_id),
}) = db_result.first()
}) = db_result
.into_iter()
.find(|s| s.tenant_shard_id == tenant_shard_id)
else {
// This can happen if we raced with a tenant deletion or a shard split. On a retry
// the caller will either succeed (shard split case), get a proper 404 (deletion case),
// or a conflict response (case where tenant was detached in background)
return Err(ApiError::ResourceUnavailable(
"Shard {} not found in database, or is not attached".into(),
format!("Shard {tenant_shard_id} not found in database, or is not attached").into(),
));
};
let locked = self.inner.read().unwrap();
let Some(node) = locked.nodes.get(node_id) else {
let Some(node) = locked.nodes.get(&node_id) else {
// This should never happen
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard refers to nonexistent node"
)));
};
Ok((node.clone(), *tenant_shard_id))
Ok(node.clone())
}
pub(crate) fn tenant_locate(