diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index a6638f5191..1745bf5575 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -1,10 +1,11 @@
+use crate::http;
use crate::metrics::{
HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
METRICS_REGISTRY,
};
use crate::persistence::SafekeeperPersistence;
use crate::reconciler::ReconcileError;
-use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
+use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use hyper::header::CONTENT_TYPE;
@@ -22,6 +23,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::{mgmt_api, BlockUnblock};
+use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
@@ -87,9 +89,16 @@ fn get_state(request: &Request
) -> &HttpState {
}
/// Pageserver calls into this on startup, to learn which tenants it should attach
-async fn handle_re_attach(mut req: Request) -> Result, ApiError> {
+async fn handle_re_attach(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let reattach_req = json_request::(&mut req).await?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.re_attach(reattach_req).await?)
@@ -97,9 +106,16 @@ async fn handle_re_attach(mut req: Request) -> Result, ApiE
/// Pageserver calls into this before doing deletions, to confirm that it still
/// holds the latest generation for the tenants with deletions enqueued
-async fn handle_validate(mut req: Request) -> Result, ApiError> {
+async fn handle_validate(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let validate_req = json_request::(&mut req).await?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
@@ -108,9 +124,16 @@ async fn handle_validate(mut req: Request) -> Result, ApiEr
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
-async fn handle_attach_hook(mut req: Request) -> Result, ApiError> {
+async fn handle_attach_hook(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let attach_req = json_request::(&mut req).await?;
let state = get_state(&req);
@@ -124,9 +147,16 @@ async fn handle_attach_hook(mut req: Request) -> Result, Ap
)
}
-async fn handle_inspect(mut req: Request) -> Result, ApiError> {
+async fn handle_inspect(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let inspect_req = json_request::(&mut req).await?;
let state = get_state(&req);
@@ -136,10 +166,17 @@ async fn handle_inspect(mut req: Request) -> Result, ApiErr
async fn handle_tenant_create(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
check_permissions(&req, Scope::PageServerApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let create_req = json_request::(&mut req).await?;
json_response(
@@ -150,11 +187,18 @@ async fn handle_tenant_create(
async fn handle_tenant_location_config(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let config_req = json_request::(&mut req).await?;
json_response(
StatusCode::OK,
@@ -166,10 +210,17 @@ async fn handle_tenant_location_config(
async fn handle_tenant_config_set(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
check_permissions(&req, Scope::PageServerApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let config_req = json_request::(&mut req).await?;
json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
@@ -182,16 +233,30 @@ async fn handle_tenant_config_get(
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
}
async fn handle_tenant_time_travel_remote_storage(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let time_travel_req = json_request::(&mut req).await?;
let timestamp_raw = must_get_query_param(&req, "travel_to")?;
@@ -232,6 +297,13 @@ async fn handle_tenant_secondary_download(
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
json_response(map_reqwest_hyper_status(status)?, progress)
}
@@ -243,6 +315,13 @@ async fn handle_tenant_delete(
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
let status_code = service
.tenant_delete(tenant_id)
.await
@@ -258,11 +337,18 @@ async fn handle_tenant_delete(
async fn handle_tenant_timeline_create(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let create_req = json_request::(&mut req).await?;
json_response(
StatusCode::CREATED,
@@ -277,9 +363,16 @@ async fn handle_tenant_timeline_delete(
req: Request,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+ let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+
check_permissions(&req, Scope::PageServerApi)?;
- let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
// For timeline deletions, which both implement an "initially return 202, then 404 once
// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
@@ -337,12 +430,19 @@ async fn handle_tenant_timeline_delete(
async fn handle_tenant_timeline_archival_config(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+ let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+
check_permissions(&req, Scope::PageServerApi)?;
- let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
let create_req = json_request::(&mut req).await?;
@@ -358,9 +458,16 @@ async fn handle_tenant_timeline_detach_ancestor(
req: Request,
) -> Result, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+ let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+
check_permissions(&req, Scope::PageServerApi)?;
- let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
let res = service
.tenant_timeline_detach_ancestor(tenant_id, timeline_id)
@@ -393,6 +500,13 @@ async fn handle_tenant_timeline_passthrough(
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let Some(path) = req.uri().path_and_query() else {
// This should never happen, our request router only calls us if there is a path
return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
@@ -460,9 +574,17 @@ async fn handle_tenant_locate(
service: Arc,
req: Request,
) -> Result, ApiError> {
+ let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+
check_permissions(&req, Scope::Admin)?;
- let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
json_response(StatusCode::OK, service.tenant_locate(tenant_id)?)
}
@@ -473,6 +595,14 @@ async fn handle_tenant_describe(
check_permissions(&req, Scope::Scrubber)?;
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
}
@@ -482,12 +612,26 @@ async fn handle_tenant_list(
) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
json_response(StatusCode::OK, service.tenant_list())
}
-async fn handle_node_register(mut req: Request) -> Result, ApiError> {
+async fn handle_node_register(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let register_req = json_request::(&mut req).await?;
let state = get_state(&req);
state.service.node_register(register_req).await?;
@@ -497,6 +641,13 @@ async fn handle_node_register(mut req: Request) -> Result,
async fn handle_node_list(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let nodes = state.service.node_list().await?;
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::>();
@@ -507,6 +658,13 @@ async fn handle_node_list(req: Request) -> Result, ApiError
async fn handle_node_drop(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
json_response(StatusCode::OK, state.service.node_drop(node_id).await?)
@@ -515,14 +673,28 @@ async fn handle_node_drop(req: Request) -> Result, ApiError
async fn handle_node_delete(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
json_response(StatusCode::OK, state.service.node_delete(node_id).await?)
}
-async fn handle_node_configure(mut req: Request) -> Result, ApiError> {
+async fn handle_node_configure(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let config_req = json_request::(&mut req).await?;
if node_id != config_req.node_id {
@@ -548,6 +720,13 @@ async fn handle_node_configure(mut req: Request) -> Result,
async fn handle_node_status(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
@@ -570,6 +749,13 @@ async fn handle_node_shards(req: Request) -> Result, ApiErr
async fn handle_get_leader(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let leader = state.service.get_leader().await.map_err(|err| {
ApiError::InternalServerError(anyhow::anyhow!(
@@ -583,6 +769,13 @@ async fn handle_get_leader(req: Request) -> Result, ApiErro
async fn handle_node_drain(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
@@ -594,6 +787,13 @@ async fn handle_node_drain(req: Request) -> Result, ApiErro
async fn handle_cancel_node_drain(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
@@ -605,6 +805,13 @@ async fn handle_cancel_node_drain(req: Request) -> Result,
async fn handle_node_fill(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
@@ -616,6 +823,13 @@ async fn handle_node_fill(req: Request) -> Result, ApiError
async fn handle_cancel_node_fill(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
@@ -624,9 +838,16 @@ async fn handle_cancel_node_fill(req: Request) -> Result, A
json_response(StatusCode::ACCEPTED, ())
}
-async fn handle_metadata_health_update(mut req: Request) -> Result, ApiError> {
+async fn handle_metadata_health_update(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Scrubber)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let update_req = json_request::(&mut req).await?;
let state = get_state(&req);
@@ -640,6 +861,13 @@ async fn handle_metadata_health_list_unhealthy(
) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
@@ -652,10 +880,17 @@ async fn handle_metadata_health_list_unhealthy(
}
async fn handle_metadata_health_list_outdated(
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let list_outdated_req = json_request::(&mut req).await?;
let state = get_state(&req);
let health_records = state
@@ -671,10 +906,17 @@ async fn handle_metadata_health_list_outdated(
async fn handle_tenant_shard_split(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let split_req = json_request::(&mut req).await?;
@@ -686,10 +928,17 @@ async fn handle_tenant_shard_split(
async fn handle_tenant_shard_migrate(
service: Arc,
- mut req: Request,
+ req: Request,
) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
let migrate_req = json_request::(&mut req).await?;
json_response(
@@ -700,9 +949,16 @@ async fn handle_tenant_shard_migrate(
)
}
-async fn handle_tenant_update_policy(mut req: Request) -> Result, ApiError> {
+async fn handle_tenant_update_policy(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
let update_req = json_request::(&mut req).await?;
let state = get_state(&req);
@@ -716,9 +972,16 @@ async fn handle_tenant_update_policy(mut req: Request) -> Result) -> Result, ApiError> {
+async fn handle_update_preferred_azs(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let mut req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let azs_req = json_request::(&mut req).await?;
let state = get_state(&req);
@@ -731,23 +994,46 @@ async fn handle_update_preferred_azs(mut req: Request) -> Result) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
json_response(StatusCode::OK, state.service.step_down().await)
}
async fn handle_tenant_drop(req: Request) -> Result, ApiError> {
- let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
json_response(StatusCode::OK, state.service.tenant_drop(tenant_id).await?)
}
async fn handle_tenant_import(req: Request) -> Result, ApiError> {
- let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
+ let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
json_response(
@@ -759,6 +1045,13 @@ async fn handle_tenant_import(req: Request) -> Result, ApiE
async fn handle_tenants_dump(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
state.service.tenants_dump()
}
@@ -766,6 +1059,13 @@ async fn handle_tenants_dump(req: Request) -> Result, ApiEr
async fn handle_scheduler_dump(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
state.service.scheduler_dump()
}
@@ -773,6 +1073,13 @@ async fn handle_scheduler_dump(req: Request) -> Result, Api
async fn handle_consistency_check(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
json_response(StatusCode::OK, state.service.consistency_check().await?)
@@ -781,19 +1088,40 @@ async fn handle_consistency_check(req: Request) -> Result,
async fn handle_reconcile_all(req: Request) -> Result, ApiError> {
check_permissions(&req, Scope::Admin)?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
}
/// Status endpoint is just used for checking that our HTTP listener is up
-async fn handle_status(_req: Request) -> Result, ApiError> {
+async fn handle_status(req: Request) -> Result, ApiError> {
+ match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(_req) => {}
+ };
+
json_response(StatusCode::OK, ())
}
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
async fn handle_ready(req: Request) -> Result, ApiError> {
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
if state.service.startup_complete.is_ready() {
json_response(StatusCode::OK, ())
@@ -816,6 +1144,13 @@ async fn handle_get_safekeeper(req: Request) -> Result, Api
let id = parse_request_param::(&req, "id")?;
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let res = state.service.get_safekeeper(id).await;
@@ -847,6 +1182,13 @@ async fn handle_upsert_safekeeper(mut req: Request) -> Result {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
state.service.upsert_safekeeper(body).await?;
@@ -925,10 +1267,7 @@ pub fn prologue_leadership_status_check_middleware<
let allowed_routes = match leadership_status {
LeadershipStatus::Leader => AllowedRoutes::All,
- LeadershipStatus::SteppedDown => {
- // TODO: does it make sense to allow /status here?
- AllowedRoutes::Some(["/control/v1/step_down", "/status", "/metrics"].to_vec())
- }
+ LeadershipStatus::SteppedDown => AllowedRoutes::All,
LeadershipStatus::Candidate => {
AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
}
@@ -1005,6 +1344,13 @@ fn epilogue_metrics_middleware
pub async fn measured_metrics_handler(req: Request) -> Result, ApiError> {
pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
+ let req = match maybe_forward(req).await {
+ ForwardOutcome::Forwarded(res) => {
+ return res;
+ }
+ ForwardOutcome::NotForwarded(req) => req,
+ };
+
let state = get_state(&req);
let payload = crate::metrics::METRICS_REGISTRY.encode(&state.neon_metrics);
let response = Response::builder()
@@ -1032,6 +1378,220 @@ where
request_span(request, handler).await
}
+enum ForwardOutcome {
+ Forwarded(Result, ApiError>), // forwarding path was taken; holds the result the handler returns as-is
+ NotForwarded(Request), // nothing was forwarded; hands the request back for local handling
+}
+
+/// Potentially forward the request to the current storage controller leader.
+/// More specifically we forward when:
+/// 1. Request path is not one of ["/control/v1/step_down", "/status", "/ready", "/metrics"]
+/// 2. Current instance is in [`LeadershipStatus::SteppedDown`] state
+/// 3. There is a leader in the database to forward to
+/// 4. Leader from step (3) is not the current instance
+///
+/// Why forward?
+/// It turns out that we can't rely on external orchestration to promptly route traffic to the
+/// new leader. This is downtime inducing. Forwarding provides a safe way out.
+///
+/// Why is it safe?
+/// If a storcon instance is persisted in the database, then we know that it is the current leader.
+/// There's one exception: time between handling step-down request and the new leader updating the
+/// database.
+///
+/// Let's treat the happy case first. The stepped down node does not produce any side effects,
+/// since all request handling happens on the leader.
+///
+/// As for the edge case, we are guaranteed to always have a maximum of two running instances.
+/// Hence, if we are in the edge case scenario the leader persisted in the database is the
+/// stepped down instance that received the request. Condition (4) above covers this scenario.
+///
+/// Returns [`ForwardOutcome::NotForwarded`], handing the request back, when condition (1) or (2)
+/// does not hold; otherwise [`ForwardOutcome::Forwarded`] with the leader's response or an error.
+async fn maybe_forward(req: Request) -> ForwardOutcome {
+    const NOT_FOR_FORWARD: [&str; 4] = ["/control/v1/step_down", "/status", "/ready", "/metrics"];
+
+    // Match the path only; the full URI includes the query string (e.g. "/metrics?a=b").
+    let uri_for_forward = !NOT_FOR_FORWARD.contains(&req.uri().path());
+
+    let state = get_state(&req);
+    let leadership_status = state.service.get_leadership_status();
+
+    if leadership_status != LeadershipStatus::SteppedDown || !uri_for_forward {
+        return ForwardOutcome::NotForwarded(req);
+    }
+
+    let leader = match state.service.get_leader().await {
+        Ok(Some(leader)) => leader,
+        Ok(None) => {
+            return ForwardOutcome::Forwarded(Err(ApiError::ResourceUnavailable(
+                "No leader to forward to while in stepped down state".into(),
+            )));
+        }
+        Err(err) => {
+            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
+                anyhow::anyhow!(
+                    "Failed to get leader for forwarding while in stepped down state: {err}"
+                ),
+            )));
+        }
+    };
+
+    let cfg = state.service.get_config();
+    if let Some(ref self_addr) = cfg.address_for_peers {
+        let leader_addr = match Uri::from_str(leader.address.as_str()) {
+            Ok(uri) => uri,
+            Err(err) => {
+                return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(
+                    anyhow::anyhow!(
+                        "Failed to parse leader uri for forwarding while in stepped down state: {err}"
+                    ),
+                )));
+            }
+        };
+
+        if *self_addr == leader_addr {
+            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Leader is stepped down instance"
+            ))));
+        }
+    }
+
+    tracing::info!("Forwarding {} to leader at {}", req.uri(), leader.address);
+
+    // Use [`RECONCILE_TIMEOUT`] as the max amount of time a request should block for and
+    // include some leeway to get the timeout for proxied requests.
+    const PROXIED_REQUEST_TIMEOUT: Duration = Duration::from_secs(RECONCILE_TIMEOUT.as_secs() + 10);
+    let client = reqwest::ClientBuilder::new()
+        .timeout(PROXIED_REQUEST_TIMEOUT)
+        .build();
+    let client = match client {
+        Ok(client) => client,
+        Err(err) => {
+            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Failed to build leader client for forwarding while in stepped down state: {err}"
+            ))));
+        }
+    };
+
+    let request: reqwest::Request = match convert_request(req, &client, leader.address).await {
+        Ok(r) => r,
+        Err(err) => {
+            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Failed to convert request for forwarding while in stepped down state: {err}"
+            ))));
+        }
+    };
+
+    let response = match client.execute(request).await {
+        Ok(r) => r,
+        Err(err) => {
+            return ForwardOutcome::Forwarded(Err(ApiError::InternalServerError(anyhow::anyhow!(
+                "Failed to forward while in stepped down state: {err}"
+            ))));
+        }
+    };
+
+    ForwardOutcome::Forwarded(convert_response(response).await)
+}
+
+/// Convert a [`reqwest::Response`] to a [`hyper::Response`] by passing through
+/// a stable representation (string, bytes or integer)
+///
+/// Ideally, we would not have to do this since both types use the http crate
+/// under the hood. However, they use different versions of the crate and keeping
+/// second order dependencies in sync is difficult.
+async fn convert_response(resp: reqwest::Response) -> Result, ApiError> {
+ use std::str::FromStr;
+
+ let mut builder = hyper::Response::builder().status(resp.status().as_u16()); // status crosses as a plain u16
+ for (key, value) in resp.headers().into_iter() {
+ let key = hyper::header::HeaderName::from_str(key.as_str()).map_err(|err| {
+ ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
+ })?;
+
+ let value = hyper::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
+ ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
+ })?;
+
+ builder = builder.header(key, value);
+ }
+
+ let body = http::Body::wrap_stream(resp.bytes_stream()); // body is wrapped as a stream, not collected here
+
+ builder.body(body).map_err(|err| {
+ ApiError::InternalServerError(anyhow::anyhow!("Response conversion failed: {err}"))
+ })
+}
+
+/// Convert a [`hyper::Request`] to a [`reqwest::Request`] by passing through
+/// a stable representation (string, bytes or integer)
+///
+/// The outgoing URL is `to_address` joined with the original path and query. All headers are
+/// copied (repeated header names keep every value) and the body is fully buffered in memory.
+/// Any conversion failure is reported as [`ApiError::InternalServerError`].
+///
+/// See [`convert_response`] for why we are doing it this way.
+async fn convert_request(
+    req: hyper::Request,
+    client: &reqwest::Client,
+    to_address: String,
+) -> Result {
+    use std::str::FromStr;
+
+    let (parts, body) = req.into_parts();
+    let method = reqwest::Method::from_str(parts.method.as_str()).map_err(|err| {
+        ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
+    })?;
+
+    let path_and_query = parts.uri.path_and_query().ok_or_else(|| {
+        ApiError::InternalServerError(anyhow::anyhow!(
+            "Request conversion failed: no path and query"
+        ))
+    })?;
+
+    let uri = reqwest::Url::from_str(
+        format!(
+            "{}{}",
+            to_address.trim_end_matches("/"),
+            path_and_query.as_str()
+        )
+        .as_str(),
+    )
+    .map_err(|err| {
+        ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
+    })?;
+
+    // Iterate by reference: the owning iterator yields `None` as the name of every repeated
+    // header, and `insert` replaces earlier values. `iter` + `append` keeps all of them.
+    let mut headers = reqwest::header::HeaderMap::new();
+    for (key, value) in parts.headers.iter() {
+        let key = reqwest::header::HeaderName::from_str(key.as_str()).map_err(|err| {
+            ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
+        })?;
+
+        let value = reqwest::header::HeaderValue::from_bytes(value.as_bytes()).map_err(|err| {
+            ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
+        })?;
+
+        headers.append(key, value);
+    }
+
+    // Buffer the whole body; the forwarded request is built from this in-memory copy.
+    let body = hyper::body::to_bytes(body).await.map_err(|err| {
+        ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
+    })?;
+
+    client
+        .request(method, uri)
+        .headers(headers)
+        .body(body)
+        .build()
+        .map_err(|err| {
+            ApiError::InternalServerError(anyhow::anyhow!("Request conversion failed: {err}"))
+        })
+}
+
pub fn make_router(
service: Arc,
auth: Option>,
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 2e21f8fb46..2d72dbb2df 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2048,8 +2048,11 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
# Make a change to the tenant config to trigger a slow reconcile
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
virtual_ps_http.patch_tenant_config_client_side(tid, {"compaction_threshold": 5}, None)
- env.storage_controller.allowed_errors.append(
- ".*Accepted configuration update but reconciliation failed.*"
+ env.storage_controller.allowed_errors.extend(
+ [
+ ".*Accepted configuration update but reconciliation failed.*",
+ ".*Leader is stepped down instance",
+ ]
)
observed_state = env.storage_controller.step_down()
@@ -2072,9 +2075,9 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
assert "compaction_threshold" in ps_tenant_conf.effective_config
assert ps_tenant_conf.effective_config["compaction_threshold"] == 5
- # Validate that the storcon is not replying to the usual requests
- # once it has stepped down.
- with pytest.raises(StorageControllerApiException, match="stepped_down"):
+ # Validate that the storcon attempts to forward the request, but stops
+ # when it realises it is still the current leader.
+ with pytest.raises(StorageControllerApiException, match="Leader is stepped down instance"):
env.storage_controller.tenant_list()
# Validate that we can step down multiple times and the observed state
@@ -2221,6 +2224,15 @@ def test_storage_controller_leadership_transfer(
env.storage_controller.wait_until_ready()
env.storage_controller.consistency_check()
+ if not step_down_times_out:
+ # Check that the stepped down instance forwards requests
+ # to the new leader while it's still running.
+ storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
+ env.storage_controller.tenant_list()
+ env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
+ status = env.storage_controller.node_status(env.pageservers[0].id)
+ assert status["scheduling"] == "Pause"
+
if step_down_times_out:
env.storage_controller.allowed_errors.extend(
[