This commit is contained in:
Conrad Ludgate
2024-07-29 14:50:47 +01:00
parent d73475cc8d
commit 300a43db5c
3 changed files with 30 additions and 35 deletions

View File

@@ -8,31 +8,33 @@ pub struct LeakyBucketConfig {
pub epoch: Instant,
/// How frequently we drain the bucket.
/// If equal to 0, we drain constantly over time.
/// If equal to 0, we drain continuously over time.
/// If greater than 0, we drain at fixed intervals.
pub refill_rate: Duration,
pub drain_interval: Duration,
/// "time cost" of a single request unit.
/// loosely represents how long it takes to handle a request unit in active CPU time.
pub time_cost: Duration,
/// should loosely represents how long it takes to handle a request unit in active resource time.
pub cost: Duration,
/// total size of the bucket
pub bucket_width: Duration,
}
impl LeakyBucketConfig {
pub fn quantize_instant(&self, now: Instant) -> Duration {
let mut now = now - self.epoch;
if self.refill_rate > Duration::ZERO {
// we only "add" new tokens on a fixed interval.
// truncate to the most recent multiple of self.interval.
now = self
.refill_rate
.mul_f64(now.div_duration_f64(self.refill_rate).trunc());
fn prev_multiple_of_drain(&self, mut dur: Duration) -> Duration {
if self.drain_interval > Duration::ZERO {
let n = dur.div_duration_f64(self.drain_interval).floor();
dur = self.drain_interval.mul_f64(n);
}
dur
}
now
fn next_multiple_of_drain(&self, mut dur: Duration) -> Duration {
if self.drain_interval > Duration::ZERO {
let n = dur.div_duration_f64(self.drain_interval).ceil();
dur = self.drain_interval.mul_f64(n);
}
dur
}
}
@@ -61,7 +63,7 @@ impl LeakyBucketState {
pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool {
// if self.end is after now, the bucket is not empty
config.quantize_instant(now) <= self.end
config.prev_multiple_of_drain(now - config.epoch) <= self.end
}
/// Immedaitely adds tokens to the bucket, if there is space.
@@ -73,9 +75,10 @@ impl LeakyBucketState {
now: Instant,
n: f64,
) -> Result<(), Instant> {
let now = config.quantize_instant(now);
// round down to the last time we would have drained the bucket.
let now = config.prev_multiple_of_drain(now - config.epoch);
let n = config.time_cost.mul_f64(n);
let n = config.cost.mul_f64(n);
let end_plus_n = self.end + n;
let start_plus_n = end_plus_n.saturating_sub(config.bucket_width);
@@ -97,15 +100,7 @@ impl LeakyBucketState {
self.end = end_plus_n;
Ok(())
} else {
let mut ready_at = start_plus_n;
// we need the ready_at.next_multiple_of(config.refill_rate)
if config.refill_rate > Duration::ZERO {
ready_at = config
.refill_rate
.mul_f64(ready_at.div_duration_f64(config.refill_rate).ceil());
}
let ready_at = config.next_multiple_of_drain(start_plus_n);
Err(config.epoch + ready_at)
}
}
@@ -123,10 +118,10 @@ mod tests {
async fn check() {
let config = LeakyBucketConfig {
epoch: Instant::now(),
// refill every 0.5 seconds.
refill_rate: Duration::from_millis(500),
// drain the bucket every 0.5 seconds.
drain_interval: Duration::from_millis(500),
// average 100rps
time_cost: Duration::from_millis(10),
cost: Duration::from_millis(10),
// burst up to 100 requests
bucket_width: Duration::from_millis(1000),
};

View File

@@ -107,8 +107,8 @@ where
let rate_limiter = RateLimiter {
config: LeakyBucketConfig {
epoch: tokio::time::Instant::now(),
refill_rate: refill_interval,
time_cost,
drain_interval: refill_interval,
cost: time_cost,
bucket_width,
},
state: Mutex::new(LeakyBucketState::new(end)),
@@ -192,7 +192,7 @@ struct RateLimiter {
impl RateLimiter {
fn steady_rps(&self) -> f64 {
self.config.time_cost.as_secs_f64().recip()
self.config.cost.as_secs_f64().recip()
}
/// returns true if we did throttle

View File

@@ -85,9 +85,9 @@ impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
let bucket_width = Duration::from_secs_f64(config.max * spr);
utils::leaky_bucket::LeakyBucketConfig {
epoch: Instant::now(),
time_cost: Duration::from_secs_f64(spr),
cost: Duration::from_secs_f64(spr),
bucket_width,
refill_rate: Duration::ZERO,
drain_interval: Duration::ZERO,
}
}
}
@@ -105,7 +105,7 @@ mod tests {
async fn check() {
let config: utils::leaky_bucket::LeakyBucketConfig =
LeakyBucketConfig::new(500.0, 2000.0).into();
assert_eq!(config.time_cost, Duration::from_millis(2));
assert_eq!(config.cost, Duration::from_millis(2));
assert_eq!(config.bucket_width, Duration::from_secs(4));
let mut bucket = LeakyBucketState::new(Instant::now() - config.epoch);