add more tests

This commit is contained in:
Conrad Ludgate
2024-07-29 14:37:19 +01:00
parent 28b85ca711
commit d73475cc8d
3 changed files with 97 additions and 26 deletions

View File

@@ -36,17 +36,6 @@ impl LeakyBucketConfig {
}
}
// impl From<LeakyBucketConfig> for LeakyBucketConfig {
// fn from(config: LeakyBucketConfig) -> Self {
// // seconds_per_request = 1/(request_per_second)
// let spr = config.rps.recip();
// Self {
// time_cost: Duration::from_secs_f64(spr),
// bucket_width: Duration::from_secs_f64(config.max * spr),
// }
// }
// }
pub struct LeakyBucketState {
/// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`.
///
@@ -72,7 +61,7 @@ impl LeakyBucketState {
pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool {
// if self.end is after now, the bucket is not empty
config.quantize_instant(now) < self.end
config.quantize_instant(now) <= self.end
}
/// Immedaitely adds tokens to the bucket, if there is space.
@@ -86,10 +75,11 @@ impl LeakyBucketState {
) -> Result<(), Instant> {
let now = config.quantize_instant(now);
let start = self.end - config.bucket_width;
let n = config.time_cost.mul_f64(n);
let end_plus_n = self.end + n;
let start_plus_n = end_plus_n.saturating_sub(config.bucket_width);
// start end
// | start+n | end+n
// | / | /
@@ -100,14 +90,99 @@ impl LeakyBucketState {
// at now2, the bucket would be partially filled if we add n tokens.
// at now3, the bucket would start completely empty before we add n tokens.
if self.end + n <= now {
if end_plus_n <= now {
self.end = now + n;
Ok(())
} else if start + n <= now {
self.end += n;
} else if start_plus_n <= now {
self.end = end_plus_n;
Ok(())
} else {
Err(config.epoch + start + n)
let mut ready_at = start_plus_n;
// we need the ready_at.next_multiple_of(config.refill_rate)
if config.refill_rate > Duration::ZERO {
ready_at = config
.refill_rate
.mul_f64(ready_at.div_duration_f64(config.refill_rate).ceil());
}
Err(config.epoch + ready_at)
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::Instant;
use super::{LeakyBucketConfig, LeakyBucketState};
#[tokio::test(start_paused = true)]
async fn check() {
let config = LeakyBucketConfig {
epoch: Instant::now(),
// refill every 0.5 seconds.
refill_rate: Duration::from_millis(500),
// average 100rps
time_cost: Duration::from_millis(10),
// burst up to 100 requests
bucket_width: Duration::from_millis(1000),
};
let mut state = LeakyBucketState::new(Instant::now() - config.epoch);
// supports burst
{
// should work for 100 requests this instant
for _ in 0..100 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
}
// quantized refill
{
// after 499ms we should not drain any tokens.
tokio::time::advance(Duration::from_millis(499)).await;
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(1));
// after 500ms we should have drained 50 tokens.
tokio::time::advance(Duration::from_millis(1)).await;
for _ in 0..50 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
}
// doesn't overfill
{
// after 1s we should have an empty bucket again.
tokio::time::advance(Duration::from_secs(1)).await;
assert!(state.bucket_is_empty(&config, Instant::now()));
// after 1s more, we should not over count the tokens and allow more than 200 requests.
tokio::time::advance(Duration::from_secs(1)).await;
for _ in 0..100 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
}
// supports sustained rate over a long period
{
tokio::time::advance(Duration::from_secs(1)).await;
// should sustain 100rps
for _ in 0..2000 {
tokio::time::advance(Duration::from_millis(10)).await;
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
}
}
}

View File

@@ -95,7 +95,7 @@ where
})
.collect();
//
// how frequently we drain a single token on average
let time_cost = refill_interval / refill_amount.get() as u32;
let bucket_width = time_cost * (max as u32);
@@ -104,17 +104,14 @@ where
let initial_tokens = (max - initial) as u32;
let end = time_cost * initial_tokens;
// start a bit early to avoid certain underflow issues
let epoch_offset = 2 * bucket_width + refill_interval;
let rate_limiter = RateLimiter {
config: LeakyBucketConfig {
epoch: tokio::time::Instant::now() - epoch_offset,
epoch: tokio::time::Instant::now(),
refill_rate: refill_interval,
time_cost,
bucket_width,
},
state: Mutex::new(LeakyBucketState::new(end + epoch_offset)),
state: Mutex::new(LeakyBucketState::new(end)),
queue: fair.then(Notify::new),
};

View File

@@ -84,10 +84,9 @@ impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
let spr = config.rps.recip();
let bucket_width = Duration::from_secs_f64(config.max * spr);
utils::leaky_bucket::LeakyBucketConfig {
epoch: Instant::now(),
time_cost: Duration::from_secs_f64(spr),
bucket_width,
// start a bit early to avoid certain underflow issues
epoch: Instant::now() - 2 * bucket_width,
refill_rate: Duration::ZERO,
}
}