diff --git a/Cargo.lock b/Cargo.lock index 293ed465ff..a978e4d744 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2410,9 +2410,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-timer" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" @@ -2515,6 +2515,27 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "governor" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0" +dependencies = [ + "cfg-if", + "dashmap 6.1.0", + "futures-sink", + "futures-timer", + "futures-util", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "group" version = "0.12.1" @@ -3725,6 +3746,12 @@ dependencies = [ "memoffset 0.9.0", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -3735,6 +3762,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "notify" version = "8.0.0" @@ -4591,6 +4624,12 @@ dependencies = [ "never-say-never", ] +[[package]] +name = "portable-atomic" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" + [[package]] name = "postgres" version = "0.19.7" @@ -5052,6 +5091,21 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "quanta" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.26.0" @@ -5182,6 +5236,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "raw-cpuid" +version = "11.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "rayon" version = "1.7.0" @@ -6395,6 +6458,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.6.0" @@ -6471,6 +6543,7 @@ dependencies = [ "diesel_migrations", "fail", "futures", + "governor", "hex", "http-utils", "humantime", diff --git a/Cargo.toml b/Cargo.toml index ff45d46a47..870b3412db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ futures = "0.3" futures-core = "0.3" futures-util = "0.3" git-version = "0.3" +governor = "0.8" hashbrown = "0.14" hashlink = "0.9.1" hdrhistogram = "7.5.2" diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index b63ba154da..6b657b5ea0 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -21,6 +21,7 @@ clap.workspace = true cron.workspace = true fail.workspace = true futures.workspace = true +governor.workspace = true hex.workspace = true hyper0.workspace = true humantime.workspace = true diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index 64f0be3c23..3e448d7013 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -1,5 +1,5 @@ use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::time::{Duration, Instant}; use anyhow::Context; @@ -33,6 +33,7 @@ use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest}; use pageserver_client::{BlockUnblock, mgmt_api}; use routerify::Middleware; use tokio_util::sync::CancellationToken; +use tracing::warn; use utils::auth::{Scope, SwappableJwtAuth}; use utils::id::{NodeId, TenantId, TimelineId}; @@ -49,6 +50,7 @@ use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIME pub struct HttpState { service: Arc, auth: Option>, + rate_limiter: governor::DefaultKeyedRateLimiter, neon_metrics: NeonMetrics, allowlist_routes: &'static [&'static str], } @@ -59,9 +61,11 @@ impl HttpState { auth: Option>, build_info: BuildInfo, ) -> Self { + let quota = governor::Quota::per_second(service.get_config().tenant_rate_limit); Self { service, auth, + rate_limiter: governor::RateLimiter::keyed(quota), neon_metrics: NeonMetrics::new(build_info), allowlist_routes: &[ "/status", @@ -82,6 +86,40 @@ fn get_state(request: &Request) -> &HttpState { .as_ref() } +/// Rate limits tenant requests. +/// +/// TODO: this should be a request middleware, but requires us to extract the tenant ID from +/// different URLs in a systematic way. +/// +/// TODO: consider returning a 429 response if these start piling up. +async fn maybe_rate_limit(request: &Request, tenant_id: TenantId) { + // Check if the tenant should be rate-limited. + let rate_limiter = &get_state(request).rate_limiter; + if rate_limiter.check_key(&tenant_id).is_ok() { + return; + } + + // Measure the rate limiting delay. + let _timer = METRICS_REGISTRY + .metrics_group + .storage_controller_http_request_rate_limited + .start_timer(); + + // Log rate limited tenants once every 10 seconds. + static LOG_RATE_LIMITER: LazyLock> = + LazyLock::new(|| { + let quota = governor::Quota::with_period(Duration::from_secs(10)).unwrap(); + governor::RateLimiter::keyed(quota) + }); + + if LOG_RATE_LIMITER.check_key(&tenant_id).is_ok() { + warn!("tenant {tenant_id} is rate limited") + } + + // Wait for quota. + rate_limiter.until_key_ready(&tenant_id).await; +} + /// Pageserver calls into this on startup, to learn which tenants it should attach async fn handle_re_attach(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::GenerationsApi)?; @@ -247,6 +285,7 @@ async fn handle_tenant_config_get( ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -264,6 +303,7 @@ async fn handle_tenant_time_travel_remote_storage( ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -311,6 +351,7 @@ async fn handle_tenant_secondary_download( ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis); + maybe_rate_limit(&req, tenant_id).await; match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -329,6 +370,7 @@ async fn handle_tenant_delete( ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -356,6 +398,7 @@ async fn handle_tenant_timeline_create( ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -381,6 +424,7 @@ async fn handle_tenant_timeline_delete( let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -457,6 +501,7 @@ async fn handle_tenant_timeline_archival_config( let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -482,6 +527,7 @@ async fn handle_tenant_timeline_detach_ancestor( let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -504,6 +550,7 @@ async fn handle_tenant_timeline_block_unblock_gc( ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; @@ -521,6 +568,7 @@ async fn handle_tenant_timeline_download_heatmap_layers( let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_shard_id.tenant_id).await; let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; let concurrency: Option = parse_query_param(&req, "concurrency")?; @@ -550,6 +598,7 @@ async fn handle_tenant_timeline_passthrough( ) -> Result, ApiError> { let tenant_or_shard_id: TenantShardId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_or_shard_id.tenant_id).await; let req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -654,6 +703,7 @@ async fn handle_tenant_locate( let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; check_permissions(&req, Scope::Admin)?; + // NB: don't rate limit: admin operation. match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -669,9 +719,9 @@ async fn handle_tenant_describe( service: Arc, req: Request, ) -> Result, ApiError> { - check_permissions(&req, Scope::Scrubber)?; - let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::Scrubber)?; + // NB: don't rate limit: scrubber operation. match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1006,6 +1056,7 @@ async fn handle_tenant_shard_split( req: Request, ) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; + // NB: don't rate limit: admin operation. let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1028,6 +1079,7 @@ async fn handle_tenant_shard_migrate( req: Request, ) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; + // NB: don't rate limit: admin operation. let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1051,6 +1103,7 @@ async fn handle_tenant_shard_migrate_secondary( req: Request, ) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; + // NB: don't rate limit: admin operation. let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1074,6 +1127,7 @@ async fn handle_tenant_shard_cancel_reconcile( req: Request, ) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; + // NB: don't rate limit: admin operation. let req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1093,6 +1147,7 @@ async fn handle_tenant_shard_cancel_reconcile( async fn handle_tenant_update_policy(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; + // NB: don't rate limit: admin operation. let mut req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1148,9 +1203,9 @@ async fn handle_step_down(req: Request) -> Result, ApiError } async fn handle_tenant_drop(req: Request) -> Result, ApiError> { - check_permissions(&req, Scope::PageServerApi)?; - let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; let req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { @@ -1165,9 +1220,9 @@ async fn handle_tenant_drop(req: Request) -> Result, ApiErr } async fn handle_tenant_import(req: Request) -> Result, ApiError> { - check_permissions(&req, Scope::PageServerApi)?; - let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; let req = match maybe_forward(req).await { ForwardOutcome::Forwarded(res) => { diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 380ffeb9b7..6ef17c0007 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -1,3 +1,4 @@ +use std::num::NonZeroU32; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -98,6 +99,10 @@ struct Cli { #[arg(long)] priority_reconciler_concurrency: Option, + /// Tenant API rate limit, as requests per second per tenant. + #[arg(long, default_value = "10")] + tenant_rate_limit: NonZeroU32, + /// How long to wait for the initial database connection to be available. #[arg(long, default_value = "5s")] db_connect_timeout: humantime::Duration, @@ -339,6 +344,7 @@ async fn async_main() -> anyhow::Result<()> { priority_reconciler_concurrency: args .priority_reconciler_concurrency .unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT), + tenant_rate_limit: args.tenant_rate_limit, split_threshold: args.split_threshold, neon_local_repo_dir: args.neon_local_repo_dir, max_secondary_lag_bytes: args.max_secondary_lag_bytes, diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs index f490edb68f..ea390df726 100644 --- a/storage_controller/src/metrics.rs +++ b/storage_controller/src/metrics.rs @@ -76,6 +76,10 @@ pub(crate) struct StorageControllerMetricGroup { pub(crate) storage_controller_http_request_latency: measured::HistogramVec, + /// HTTP rate limiting latency across all tenants and endpoints + #[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 10.0))] + pub(crate) storage_controller_http_request_rate_limited: measured::Histogram<10>, + /// Count of HTTP requests to the pageserver that resulted in an error, /// broken down by the pageserver node id, request name and method pub(crate) storage_controller_pageserver_request_error: diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 26ccfd5445..8fc7f7a0c5 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -5,6 +5,7 @@ use std::borrow::Cow; use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap, HashSet}; use std::error::Error; +use std::num::NonZeroU32; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; @@ -365,6 +366,10 @@ pub struct Config { /// How many high-priority Reconcilers may be spawned concurrently pub priority_reconciler_concurrency: usize, + /// How many API requests per second to allow per tenant, across all + /// tenant-scoped API endpoints. Further API requests queue until ready. + pub tenant_rate_limit: NonZeroU32, + /// How large must a shard grow in bytes before we split it? /// None disables auto-splitting. pub split_threshold: Option, diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index 4fce558840..abddfa2768 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -124,6 +124,8 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [ # controller's attempts to notify the endpoint). ".*reconciler.*neon_local notification hook failed.*", ".*reconciler.*neon_local error.*", + # Tenant rate limits may fire in tests that submit lots of API requests. + ".*tenant \\S+ is rate limited.*", ]