|
|
|
|
@@ -1,5 +1,5 @@
|
|
|
|
|
use std::str::FromStr;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::sync::{Arc, LazyLock};
|
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
|
|
|
|
use anyhow::Context;
|
|
|
|
|
@@ -33,6 +33,7 @@ use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
|
|
|
|
|
use pageserver_client::{BlockUnblock, mgmt_api};
|
|
|
|
|
use routerify::Middleware;
|
|
|
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
|
use tracing::warn;
|
|
|
|
|
use utils::auth::{Scope, SwappableJwtAuth};
|
|
|
|
|
use utils::id::{NodeId, TenantId, TimelineId};
|
|
|
|
|
|
|
|
|
|
@@ -49,6 +50,7 @@ use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIME
|
|
|
|
|
pub struct HttpState {
|
|
|
|
|
service: Arc<crate::service::Service>,
|
|
|
|
|
auth: Option<Arc<SwappableJwtAuth>>,
|
|
|
|
|
rate_limiter: governor::DefaultKeyedRateLimiter<TenantId>,
|
|
|
|
|
neon_metrics: NeonMetrics,
|
|
|
|
|
allowlist_routes: &'static [&'static str],
|
|
|
|
|
}
|
|
|
|
|
@@ -59,9 +61,11 @@ impl HttpState {
|
|
|
|
|
auth: Option<Arc<SwappableJwtAuth>>,
|
|
|
|
|
build_info: BuildInfo,
|
|
|
|
|
) -> Self {
|
|
|
|
|
let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit);
|
|
|
|
|
Self {
|
|
|
|
|
service,
|
|
|
|
|
auth,
|
|
|
|
|
rate_limiter: governor::RateLimiter::keyed(quota),
|
|
|
|
|
neon_metrics: NeonMetrics::new(build_info),
|
|
|
|
|
allowlist_routes: &[
|
|
|
|
|
"/status",
|
|
|
|
|
@@ -82,6 +86,40 @@ fn get_state(request: &Request<Body>) -> &HttpState {
|
|
|
|
|
.as_ref()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Rate limits tenant requests.
|
|
|
|
|
///
|
|
|
|
|
/// TODO: this should be a request middleware, but requires us to extract the tenant ID from
|
|
|
|
|
/// different URLs in a systematic way.
|
|
|
|
|
///
|
|
|
|
|
/// TODO: consider returning a 429 response if these start piling up.
|
|
|
|
|
async fn maybe_rate_limit(request: &Request<Body>, tenant_id: TenantId) {
|
|
|
|
|
// Check if the tenant should be rate-limited.
|
|
|
|
|
let rate_limiter = &get_state(request).rate_limiter;
|
|
|
|
|
if rate_limiter.check_key(&tenant_id).is_ok() {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Measure the rate limiting delay.
|
|
|
|
|
let _timer = METRICS_REGISTRY
|
|
|
|
|
.metrics_group
|
|
|
|
|
.storage_controller_http_request_rate_limited
|
|
|
|
|
.start_timer();
|
|
|
|
|
|
|
|
|
|
// Log rate limited tenants once every 10 seconds.
|
|
|
|
|
static LOG_RATE_LIMITER: LazyLock<governor::DefaultKeyedRateLimiter<TenantId>> =
|
|
|
|
|
LazyLock::new(|| {
|
|
|
|
|
let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap();
|
|
|
|
|
governor::RateLimiter::keyed(quota)
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() {
|
|
|
|
|
warn!("tenant {tenant_id} is rate limited")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for quota.
|
|
|
|
|
rate_limiter.until_key_ready(&tenant_id).await;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Pageserver calls into this on startup, to learn which tenants it should attach
|
|
|
|
|
async fn handle_re_attach(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::GenerationsApi)?;
|
|
|
|
|
@@ -247,6 +285,7 @@ async fn handle_tenant_config_get(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -264,6 +303,7 @@ async fn handle_tenant_time_travel_remote_storage(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -311,6 +351,7 @@ async fn handle_tenant_secondary_download(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -329,6 +370,7 @@ async fn handle_tenant_delete(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -356,6 +398,7 @@ async fn handle_tenant_timeline_create(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -381,6 +424,7 @@ async fn handle_tenant_timeline_delete(
|
|
|
|
|
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
|
|
|
|
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -457,6 +501,7 @@ async fn handle_tenant_timeline_archival_config(
|
|
|
|
|
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
|
|
|
|
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -482,6 +527,7 @@ async fn handle_tenant_timeline_detach_ancestor(
|
|
|
|
|
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
|
|
|
|
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -504,6 +550,7 @@ async fn handle_tenant_timeline_block_unblock_gc(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
|
|
|
|
|
|
|
|
|
@@ -521,6 +568,7 @@ async fn handle_tenant_timeline_download_heatmap_layers(
|
|
|
|
|
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
|
|
|
|
|
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_shard_id.tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
|
|
|
|
let concurrency: Option<usize> = parse_query_param(&req, "concurrency")?;
|
|
|
|
|
@@ -550,6 +598,7 @@ async fn handle_tenant_timeline_passthrough(
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -654,6 +703,7 @@ async fn handle_tenant_locate(
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
|
|
|
|
|
check_permissions(&req, Scope::Admin)?;
|
|
|
|
|
// NB: don't rate limit: admin operation.
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -669,9 +719,9 @@ async fn handle_tenant_describe(
|
|
|
|
|
service: Arc<Service>,
|
|
|
|
|
req: Request<Body>,
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::Scrubber)?;
|
|
|
|
|
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::Scrubber)?;
|
|
|
|
|
// NB: don't rate limit: scrubber operation.
|
|
|
|
|
|
|
|
|
|
match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1006,6 +1056,7 @@ async fn handle_tenant_shard_split(
|
|
|
|
|
req: Request<Body>,
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::Admin)?;
|
|
|
|
|
// NB: don't rate limit: admin operation.
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1028,6 +1079,7 @@ async fn handle_tenant_shard_migrate(
|
|
|
|
|
req: Request<Body>,
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::Admin)?;
|
|
|
|
|
// NB: don't rate limit: admin operation.
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1051,6 +1103,7 @@ async fn handle_tenant_shard_migrate_secondary(
|
|
|
|
|
req: Request<Body>,
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::Admin)?;
|
|
|
|
|
// NB: don't rate limit: admin operation.
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1074,6 +1127,7 @@ async fn handle_tenant_shard_cancel_reconcile(
|
|
|
|
|
req: Request<Body>,
|
|
|
|
|
) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::Admin)?;
|
|
|
|
|
// NB: don't rate limit: admin operation.
|
|
|
|
|
|
|
|
|
|
let req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1093,6 +1147,7 @@ async fn handle_tenant_shard_cancel_reconcile(
|
|
|
|
|
|
|
|
|
|
async fn handle_tenant_update_policy(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::Admin)?;
|
|
|
|
|
// NB: don't rate limit: admin operation.
|
|
|
|
|
|
|
|
|
|
let mut req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1148,9 +1203,9 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
@@ -1165,9 +1220,9 @@ async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiErr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_tenant_import(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
|
|
|
|
|
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
|
|
|
|
check_permissions(&req, Scope::PageServerApi)?;
|
|
|
|
|
maybe_rate_limit(&req, tenant_id).await;
|
|
|
|
|
|
|
|
|
|
let req = match maybe_forward(req).await {
|
|
|
|
|
ForwardOutcome::Forwarded(res) => {
|
|
|
|
|
|