share impl between proxy and ps

Conrad Ludgate
2024-07-29 14:08:37 +01:00
parent 40d239560f
commit 28b85ca711
4 changed files with 191 additions and 192 deletions

View File

@@ -0,0 +1,113 @@
use std::time::Duration;
use tokio::time::Instant;
pub struct LeakyBucketConfig {
/// Leaky buckets can drain at a fixed interval rate.
/// We track all times as durations since this epoch so we can round down.
pub epoch: Instant,
/// How frequently we drain the bucket.
/// If zero, we drain continuously over time.
/// If greater than zero, we drain in discrete steps at that interval.
pub refill_rate: Duration,
/// "time cost" of a single request unit.
/// loosely represents how long it takes to handle a request unit in active CPU time.
pub time_cost: Duration,
/// total size of the bucket
pub bucket_width: Duration,
}
impl LeakyBucketConfig {
pub fn quantize_instant(&self, now: Instant) -> Duration {
let mut now = now - self.epoch;
if self.refill_rate > Duration::ZERO {
// we only "add" new tokens on a fixed interval.
// truncate to the most recent multiple of self.interval.
now = self
.refill_rate
.mul_f64(now.div_duration_f64(self.refill_rate).trunc());
}
now
}
}
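// Illustrative example (made-up numbers): with `refill_rate` = 100ms and
// `now - epoch` = 250ms, `div_duration_f64` yields 2.5, `trunc()` rounds it
// down to 2.0, and `quantize_instant` returns 200ms; tokens are therefore
// only considered drained at each 100ms boundary, never part-way through.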
// impl From<LeakyBucketConfig> for LeakyBucketConfig {
// fn from(config: LeakyBucketConfig) -> Self {
// // seconds_per_request = 1/(request_per_second)
// let spr = config.rps.recip();
// Self {
// time_cost: Duration::from_secs_f64(spr),
// bucket_width: Duration::from_secs_f64(config.max * spr),
// }
// }
// }
pub struct LeakyBucketState {
/// Bucket is represented by `start..end`, where the `end` instant is `config.epoch + self.end` and `start = end - config.bucket_width`.
///
/// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
/// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
/// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
/// Draining the bucket will happen naturally as `now` moves forward.
///
/// Let `n` be the "time cost" of the request:
/// if `now` is after `end`, the bucket is empty and `end` is reset to `now`;
/// if `now` is within `start + n .. end + n` (the bucket window shifted by `n`), we are within the time budget;
/// if `now` is before `start + n`, we have run out of budget.
///
/// This is inspired by the generic cell rate algorithm (GCRA) and works
/// exactly the same as a leaky-bucket.
pub end: Duration,
}
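// Illustrative walk-through (made-up numbers): let `time_cost` = 10ms and
// `bucket_width` = 100ms, i.e. a capacity of 10 tokens. If `end - now` = 60ms,
// the bucket currently holds 6 tokens; adding one token moves `end` forward by
// 10ms. An add that would push `end - now` past 100ms is rejected until `now`
// reaches `start + n`, by which time the bucket has drained enough to accept it.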
impl LeakyBucketState {
pub fn new(now: Duration) -> Self {
Self { end: now }
}
pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool {
// if self.end is after now, the bucket is not empty
config.quantize_instant(now) < self.end
}
/// Immediately adds tokens to the bucket, if there is space.
/// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
/// there will be space again.
pub fn add_tokens(
&mut self,
config: &LeakyBucketConfig,
now: Instant,
n: f64,
) -> Result<(), Instant> {
let now = config.quantize_instant(now);
let start = self.end - config.bucket_width;
let n = config.time_cost.mul_f64(n);
// start end
// | start+n | end+n
// | / | /
// ------{o-[---------o-}--]----o----
// now1 ^ now2 ^ ^ now3
//
// at now1, the bucket would be completely filled if we add n tokens.
// at now2, the bucket would be partially filled if we add n tokens.
// at now3, the bucket would start completely empty before we add n tokens.
if self.end + n <= now {
self.end = now + n;
Ok(())
} else if start + n <= now {
self.end += n;
Ok(())
} else {
Err(config.epoch + start + n)
}
}
}
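For orientation, a minimal usage sketch of the new shared module (illustrative only: the numbers are made up, and the wiring mirrors the proxy and pageserver call sites in the files below):

use std::time::Duration;
use tokio::time::Instant;
use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState};

fn main() {
    // 100 requests per second sustained, bursts of up to 400 requests
    let time_cost = Duration::from_millis(10); // 1 / rps
    let bucket_width = time_cost * 400; // max * time_cost
    let config = LeakyBucketConfig {
        // start the epoch in the past so `end - bucket_width` cannot underflow
        epoch: Instant::now() - 2 * bucket_width,
        // drain continuously rather than at fixed intervals
        refill_rate: Duration::ZERO,
        time_cost,
        bucket_width,
    };
    // an empty bucket: `end` is simply "now", as a duration since the epoch
    let mut state = LeakyBucketState::new(Instant::now() - config.epoch);
    match state.add_tokens(&config, Instant::now(), 1.0) {
        Ok(()) => { /* within budget, request admitted */ }
        Err(ready_at) => { /* throttled; space frees up at `ready_at` */ }
    }
}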

View File

@@ -71,6 +71,7 @@ pub mod postgres_client;
pub mod tracing_span_assert;
+ pub mod leaky_bucket;
pub mod rate_limit;
/// Simple once-barrier and a guard which keeps barrier awaiting.

View File

@@ -11,6 +11,7 @@ use arc_swap::ArcSwap;
use enumset::EnumSet;
use tokio::sync::Notify;
use tracing::{error, warn};
+ use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState};
use crate::{context::RequestContext, task_mgr::TaskKind};
@@ -93,18 +94,33 @@ where
}
})
.collect();
+ //
+ let time_cost = refill_interval / refill_amount.get() as u32;
+ let bucket_width = time_cost * (max as u32);
+ // initial tracks how many tokens are available to put in the bucket
+ // we want how many tokens are currently in the bucket
+ let initial_tokens = (max - initial) as u32;
+ let end = time_cost * initial_tokens;
+ // start a bit early to avoid underflow issues (`Instant`/`Duration` subtraction panics on underflow)
+ let epoch_offset = 2 * bucket_width + refill_interval;
+ let rate_limiter = RateLimiter {
+     config: LeakyBucketConfig {
+         epoch: tokio::time::Instant::now() - epoch_offset,
+         refill_rate: refill_interval,
+         time_cost,
+         bucket_width,
+     },
+     state: Mutex::new(LeakyBucketState::new(end + epoch_offset)),
+     queue: fair.then(Notify::new),
+ };
Inner {
task_kinds,
- rate_limiter: Arc::new(
-     RateLimiterBuilder {
-         initial,
-         refill_interval,
-         refill: refill_amount.get(),
-         max,
-         fair,
-     }
-     .build(),
- ),
+ rate_limiter: Arc::new(rate_limiter),
}
}
pub fn reconfigure(&self, config: Config) {
@@ -137,7 +153,7 @@ where
};
let start = std::time::Instant::now();
- let did_throttle = !inner.rate_limiter.acquire(key_count).await;
+ let did_throttle = inner.rate_limiter.acquire(key_count).await;
self.count_accounted.fetch_add(1, Ordering::Relaxed);
if did_throttle {
@@ -169,89 +185,33 @@ where
}
struct RateLimiter {
- epoch: tokio::time::Instant,
- /// "time cost" of a single request unit.
- /// loosely represents how long it takes to handle a request unit in active CPU time.
- time_cost: Duration,
- bucket_width: Duration,
- interval: Duration,
- /// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`.
- ///
- /// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
- /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
- /// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
- /// Draining the bucket will happen naturally as `now` moves forward.
- ///
- /// Let `n` be some "time cost" for the request,
- /// If now is after end, the bucket is empty and the end is reset to now,
- /// If now is within the `bucket window + n`, we are within time budget.
- /// If now is before the `bucket window + n`, we have run out of budget.
- ///
- /// This is inspired by the generic cell rate algorithm (GCRA) and works
- /// exactly the same as a leaky-bucket.
- end: Mutex<Duration>,
+ config: LeakyBucketConfig,
+ state: Mutex<LeakyBucketState>,
/// if this rate limiter is fair,
/// provide a queue to provide this fair ordering.
queue: Option<Notify>,
}
- struct RateLimiterBuilder {
-     /// The max number of tokens.
-     max: usize,
-     /// The initial count of tokens.
-     initial: usize,
-     /// Tokens to add every `per` duration.
-     refill: usize,
-     /// Interval to add tokens in milliseconds.
-     refill_interval: Duration,
-     /// If the rate limiter is fair or not.
-     fair: bool,
- }
- impl RateLimiterBuilder {
-     fn build(self) -> RateLimiter {
-         let queue = self.fair.then(Notify::new);
-         let time_cost = self.refill_interval / self.refill as u32;
-         let bucket_width = time_cost * (self.max as u32);
-         let initial_allow = time_cost * (self.initial as u32);
-         let end = bucket_width - initial_allow;
-         RateLimiter {
-             epoch: tokio::time::Instant::now(),
-             time_cost,
-             bucket_width,
-             interval: self.refill_interval,
-             end: Mutex::new(end),
-             queue,
-         }
-     }
- }
impl RateLimiter {
fn steady_rps(&self) -> f64 {
- self.time_cost.as_secs_f64().recip()
+ self.config.time_cost.as_secs_f64().recip()
}
- /// returns true if not throttled
+ /// returns true if we did throttle
async fn acquire(&self, count: usize) -> bool {
- let mut not_throttled = true;
- let n = self.time_cost.mul_f64(count as f64);
+ let mut throttled = false;
// wait until we are the first in the queue
if let Some(queue) = &self.queue {
let mut notified = std::pin::pin!(queue.notified());
if !notified.as_mut().enable() {
- not_throttled = false;
+ throttled = true;
notified.await;
}
}
- // notify the next waiter in the queue
+ // notify the next waiter in the queue when we are done.
scopeguard::defer! {
if let Some(queue) = &self.queue {
queue.notify_one();
@@ -260,42 +220,19 @@ impl RateLimiter {
loop {
let now = tokio::time::Instant::now();
- let now = now - self.epoch;
- // we only "add" new tokens on a fixed interval.
- // truncate to the most recent multiple of self.interval.
- let now = self
-     .interval
-     .mul_f64(now.div_duration_f64(self.interval).trunc());
- let ready_at = {
-     // start end
-     // | start+n | end+n
-     // | / | /
-     // ------{o-[---------o-}--]----o----
-     // now1 ^ now2 ^ ^ now3
-     //
-     // at now1, the bucket would be completely filled if we add n tokens.
-     // at now2, the bucket would be partially filled if we add n tokens.
-     // at now3, the bucket would start completely empty before we add n tokens.
-     let mut end = self.end.lock().unwrap();
-     let start = *end - self.bucket_width;
-     let ready_at = start + n;
-     if *end + n <= now {
-         *end = now + n;
-         return not_throttled;
-     } else if ready_at <= now {
-         *end += n;
-         return not_throttled;
-     }
-     ready_at
- };
- not_throttled = false;
- tokio::time::sleep_until(self.epoch + ready_at).await;
+ let res = self
+     .state
+     .lock()
+     .unwrap()
+     .add_tokens(&self.config, now, count as f64);
+ match res {
+     Ok(()) => return throttled,
+     Err(ready_at) => {
+         throttled = true;
+         tokio::time::sleep_until(ready_at).await;
+     }
+ }
}
}
}
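The fair path in `acquire` above hinges on a small `tokio::sync::Notify` idiom: `Notified::enable` returns true only when a permit has already been stored for us, i.e. we are at the head of the queue. Below is a sketch of that idiom in isolation; `wait_turn` and the seeded permit are hypothetical scaffolding, and the commit's real code additionally uses `scopeguard::defer!` so the hand-off happens even on early return:

use tokio::sync::Notify;

/// Waits for our turn in FIFO order; returns true if we had to wait.
async fn wait_turn(queue: &Notify) -> bool {
    let mut waited = false;
    let mut notified = std::pin::pin!(queue.notified());
    // `enable` registers us as a waiter and reports whether a permit
    // was already stored, i.e. whether anyone is ahead of us.
    if !notified.as_mut().enable() {
        waited = true;
        notified.await;
    }
    // pass the turn on to the next waiter
    queue.notify_one();
    waited
}

fn new_queue() -> Notify {
    let queue = Notify::new();
    // seed one permit so the very first caller passes straight through
    queue.notify_one();
    queue
}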

View File

@@ -9,6 +9,7 @@ use dashmap::DashMap;
use rand::{thread_rng, Rng};
use tokio::time::Instant;
use tracing::info;
+ use utils::leaky_bucket::LeakyBucketState;
use crate::intern::EndpointIdInt;
@@ -17,7 +18,7 @@ pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;
pub struct LeakyBucketRateLimiter<Key> {
map: DashMap<Key, LeakyBucketState, RandomState>,
- config: LeakyBucketConfigInner,
+ config: utils::leaky_bucket::LeakyBucketConfig,
access_count: AtomicUsize,
}
@@ -46,9 +47,9 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
let mut entry = self
.map
.entry(key)
-     .or_insert_with(|| LeakyBucketState::new(now));
+     .or_insert_with(|| LeakyBucketState::new(now - self.config.epoch));
- entry.check(&self.config, now, n as f64)
+ entry.add_tokens(&self.config, now, n as f64).is_ok()
}
fn do_gc(&self, now: Instant) {
@@ -60,7 +61,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
let shard = thread_rng().gen_range(0..n);
self.map.shards()[shard]
.write()
-     .retain(|_, value| value.get().should_retain(now));
+     .retain(|_, value| !value.get().bucket_is_empty(&self.config, now));
}
}
@@ -77,75 +78,17 @@ impl LeakyBucketConfig {
}
}
- struct LeakyBucketConfigInner {
-     /// "time cost" of a single request unit.
-     /// loosely represents how long it takes to handle a request unit in active CPU time.
-     time_cost: Duration,
-     bucket_width: Duration,
- }
- impl From<LeakyBucketConfig> for LeakyBucketConfigInner {
+ impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
fn from(config: LeakyBucketConfig) -> Self {
// seconds_per_request = 1/(request_per_second)
let spr = config.rps.recip();
-     Self {
-         time_cost: Duration::from_secs_f64(spr),
-         bucket_width: Duration::from_secs_f64(config.max * spr),
-     }
+     let bucket_width = Duration::from_secs_f64(config.max * spr);
+     utils::leaky_bucket::LeakyBucketConfig {
+         time_cost: Duration::from_secs_f64(spr),
+         bucket_width,
+         // start a bit early to avoid certain underflow issues
+         epoch: Instant::now() - 2 * bucket_width,
+         refill_rate: Duration::ZERO,
+     }
}
}
- struct LeakyBucketState {
-     /// Bucket is represented by `start..end` where `start = end - config.bucket_width`.
-     ///
-     /// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
-     /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
-     /// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
-     /// Draining the bucket will happen naturally as `now` moves forward.
-     ///
-     /// Let `n` be some "time cost" for the request,
-     /// If now is after end, the bucket is empty and the end is reset to now,
-     /// If now is within the `bucket window + n`, we are within time budget.
-     /// If now is before the `bucket window + n`, we have run out of budget.
-     ///
-     /// This is inspired by the generic cell rate algorithm (GCRA) and works
-     /// exactly the same as a leaky-bucket.
-     end: Instant,
- }
- impl LeakyBucketState {
-     fn new(now: Instant) -> Self {
-         Self { end: now }
-     }
-     fn should_retain(&self, now: Instant) -> bool {
-         // if self.end is after now, the bucket is not empty
-         now < self.end
-     }
-     fn check(&mut self, config: &LeakyBucketConfigInner, now: Instant, n: f64) -> bool {
-         let start = self.end - config.bucket_width;
-         let n = config.time_cost.mul_f64(n);
-         // start end
-         // | start+n | end+n
-         // | / | /
-         // ------{o-[---------o-}--]----o----
-         // now1 ^ now2 ^ ^ now3
-         //
-         // at now1, the bucket would be completely filled if we add n tokens.
-         // at now2, the bucket would be partially filled if we add n tokens.
-         // at now3, the bucket would start completely empty before we add n tokens.
-         if self.end + n <= now {
-             self.end = now + n;
-             true
-         } else if start + n <= now {
-             self.end += n;
-             true
-         } else {
-             false
-         }
-     }
- }
@@ -155,51 +98,56 @@ mod tests {
use std::time::Duration;
use tokio::time::Instant;
+ use utils::leaky_bucket::LeakyBucketState;
- use super::{LeakyBucketConfig, LeakyBucketConfigInner, LeakyBucketState};
+ use super::LeakyBucketConfig;
#[tokio::test(start_paused = true)]
async fn check() {
- let config: LeakyBucketConfigInner = LeakyBucketConfig::new(500.0, 2000.0).into();
+ let config: utils::leaky_bucket::LeakyBucketConfig =
+     LeakyBucketConfig::new(500.0, 2000.0).into();
assert_eq!(config.time_cost, Duration::from_millis(2));
assert_eq!(config.bucket_width, Duration::from_secs(4));
- let mut bucket = LeakyBucketState::new(Instant::now());
+ let mut bucket = LeakyBucketState::new(Instant::now() - config.epoch);
// should work for 2000 requests this second
for _ in 0..2000 {
- assert!(bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
}
- assert!(!bucket.check(&config, Instant::now(), 1.0));
- assert_eq!(bucket.end - Instant::now(), config.bucket_width);
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
+ assert_eq!(
+     bucket.end - (Instant::now() - config.epoch),
+     config.bucket_width
+ );
// in 1ms we should drain 0.5 tokens.
// make sure we don't lose any tokens
tokio::time::advance(Duration::from_millis(1)).await;
- assert!(!bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
tokio::time::advance(Duration::from_millis(1)).await;
- assert!(bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
// in 10ms we should drain 5 tokens
tokio::time::advance(Duration::from_millis(10)).await;
for _ in 0..5 {
- assert!(bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
}
- assert!(!bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
// in 10s we should drain 5000 tokens
// but cap is only 2000
tokio::time::advance(Duration::from_secs(10)).await;
for _ in 0..2000 {
- assert!(bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
}
- assert!(!bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
// should sustain 500rps
for _ in 0..2000 {
tokio::time::advance(Duration::from_millis(10)).await;
for _ in 0..5 {
- assert!(bucket.check(&config, Instant::now(), 1.0));
+ assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
}
}
}
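As a spelled-out check of the conversion this test exercises, with the same numbers (rps = 500, max = 2000):

use std::time::Duration;

fn main() {
    let spr = 500.0_f64.recip(); // seconds per request = 0.002
    let time_cost = Duration::from_secs_f64(spr); // 2ms, the first assert above
    let bucket_width = time_cost * 2000; // 2ms * 2000 = 4s, the second assert
    assert_eq!(time_cost, Duration::from_millis(2));
    assert_eq!(bucket_width, Duration::from_secs(4));
}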