diff --git a/Cargo.lock b/Cargo.lock
index c514625518..0c246bd258 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2950,17 +2950,6 @@
 version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
-[[package]]
-name = "leaky-bucket"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853"
-dependencies = [
- "parking_lot 0.12.1",
- "tokio",
- "tracing",
-]
-
 [[package]]
 name = "libc"
 version = "0.2.150"
@@ -3714,7 +3703,6 @@ dependencies = [
 "humantime-serde",
 "hyper 0.14.26",
 "itertools 0.10.5",
-"leaky-bucket",
 "md5",
 "metrics",
 "nix 0.27.1",
@@ -6983,7 +6971,6 @@ dependencies = [
 "humantime",
 "hyper 0.14.26",
 "jsonwebtoken",
-"leaky-bucket",
 "metrics",
 "nix 0.27.1",
 "once_cell",
diff --git a/Cargo.toml b/Cargo.toml
index 7bd9a26394..fa949f9757 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -108,7 +108,6 @@ ipnet = "2.9.0"
 itertools = "0.10"
 jsonwebtoken = "9"
 lasso = "0.7"
-leaky-bucket = "1.0.1"
 libc = "0.2"
 md5 = "0.7.0"
 measured = { version = "0.0.22", features=["lasso"] }
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index d39ac75707..1d896863df 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -7,7 +7,7 @@ pub use utilization::PageserverUtilization;
 use std::{
     collections::HashMap,
     io::{BufRead, Read},
-    num::{NonZeroU64, NonZeroUsize},
+    num::{NonZeroU32, NonZeroU64, NonZeroUsize},
     str::FromStr,
     sync::atomic::AtomicUsize,
     time::{Duration, SystemTime},
@@ -486,12 +486,11 @@ pub struct EvictionPolicyLayerAccessThreshold {
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub struct ThrottleConfig {
     pub task_kinds: Vec<String>, // TaskKind
-    pub initial: usize,
+    pub initial: u32,
     #[serde(with = "humantime_serde")]
     pub refill_interval: Duration,
-    pub refill_amount: NonZeroUsize,
-    pub max: usize,
-    pub fair: bool,
+    pub refill_amount: NonZeroU32,
+    pub max: u32,
 }
 
 impl ThrottleConfig {
@@ -501,9 +500,8 @@ impl ThrottleConfig {
             // other values don't matter with empty `task_kinds`.
             initial: 0,
             refill_interval: Duration::from_millis(1),
-            refill_amount: NonZeroUsize::new(1).unwrap(),
+            refill_amount: NonZeroU32::new(1).unwrap(),
             max: 1,
-            fair: true,
         }
     }
     /// The requests per second allowed by the given config.
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index 6e593eeac1..777fb95ece 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -26,7 +26,6 @@ hyper = { workspace = true, features = ["full"] }
 fail.workspace = true
 futures = { workspace = true}
 jsonwebtoken.workspace = true
-leaky-bucket.workspace = true
 nix.workspace = true
 once_cell.workspace = true
 pin-project-lite.workspace = true
diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs
new file mode 100644
index 0000000000..a120dc0ac5
--- /dev/null
+++ b/libs/utils/src/leaky_bucket.rs
@@ -0,0 +1,280 @@
+//! This module implements the Generic Cell Rate Algorithm for a simplified
+//! version of the Leaky Bucket rate limiting system.
+//!
+//! # Leaky Bucket
+//!
+//! If the bucket is full, new requests are not allowed and are throttled/errored.
+//! If the bucket is partially full/empty, new requests are added to the bucket in
+//! terms of "tokens".
+//!
+//! Over time, tokens are removed from the bucket, naturally allowing new requests at a steady rate.
+//!
+//! The bucket size tunes the burst support.
+//! The drain rate tunes the steady-rate requests per second.
+//!
+//! # [GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)
+//!
+//! GCRA is a continuous-rate leaky-bucket impl that stores minimal state and requires
+//! no background jobs to drain tokens, as the design utilises timestamps to drain automatically over time.
+//!
+//! We store an "empty_at" timestamp as the only state. As time progresses, we will naturally approach
+//! the empty state. The full-bucket state is calculated from `empty_at - config.bucket_width`.
+//!
+//! Another explanation can be found here:
+
+use std::{sync::Mutex, time::Duration};
+
+use tokio::{sync::Notify, time::Instant};
+
+pub struct LeakyBucketConfig {
+    /// This is the "time cost" of a single request unit.
+    /// Should loosely represent how long it takes to handle a request unit in active resource time.
+    /// Loosely speaking, this is the inverse of the steady-rate requests-per-second.
+    pub cost: Duration,
+
+    /// total size of the bucket
+    pub bucket_width: Duration,
+}
+
+impl LeakyBucketConfig {
+    pub fn new(rps: f64, bucket_size: f64) -> Self {
+        let cost = Duration::from_secs_f64(rps.recip());
+        let bucket_width = cost.mul_f64(bucket_size);
+        Self { cost, bucket_width }
+    }
+}
+
+pub struct LeakyBucketState {
+    /// Bucket is represented by `allow_at..empty_at` where `allow_at = empty_at - config.bucket_width`.
+    ///
+    /// At any given time, `empty_at - now` represents the number of tokens in the bucket, multiplied by the "time cost".
+    /// Adding `n` tokens to the bucket is done by moving `empty_at` forward by `n * config.cost`.
+    /// If `now < allow_at`, the bucket is considered filled and cannot accept any more tokens.
+    /// Draining the bucket will happen naturally as `now` moves forward.
+    ///
+    /// Let `n` be the "time cost" of the request:
+    /// if `now` is after `empty_at`, the bucket is empty and `empty_at` is reset to `now`;
+    /// if `now` is within `bucket window + n`, we are within the time budget;
+    /// if `now` is before `bucket window + n`, we have run out of budget.
+    ///
+    /// This is inspired by the generic cell rate algorithm (GCRA) and works
+    /// exactly the same as a leaky-bucket.
+    pub empty_at: Instant,
+}
+
+impl LeakyBucketState {
+    pub fn with_initial_tokens(config: &LeakyBucketConfig, initial_tokens: f64) -> Self {
+        LeakyBucketState {
+            empty_at: Instant::now() + config.cost.mul_f64(initial_tokens),
+        }
+    }
+
+    pub fn bucket_is_empty(&self, now: Instant) -> bool {
+        // if self.empty_at is after now, the bucket is not empty
+        self.empty_at <= now
+    }
+
+    /// Immediately adds tokens to the bucket if there is space.
+    ///
+    /// In a scenario where you are waiting for available rate,
+    /// rather than just erroring immediately, `started` corresponds to when this waiting started.
+    ///
+    /// `n` is the number of tokens that will be added to the bucket.
+    ///
+    /// # Errors
+    ///
+    /// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
+    /// there will be space again.
+    pub fn add_tokens(
+        &mut self,
+        config: &LeakyBucketConfig,
+        started: Instant,
+        n: f64,
+    ) -> Result<(), Instant> {
+        let now = Instant::now();
+
+        // invariant: started <= now
+        debug_assert!(started <= now);
+
+        // If the bucket was empty when we started our search,
+        // we should update the `empty_at` value accordingly.
+        // This prevents us from having negative tokens in the bucket.
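+        // Worked example (hypothetical numbers, not part of this change): with cost = 10ms,
+        // if `empty_at` lies 3s before `started`, clamping to `started` adds the new tokens
+        // on top of an empty bucket instead of crediting 300 tokens of already-elapsed idle time.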
+        let mut empty_at = self.empty_at;
+        if empty_at < started {
+            empty_at = started;
+        }
+
+        let n = config.cost.mul_f64(n);
+        let new_empty_at = empty_at + n;
+        let allow_at = new_empty_at.checked_sub(config.bucket_width);
+
+        //                      empty_at
+        //          allow_at    |   new_empty_at
+        //            /         |   /
+        // -------o-[---------o-|--]---------
+        //   now1 ^      now2 ^
+        //
+        // at now1, the bucket would be completely filled if we add n tokens.
+        // at now2, the bucket would be partially filled if we add n tokens.
+
+        match allow_at {
+            Some(allow_at) if now < allow_at => Err(allow_at),
+            _ => {
+                self.empty_at = new_empty_at;
+                Ok(())
+            }
+        }
+    }
+}
+
+pub struct RateLimiter {
+    pub config: LeakyBucketConfig,
+    pub state: Mutex<LeakyBucketState>,
+    /// A queue to provide fair (FIFO) ordering of waiters.
+    pub queue: Notify,
+}
+
+struct Requeue<'a>(&'a Notify);
+
+impl Drop for Requeue<'_> {
+    fn drop(&mut self) {
+        self.0.notify_one();
+    }
+}
+
+impl RateLimiter {
+    pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
+        RateLimiter {
+            state: Mutex::new(LeakyBucketState::with_initial_tokens(
+                &config,
+                initial_tokens,
+            )),
+            config,
+            queue: {
+                let queue = Notify::new();
+                queue.notify_one();
+                queue
+            },
+        }
+    }
+
+    pub fn steady_rps(&self) -> f64 {
+        self.config.cost.as_secs_f64().recip()
+    }
+
+    /// Returns true if we did throttle.
+    pub async fn acquire(&self, count: usize) -> bool {
+        let mut throttled = false;
+
+        let start = tokio::time::Instant::now();
+
+        // wait until we are the first in the queue
+        let mut notified = std::pin::pin!(self.queue.notified());
+        if !notified.as_mut().enable() {
+            throttled = true;
+            notified.await;
+        }
+
+        // notify the next waiter in the queue when we are done.
+        let _guard = Requeue(&self.queue);
+
+        loop {
+            let res = self
+                .state
+                .lock()
+                .unwrap()
+                .add_tokens(&self.config, start, count as f64);
+            match res {
+                Ok(()) => return throttled,
+                Err(ready_at) => {
+                    throttled = true;
+                    tokio::time::sleep_until(ready_at).await;
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use tokio::time::Instant;
+
+    use super::{LeakyBucketConfig, LeakyBucketState};
+
+    #[tokio::test(start_paused = true)]
+    async fn check() {
+        let config = LeakyBucketConfig {
+            // average 100rps
+            cost: Duration::from_millis(10),
+            // burst up to 100 requests
+            bucket_width: Duration::from_millis(1000),
+        };
+
+        let mut state = LeakyBucketState {
+            empty_at: Instant::now(),
+        };
+
+        // supports burst
+        {
+            // should work for 100 requests this instant
+            for _ in 0..100 {
+                state.add_tokens(&config, Instant::now(), 1.0).unwrap();
+            }
+            let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
+            assert_eq!(ready - Instant::now(), Duration::from_millis(10));
+        }
+
+        // doesn't overfill
+        {
+            // after 1s we should have an empty bucket again.
+            tokio::time::advance(Duration::from_secs(1)).await;
+            assert!(state.bucket_is_empty(Instant::now()));
+
+            // after 1s more, we should not over-count the idle time and allow 200 requests;
+            // the bucket still caps out at 100.
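+            // (a naive token counter would credit 2s of drain = 200 tokens here; since
+            // `empty_at` is clamped to `now`, at most bucket_width / cost = 100 tokens fit.)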
+            tokio::time::advance(Duration::from_secs(1)).await;
+            for _ in 0..100 {
+                state.add_tokens(&config, Instant::now(), 1.0).unwrap();
+            }
+            let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
+            assert_eq!(ready - Instant::now(), Duration::from_millis(10));
+        }
+
+        // supports sustained rate over a long period
+        {
+            tokio::time::advance(Duration::from_secs(1)).await;
+
+            // should sustain 100rps
+            for _ in 0..2000 {
+                tokio::time::advance(Duration::from_millis(10)).await;
+                state.add_tokens(&config, Instant::now(), 1.0).unwrap();
+            }
+        }
+
+        // supports requesting more tokens than can be stored in the bucket;
+        // we just wait a little bit longer upfront.
+        {
+            // start the bucket completely empty
+            tokio::time::advance(Duration::from_secs(5)).await;
+            assert!(state.bucket_is_empty(Instant::now()));
+
+            // requesting 200 tokens of space should take 200*cost = 2s,
+            // but the bucket width already covers 1s of that, so we only wait 1s from start.
+            let start = Instant::now();
+
+            let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
+            assert_eq!(ready - Instant::now(), Duration::from_secs(1));
+
+            tokio::time::advance(Duration::from_millis(500)).await;
+            let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
+            assert_eq!(ready - Instant::now(), Duration::from_millis(500));
+
+            tokio::time::advance(Duration::from_millis(500)).await;
+            state.add_tokens(&config, start, 200.0).unwrap();
+
+            // bucket should be completely full now
+            let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
+            assert_eq!(ready - Instant::now(), Duration::from_millis(10));
+        }
+    }
+}
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index f4fc0ba57b..218dd468b1 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -71,6 +71,7 @@ pub mod postgres_client;
 pub mod tracing_span_assert;
 
+pub mod leaky_bucket;
 pub mod rate_limit;
 
 /// Simple once-barrier and a guard which keeps barrier awaiting.
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 85c5e24afc..9c02ce3fbc 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -37,7 +37,6 @@ humantime.workspace = true
 humantime-serde.workspace = true
 hyper.workspace = true
 itertools.workspace = true
-leaky-bucket.workspace = true
 md5.workspace = true
 nix.workspace = true
 # hack to get the number of worker threads tokio uses
diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs
index f3f3d5e3ae..f222e708e1 100644
--- a/pageserver/src/tenant/throttle.rs
+++ b/pageserver/src/tenant/throttle.rs
@@ -10,6 +10,7 @@ use std::{
 use arc_swap::ArcSwap;
 use enumset::EnumSet;
 use tracing::{error, warn};
+use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
 
 use crate::{context::RequestContext, task_mgr::TaskKind};
 
@@ -33,8 +34,7 @@ pub struct Throttle {
 pub struct Inner {
     task_kinds: EnumSet<TaskKind>,
-    rate_limiter: Arc<leaky_bucket::RateLimiter>,
-    config: Config,
+    rate_limiter: Arc<RateLimiter>,
 }
 
 pub type Config = pageserver_api::models::ThrottleConfig;
 
@@ -77,8 +77,7 @@ where
             refill_interval,
             refill_amount,
             max,
-            fair,
-        } = &config;
+        } = config;
         let task_kinds: EnumSet<TaskKind> = task_kinds
             .iter()
             .filter_map(|s| match TaskKind::from_str(s) {
@@ -93,18 +92,21 @@
                 }
             })
             .collect();
+
+        // steady rate, we expect `refill_amount` requests per `refill_interval`;
+        // dividing the two gives us the rps.
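+        // e.g. (illustrative values only) refill_amount = 1000 per refill_interval = 1s
+        // gives 1000 rps; 1000 per 100ms would give 10000 rps.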
+        let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
+        let config = LeakyBucketConfig::new(rps, f64::from(max));
+
+        // `initial` is how many tokens are initially available to acquire,
+        // while the bucket state tracks how many tokens are currently in the bucket,
+        // so we invert it.
+        let initial_tokens = max - initial;
+
+        let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
+
         Inner {
             task_kinds,
-            rate_limiter: Arc::new(
-                leaky_bucket::RateLimiter::builder()
-                    .initial(*initial)
-                    .interval(*refill_interval)
-                    .refill(refill_amount.get())
-                    .max(*max)
-                    .fair(*fair)
-                    .build(),
-            ),
-            config,
+            rate_limiter: Arc::new(rate_limiter),
         }
     }
     pub fn reconfigure(&self, config: Config) {
@@ -127,7 +129,7 @@
 
     /// See [`Config::steady_rps`].
     pub fn steady_rps(&self) -> f64 {
-        self.inner.load().config.steady_rps()
+        self.inner.load().rate_limiter.steady_rps()
     }
 
     pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
@@ -136,18 +138,9 @@
             return None;
         };
         let start = std::time::Instant::now();
-        let mut did_throttle = false;
-        let acquire = inner.rate_limiter.acquire(key_count);
-        // turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate
-        let acquire = tokio::task::unconstrained(acquire);
-        let mut acquire = std::pin::pin!(acquire);
-        std::future::poll_fn(|cx| {
-            use std::future::Future;
-            let poll = acquire.as_mut().poll(cx);
-            did_throttle = did_throttle || poll.is_pending();
-            poll
-        })
-        .await;
+
+        let did_throttle = inner.rate_limiter.acquire(key_count).await;
+
         self.count_accounted.fetch_add(1, Ordering::Relaxed);
         if did_throttle {
             self.count_throttled.fetch_add(1, Ordering::Relaxed);
diff --git a/proxy/src/rate_limiter.rs b/proxy/src/rate_limiter.rs
index e5f5867998..6e38f89458 100644
--- a/proxy/src/rate_limiter.rs
+++ b/proxy/src/rate_limiter.rs
@@ -10,7 +10,5 @@ pub(crate) use limit_algorithm::{
 };
 pub(crate) use limiter::GlobalRateLimiter;
 
-pub use leaky_bucket::{
-    EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter, LeakyBucketState,
-};
+pub use leaky_bucket::{EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter};
 pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs
index fa8cb75256..bf4d85f2e4 100644
--- a/proxy/src/rate_limiter/leaky_bucket.rs
+++ b/proxy/src/rate_limiter/leaky_bucket.rs
@@ -8,6 +8,7 @@ use dashmap::DashMap;
 use rand::{thread_rng, Rng};
 use tokio::time::Instant;
 use tracing::info;
+use utils::leaky_bucket::LeakyBucketState;
 
 use crate::intern::EndpointIdInt;
 
@@ -16,7 +17,7 @@ pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;
 pub struct LeakyBucketRateLimiter {
     map: DashMap,
-    config: LeakyBucketConfig,
+    config: utils::leaky_bucket::LeakyBucketConfig,
     access_count: AtomicUsize,
 }
 
@@ -29,7 +30,7 @@ impl LeakyBucketRateLimiter {
     pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self {
         Self {
             map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
-            config,
+            config: config.into(),
             access_count: AtomicUsize::new(0),
         }
     }
 
@@ -42,12 +43,12 @@
             self.do_gc(now);
         }
 
-        let mut entry = self.map.entry(key).or_insert_with(|| LeakyBucketState {
-            time: now,
-            filled: 0.0,
-        });
+        let mut entry = self
+            .map
+            .entry(key)
+            .or_insert_with(|| LeakyBucketState { empty_at: now });
 
-        entry.check(&self.config, now, n as f64)
+        entry.add_tokens(&self.config, now, n as f64).is_ok()
     }
     fn do_gc(&self, now: Instant) {
@@ -59,7 +60,7 @@ impl LeakyBucketRateLimiter {
         let shard = thread_rng().gen_range(0..n);
         self.map.shards()[shard]
             .write()
-            .retain(|_, value| !value.get_mut().update(&self.config, now));
+            .retain(|_, value| !value.get().bucket_is_empty(now));
     }
 }
 
@@ -68,11 +69,6 @@ pub struct LeakyBucketConfig {
     pub rps: f64,
     pub max: f64,
 }
 
-pub struct LeakyBucketState {
-    filled: f64,
-    time: Instant,
-}
-
 #[cfg(test)]
 impl LeakyBucketConfig {
     pub(crate) fn new(rps: f64, max: f64) -> Self {
@@ -82,40 +78,9 @@
     }
 }
 
-impl LeakyBucketState {
-    pub(crate) fn new() -> Self {
-        Self {
-            filled: 0.0,
-            time: Instant::now(),
-        }
-    }
-
-    /// updates the timer and returns true if the bucket is empty
-    fn update(&mut self, info: &LeakyBucketConfig, now: Instant) -> bool {
-        let drain = now.duration_since(self.time);
-        let drain = drain.as_secs_f64() * info.rps;
-
-        self.filled = (self.filled - drain).clamp(0.0, info.max);
-        self.time = now;
-
-        self.filled == 0.0
-    }
-
-    pub(crate) fn check(&mut self, info: &LeakyBucketConfig, now: Instant, n: f64) -> bool {
-        self.update(info, now);
-
-        if self.filled + n > info.max {
-            return false;
-        }
-        self.filled += n;
-
-        true
-    }
-}
-
-impl Default for LeakyBucketState {
-    fn default() -> Self {
-        Self::new()
+impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
+    fn from(config: LeakyBucketConfig) -> Self {
+        utils::leaky_bucket::LeakyBucketConfig::new(config.rps, config.max)
     }
 }
 
@@ -125,48 +90,55 @@ mod tests {
     use std::time::Duration;
 
     use tokio::time::Instant;
+    use utils::leaky_bucket::LeakyBucketState;
 
-    use super::{LeakyBucketConfig, LeakyBucketState};
+    use super::LeakyBucketConfig;
 
     #[tokio::test(start_paused = true)]
     async fn check() {
-        let info = LeakyBucketConfig::new(500.0, 2000.0);
-        let mut bucket = LeakyBucketState::new();
+        let config: utils::leaky_bucket::LeakyBucketConfig =
+            LeakyBucketConfig::new(500.0, 2000.0).into();
+        assert_eq!(config.cost, Duration::from_millis(2));
+        assert_eq!(config.bucket_width, Duration::from_secs(4));
+
+        let mut bucket = LeakyBucketState {
+            empty_at: Instant::now(),
+        };
 
         // should work for 2000 requests this second
         for _ in 0..2000 {
-            assert!(bucket.check(&info, Instant::now(), 1.0));
+            bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
         }
-        assert!(!bucket.check(&info, Instant::now(), 1.0));
-        assert_eq!(bucket.filled, 2000.0);
+        bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
+        assert_eq!(bucket.empty_at - Instant::now(), config.bucket_width);
 
         // in 1ms we should drain 0.5 tokens.
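         // (fractional drain must carry over: the two 1ms advances below together free up one whole token)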
         // make sure we don't lose any tokens
         tokio::time::advance(Duration::from_millis(1)).await;
-        assert!(!bucket.check(&info, Instant::now(), 1.0));
+        bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
         tokio::time::advance(Duration::from_millis(1)).await;
-        assert!(bucket.check(&info, Instant::now(), 1.0));
+        bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
 
         // in 10ms we should drain 5 tokens
         tokio::time::advance(Duration::from_millis(10)).await;
         for _ in 0..5 {
-            assert!(bucket.check(&info, Instant::now(), 1.0));
+            bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
         }
-        assert!(!bucket.check(&info, Instant::now(), 1.0));
+        bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
 
         // in 10s we should drain 5000 tokens
         // but cap is only 2000
         tokio::time::advance(Duration::from_secs(10)).await;
         for _ in 0..2000 {
-            assert!(bucket.check(&info, Instant::now(), 1.0));
+            bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
         }
-        assert!(!bucket.check(&info, Instant::now(), 1.0));
+        bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
 
         // should sustain 500rps
         for _ in 0..2000 {
             tokio::time::advance(Duration::from_millis(10)).await;
             for _ in 0..5 {
-                assert!(bucket.check(&info, Instant::now(), 1.0));
+                bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
             }
         }
     }
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index a7eda73d4c..bb337d9cc1 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -162,7 +162,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
         "min_resident_size_override": 23,
         "timeline_get_throttle": {
             "task_kinds": ["PageRequestHandler"],
-            "fair": True,
             "initial": 0,
             "refill_interval": "1s",
             "refill_amount": 1000,
diff --git a/test_runner/regress/test_pageserver_getpage_throttle.py b/test_runner/regress/test_pageserver_getpage_throttle.py
index 111285b40c..4c9eac5cd7 100644
--- a/test_runner/regress/test_pageserver_getpage_throttle.py
+++ b/test_runner/regress/test_pageserver_getpage_throttle.py
@@ -1,3 +1,4 @@
+import copy
 import json
 import uuid
@@ -116,3 +117,58 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
     assert (
         duration_secs >= 10 * actual_smgr_query_seconds
     ), "smgr metrics should not include throttle wait time"
+
+
+throttle_config_with_field_fair_set = {
+    "task_kinds": ["PageRequestHandler"],
+    "fair": True,
+    "initial": 27,
+    "refill_interval": "43s",
+    "refill_amount": 23,
+    "max": 42,
+}
+
+
+def assert_throttle_config_with_field_fair_set(conf):
+    """
+    Field `fair` is ignored, so responses don't contain it.
+    """
+    without_fair = copy.deepcopy(throttle_config_with_field_fair_set)
+    without_fair.pop("fair")
+
+    assert conf == without_fair
+
+
+def test_throttle_fair_config_is_settable_but_ignored_in_mgmt_api(neon_env_builder: NeonEnvBuilder):
+    """
+    To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out.
+ """ + env = neon_env_builder.init_start() + ps_http = env.pageserver.http_client() + # with_fair config should still be settable + ps_http.set_tenant_config( + env.initial_tenant, + {"timeline_get_throttle": throttle_config_with_field_fair_set}, + ) + conf = ps_http.tenant_config(env.initial_tenant) + assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"]) + assert_throttle_config_with_field_fair_set( + conf.tenant_specific_overrides["timeline_get_throttle"] + ) + + +def test_throttle_fair_config_is_settable_but_ignored_in_config_toml( + neon_env_builder: NeonEnvBuilder, +): + """ + To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out. + """ + + def set_tenant_config(ps_cfg): + ps_cfg["tenant_config"] = {"timeline_get_throttle": throttle_config_with_field_fair_set} + + neon_env_builder.pageserver_config_override = set_tenant_config + env = neon_env_builder.init_start() + ps_http = env.pageserver.http_client() + conf = ps_http.tenant_config(env.initial_tenant) + assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])