proxy+pageserver: shared leaky bucket impl (#8539)

In proxy I switched to a leaky-bucket implementation using the GCRA algorithm. I
figured I could share the code with pageserver and remove the
leaky_bucket crate dependency, using only some very basic tokio timers and
queues for fairness.

How the underlying algorithm works should be fairly clear from the
comments I have left in the code.
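
For reference, a minimal usage sketch of the shared limiter added in this PR (the `utils::leaky_bucket` path matches the pageserver diff below; the rate and burst numbers are purely illustrative):

```rust
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};

async fn example() -> bool {
    // 100 requests/second steady rate, with bursts of up to 100 tokens.
    let config = LeakyBucketConfig::new(100.0, 100.0);
    // Zero tokens in the bucket to start with, i.e. the full burst budget is available.
    let limiter = RateLimiter::with_initial_tokens(config, 0.0);

    // Acquire one token. Waiters are woken in FIFO order via the internal queue;
    // returns true if we had to wait, i.e. we were throttled.
    limiter.acquire(1).await
}
```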

---

While benchmarking pageserver, @problame found that the new implementation
fixes a getpage throughput discontinuity under the
`pagebench get-page-latest-lsn` benchmark with the clickbench dataset
(`test_perf_olap.py`).
The discontinuity is that for any of `--num-clients={2,3,4}`, getpage
throughput stays at 10k.
With `--num-clients=5` and greater, getpage throughput jumps to the
configured 20k rate limit.
With the changes in this PR, the discontinuity is gone, and throughput
scales linearly with `--num-clients` up to the configured rate limit.

More context in
https://github.com/neondatabase/cloud/issues/16886#issuecomment-2315257641.

closes https://github.com/neondatabase/cloud/issues/16886

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: Conrad Ludgate
Date: 2024-08-29 12:26:52 +01:00
Commit: a644f01b6a (parent c2f8fdccd7), committed by GitHub
12 changed files with 395 additions and 114 deletions

Cargo.lock (generated)

@@ -2950,17 +2950,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "leaky-bucket"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853"
dependencies = [
"parking_lot 0.12.1",
"tokio",
"tracing",
]
[[package]]
name = "libc"
version = "0.2.150"
@@ -3714,7 +3703,6 @@ dependencies = [
"humantime-serde",
"hyper 0.14.26",
"itertools 0.10.5",
"leaky-bucket",
"md5",
"metrics",
"nix 0.27.1",
@@ -6983,7 +6971,6 @@ dependencies = [
"humantime",
"hyper 0.14.26",
"jsonwebtoken",
"leaky-bucket",
"metrics",
"nix 0.27.1",
"once_cell",


@@ -108,7 +108,6 @@ ipnet = "2.9.0"
itertools = "0.10"
jsonwebtoken = "9"
lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }


@@ -7,7 +7,7 @@ pub use utilization::PageserverUtilization;
use std::{
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
num::{NonZeroU32, NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
@@ -486,12 +486,11 @@ pub struct EvictionPolicyLayerAccessThreshold {
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ThrottleConfig {
pub task_kinds: Vec<String>, // TaskKind
pub initial: usize,
pub initial: u32,
#[serde(with = "humantime_serde")]
pub refill_interval: Duration,
pub refill_amount: NonZeroUsize,
pub max: usize,
pub fair: bool,
pub refill_amount: NonZeroU32,
pub max: u32,
}
impl ThrottleConfig {
@@ -501,9 +500,8 @@ impl ThrottleConfig {
// other values don't matter with empty `task_kinds`.
initial: 0,
refill_interval: Duration::from_millis(1),
refill_amount: NonZeroUsize::new(1).unwrap(),
refill_amount: NonZeroU32::new(1).unwrap(),
max: 1,
fair: true,
}
}
/// The requests per second allowed by the given config.


@@ -26,7 +26,6 @@ hyper = { workspace = true, features = ["full"] }
fail.workspace = true
futures = { workspace = true}
jsonwebtoken.workspace = true
leaky-bucket.workspace = true
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true


@@ -0,0 +1,280 @@
//! This module implements the Generic Cell Rate Algorithm for a simplified
//! version of the Leaky Bucket rate limiting system.
//!
//! # Leaky Bucket
//!
//! Each new request adds "tokens" to the bucket.
//! If the bucket is full, new requests are not allowed and are throttled/errored.
//! If the bucket is only partially full (or empty), the request's tokens are added and it proceeds.
//!
//! Over time, tokens are removed from the bucket, naturally allowing new requests at a steady rate.
//!
//! The bucket size tunes the burst support. The drain rate tunes the steady-rate requests per second.
//!
//! # [GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)
//!
//! GCRA is a continuous rate leaky-bucket impl that stores minimal state and requires
//! no background jobs to drain tokens, as the design utilises timestamps to drain automatically over time.
//!
//! We store an "empty_at" timestamp as the only state. As time progresses, we will naturally approach
//! the empty state. The full-bucket state is calculated from `empty_at - config.bucket_width`.
//!
//! Another explanation can be found here: <https://brandur.org/rate-limiting>
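//!
//! # Worked example
//!
//! With `cost = 10ms` and `bucket_width = 1s` (the values used in the tests below), the
//! steady rate is 100 requests per second with bursts of up to 100 tokens: the bucket is
//! empty once `empty_at <= now`, each accepted token pushes `empty_at` forward by 10ms,
//! and a request is rejected only if the new `empty_at` would end up more than 1s
//! (one bucket width) in the future.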
use std::{sync::Mutex, time::Duration};
use tokio::{sync::Notify, time::Instant};
pub struct LeakyBucketConfig {
/// This is the "time cost" of a single request unit.
/// Should loosely represent how long it takes to handle a request unit in active resource time.
/// Loosely speaking, this is the inverse of the steady-rate requests-per-second.
pub cost: Duration,
/// total size of the bucket
pub bucket_width: Duration,
}
impl LeakyBucketConfig {
pub fn new(rps: f64, bucket_size: f64) -> Self {
let cost = Duration::from_secs_f64(rps.recip());
let bucket_width = cost.mul_f64(bucket_size);
Self { cost, bucket_width }
}
}
pub struct LeakyBucketState {
/// Bucket is represented by `allow_at..empty_at` where `allow_at = empty_at - config.bucket_width`.
///
/// At any given time, `empty_at - now` represents the number of tokens in the bucket, multiplied by `config.cost`.
/// Adding `n` tokens to the bucket is done by moving `empty_at` forward by `n * config.cost`.
/// If `now < allow_at`, the bucket is considered filled and cannot accept any more tokens.
/// Draining the bucket will happen naturally as `now` moves forward.
///
/// Let `n` be some "time cost" for the request:
/// if `now` is after `empty_at`, the bucket is empty and `empty_at` is reset to `now`;
/// if `now` is within the `bucket window + n`, we are within the time budget;
/// if `now` is before the `bucket window + n`, we have run out of budget.
///
/// This is inspired by the generic cell rate algorithm (GCRA) and works
/// exactly the same as a leaky-bucket.
pub empty_at: Instant,
}
impl LeakyBucketState {
pub fn with_initial_tokens(config: &LeakyBucketConfig, initial_tokens: f64) -> Self {
LeakyBucketState {
empty_at: Instant::now() + config.cost.mul_f64(initial_tokens),
}
}
pub fn bucket_is_empty(&self, now: Instant) -> bool {
// if self.empty_at is after now, the bucket is not empty
self.empty_at <= now
}
/// Immediately adds tokens to the bucket, if there is space.
///
/// In a scenario where you are waiting for available rate,
/// rather than just erroring immediately, `started` corresponds to when this waiting started.
///
/// `n` is the number of tokens that will be filled in the bucket.
///
/// # Errors
///
/// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
/// there will be space again.
pub fn add_tokens(
&mut self,
config: &LeakyBucketConfig,
started: Instant,
n: f64,
) -> Result<(), Instant> {
let now = Instant::now();
// invariant: started <= now
debug_assert!(started <= now);
// If the bucket was empty when we started our search,
// we should update the `empty_at` value accordingly.
// this prevents us from having negative tokens in the bucket.
let mut empty_at = self.empty_at;
if empty_at < started {
empty_at = started;
}
let n = config.cost.mul_f64(n);
let new_empty_at = empty_at + n;
let allow_at = new_empty_at.checked_sub(config.bucket_width);
// empty_at
// allow_at | new_empty_at
// / | /
// -------o-[---------o-|--]---------
// now1 ^ now2 ^
//
// at now1, the bucket would be completely filled if we add n tokens.
// at now2, the bucket would be partially filled if we add n tokens.
match allow_at {
Some(allow_at) if now < allow_at => Err(allow_at),
_ => {
self.empty_at = new_empty_at;
Ok(())
}
}
}
}
pub struct RateLimiter {
pub config: LeakyBucketConfig,
pub state: Mutex<LeakyBucketState>,
/// A queue to provide fair ordering of waiters.
pub queue: Notify,
}
struct Requeue<'a>(&'a Notify);
impl Drop for Requeue<'_> {
fn drop(&mut self) {
self.0.notify_one();
}
}
impl RateLimiter {
pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
RateLimiter {
state: Mutex::new(LeakyBucketState::with_initial_tokens(
&config,
initial_tokens,
)),
config,
queue: {
let queue = Notify::new();
queue.notify_one();
queue
},
}
}
pub fn steady_rps(&self) -> f64 {
self.config.cost.as_secs_f64().recip()
}
/// returns true if we did throttle
pub async fn acquire(&self, count: usize) -> bool {
let mut throttled = false;
let start = tokio::time::Instant::now();
// wait until we are the first in the queue
let mut notified = std::pin::pin!(self.queue.notified());
if !notified.as_mut().enable() {
throttled = true;
notified.await;
}
// notify the next waiter in the queue when we are done.
let _guard = Requeue(&self.queue);
loop {
let res = self
.state
.lock()
.unwrap()
.add_tokens(&self.config, start, count as f64);
match res {
Ok(()) => return throttled,
Err(ready_at) => {
throttled = true;
tokio::time::sleep_until(ready_at).await;
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::Instant;
use super::{LeakyBucketConfig, LeakyBucketState};
#[tokio::test(start_paused = true)]
async fn check() {
let config = LeakyBucketConfig {
// average 100rps
cost: Duration::from_millis(10),
// burst up to 100 requests
bucket_width: Duration::from_millis(1000),
};
let mut state = LeakyBucketState {
empty_at: Instant::now(),
};
// supports burst
{
// should work for 100 requests this instant
for _ in 0..100 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(10));
}
// doesn't overfill
{
// after 1s we should have an empty bucket again.
tokio::time::advance(Duration::from_secs(1)).await;
assert!(state.bucket_is_empty(Instant::now()));
// after 1s more, we should not have over-counted tokens: only 100 more requests are allowed, not 200.
tokio::time::advance(Duration::from_secs(1)).await;
for _ in 0..100 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(10));
}
// supports sustained rate over a long period
{
tokio::time::advance(Duration::from_secs(1)).await;
// should sustain 100rps
for _ in 0..2000 {
tokio::time::advance(Duration::from_millis(10)).await;
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
}
// supports requesting more tokens than can be stored in the bucket
// we just wait a little bit longer upfront.
{
// start the bucket completely empty
tokio::time::advance(Duration::from_secs(5)).await;
assert!(state.bucket_is_empty(Instant::now()));
// requesting 200 tokens of space should take 200*cost = 2s
// but we already have 1s available, so we wait 1s from start.
let start = Instant::now();
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_secs(1));
tokio::time::advance(Duration::from_millis(500)).await;
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
tokio::time::advance(Duration::from_millis(500)).await;
state.add_tokens(&config, start, 200.0).unwrap();
// bucket should be completely full now
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(10));
}
}
}


@@ -71,6 +71,7 @@ pub mod postgres_client;
pub mod tracing_span_assert;
pub mod leaky_bucket;
pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.


@@ -37,7 +37,6 @@ humantime.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
leaky-bucket.workspace = true
md5.workspace = true
nix.workspace = true
# hack to get the number of worker threads tokio uses


@@ -10,6 +10,7 @@ use std::{
use arc_swap::ArcSwap;
use enumset::EnumSet;
use tracing::{error, warn};
use utils::leaky_bucket::{LeakyBucketConfig, RateLimiter};
use crate::{context::RequestContext, task_mgr::TaskKind};
@@ -33,8 +34,7 @@ pub struct Throttle<M: Metric> {
pub struct Inner {
task_kinds: EnumSet<TaskKind>,
rate_limiter: Arc<leaky_bucket::RateLimiter>,
config: Config,
rate_limiter: Arc<RateLimiter>,
}
pub type Config = pageserver_api::models::ThrottleConfig;
@@ -77,8 +77,7 @@ where
refill_interval,
refill_amount,
max,
fair,
} = &config;
} = config;
let task_kinds: EnumSet<TaskKind> = task_kinds
.iter()
.filter_map(|s| match TaskKind::from_str(s) {
@@ -93,18 +92,21 @@ where
}
})
.collect();
// steady rate, we expect `refill_amount` requests per `refill_interval`.
// dividing gives us the rps.
let rps = f64::from(refill_amount.get()) / refill_interval.as_secs_f64();
let config = LeakyBucketConfig::new(rps, f64::from(max));
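// e.g. refill_amount = 1000 and refill_interval = 1s gives rps = 1000,
// with `max` tokens of burst capacity on top of that steady rate.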
// initial tracks how many tokens are available to put in the bucket
// we want how many tokens are currently in the bucket
let initial_tokens = max - initial;
let rate_limiter = RateLimiter::with_initial_tokens(config, f64::from(initial_tokens));
Inner {
task_kinds,
rate_limiter: Arc::new(
leaky_bucket::RateLimiter::builder()
.initial(*initial)
.interval(*refill_interval)
.refill(refill_amount.get())
.max(*max)
.fair(*fair)
.build(),
),
config,
rate_limiter: Arc::new(rate_limiter),
}
}
pub fn reconfigure(&self, config: Config) {
@@ -127,7 +129,7 @@ where
/// See [`Config::steady_rps`].
pub fn steady_rps(&self) -> f64 {
self.inner.load().config.steady_rps()
self.inner.load().rate_limiter.steady_rps()
}
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
@@ -136,18 +138,9 @@ where
return None;
};
let start = std::time::Instant::now();
let mut did_throttle = false;
let acquire = inner.rate_limiter.acquire(key_count);
// turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate
let acquire = tokio::task::unconstrained(acquire);
let mut acquire = std::pin::pin!(acquire);
std::future::poll_fn(|cx| {
use std::future::Future;
let poll = acquire.as_mut().poll(cx);
did_throttle = did_throttle || poll.is_pending();
poll
})
.await;
let did_throttle = inner.rate_limiter.acquire(key_count).await;
self.count_accounted.fetch_add(1, Ordering::Relaxed);
if did_throttle {
self.count_throttled.fetch_add(1, Ordering::Relaxed);


@@ -10,7 +10,5 @@ pub(crate) use limit_algorithm::{
};
pub(crate) use limiter::GlobalRateLimiter;
pub use leaky_bucket::{
EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter, LeakyBucketState,
};
pub use leaky_bucket::{EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter};
pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter};


@@ -8,6 +8,7 @@ use dashmap::DashMap;
use rand::{thread_rng, Rng};
use tokio::time::Instant;
use tracing::info;
use utils::leaky_bucket::LeakyBucketState;
use crate::intern::EndpointIdInt;
@@ -16,7 +17,7 @@ pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;
pub struct LeakyBucketRateLimiter<Key> {
map: DashMap<Key, LeakyBucketState, RandomState>,
config: LeakyBucketConfig,
config: utils::leaky_bucket::LeakyBucketConfig,
access_count: AtomicUsize,
}
@@ -29,7 +30,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self {
Self {
map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
config,
config: config.into(),
access_count: AtomicUsize::new(0),
}
}
@@ -42,12 +43,12 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
self.do_gc(now);
}
let mut entry = self.map.entry(key).or_insert_with(|| LeakyBucketState {
time: now,
filled: 0.0,
});
let mut entry = self
.map
.entry(key)
.or_insert_with(|| LeakyBucketState { empty_at: now });
entry.check(&self.config, now, n as f64)
entry.add_tokens(&self.config, now, n as f64).is_ok()
}
fn do_gc(&self, now: Instant) {
@@ -59,7 +60,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
let shard = thread_rng().gen_range(0..n);
self.map.shards()[shard]
.write()
.retain(|_, value| !value.get_mut().update(&self.config, now));
.retain(|_, value| !value.get().bucket_is_empty(now));
}
}
@@ -68,11 +69,6 @@ pub struct LeakyBucketConfig {
pub max: f64,
}
pub struct LeakyBucketState {
filled: f64,
time: Instant,
}
#[cfg(test)]
impl LeakyBucketConfig {
pub(crate) fn new(rps: f64, max: f64) -> Self {
@@ -82,40 +78,9 @@ impl LeakyBucketConfig {
}
}
impl LeakyBucketState {
pub(crate) fn new() -> Self {
Self {
filled: 0.0,
time: Instant::now(),
}
}
/// updates the timer and returns true if the bucket is empty
fn update(&mut self, info: &LeakyBucketConfig, now: Instant) -> bool {
let drain = now.duration_since(self.time);
let drain = drain.as_secs_f64() * info.rps;
self.filled = (self.filled - drain).clamp(0.0, info.max);
self.time = now;
self.filled == 0.0
}
pub(crate) fn check(&mut self, info: &LeakyBucketConfig, now: Instant, n: f64) -> bool {
self.update(info, now);
if self.filled + n > info.max {
return false;
}
self.filled += n;
true
}
}
impl Default for LeakyBucketState {
fn default() -> Self {
Self::new()
impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
fn from(config: LeakyBucketConfig) -> Self {
utils::leaky_bucket::LeakyBucketConfig::new(config.rps, config.max)
}
}
@@ -125,48 +90,55 @@ mod tests {
use std::time::Duration;
use tokio::time::Instant;
use utils::leaky_bucket::LeakyBucketState;
use super::{LeakyBucketConfig, LeakyBucketState};
use super::LeakyBucketConfig;
#[tokio::test(start_paused = true)]
async fn check() {
let info = LeakyBucketConfig::new(500.0, 2000.0);
let mut bucket = LeakyBucketState::new();
let config: utils::leaky_bucket::LeakyBucketConfig =
LeakyBucketConfig::new(500.0, 2000.0).into();
assert_eq!(config.cost, Duration::from_millis(2));
assert_eq!(config.bucket_width, Duration::from_secs(4));
let mut bucket = LeakyBucketState {
empty_at: Instant::now(),
};
// should work for 2000 requests this second
for _ in 0..2000 {
assert!(bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
assert!(!bucket.check(&info, Instant::now(), 1.0));
assert_eq!(bucket.filled, 2000.0);
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(bucket.empty_at - Instant::now(), config.bucket_width);
// in 1ms we should drain 0.5 tokens.
// make sure we don't lose any tokens
tokio::time::advance(Duration::from_millis(1)).await;
assert!(!bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
tokio::time::advance(Duration::from_millis(1)).await;
assert!(bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
// in 10ms we should drain 5 tokens
tokio::time::advance(Duration::from_millis(10)).await;
for _ in 0..5 {
assert!(bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
assert!(!bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
// in 10s we should drain 5000 tokens
// but cap is only 2000
tokio::time::advance(Duration::from_secs(10)).await;
for _ in 0..2000 {
assert!(bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
assert!(!bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
// should sustain 500rps
for _ in 0..2000 {
tokio::time::advance(Duration::from_millis(10)).await;
for _ in 0..5 {
assert!(bucket.check(&info, Instant::now(), 1.0));
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
}
}


@@ -162,7 +162,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
"min_resident_size_override": 23,
"timeline_get_throttle": {
"task_kinds": ["PageRequestHandler"],
"fair": True,
"initial": 0,
"refill_interval": "1s",
"refill_amount": 1000,


@@ -1,3 +1,4 @@
import copy
import json
import uuid
@@ -116,3 +117,58 @@ def test_pageserver_getpage_throttle(neon_env_builder: NeonEnvBuilder, pg_bin: P
assert (
duration_secs >= 10 * actual_smgr_query_seconds
), "smgr metrics should not include throttle wait time"
throttle_config_with_field_fair_set = {
"task_kinds": ["PageRequestHandler"],
"fair": True,
"initial": 27,
"refill_interval": "43s",
"refill_amount": 23,
"max": 42,
}
def assert_throttle_config_with_field_fair_set(conf):
"""
Field `fair` is ignored, so responses don't contain it.
"""
without_fair = copy.deepcopy(throttle_config_with_field_fair_set)
without_fair.pop("fair")
assert conf == without_fair
def test_throttle_fair_config_is_settable_but_ignored_in_mgmt_api(neon_env_builder: NeonEnvBuilder):
"""
To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out.
"""
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
# with_fair config should still be settable
ps_http.set_tenant_config(
env.initial_tenant,
{"timeline_get_throttle": throttle_config_with_field_fair_set},
)
conf = ps_http.tenant_config(env.initial_tenant)
assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])
assert_throttle_config_with_field_fair_set(
conf.tenant_specific_overrides["timeline_get_throttle"]
)
def test_throttle_fair_config_is_settable_but_ignored_in_config_toml(
neon_env_builder: NeonEnvBuilder,
):
"""
To be removed after https://github.com/neondatabase/neon/pull/8539 is rolled out.
"""
def set_tenant_config(ps_cfg):
ps_cfg["tenant_config"] = {"timeline_get_throttle": throttle_config_with_field_fair_set}
neon_env_builder.pageserver_config_override = set_tenant_config
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
conf = ps_http.tenant_config(env.initial_tenant)
assert_throttle_config_with_field_fair_set(conf.effective_config["timeline_get_throttle"])