From d73475cc8de2fa5f5cf3ea3367ca8eb63d6ee014 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 29 Jul 2024 14:37:19 +0100 Subject: [PATCH] add more tests --- libs/utils/src/leaky_bucket.rs | 111 +++++++++++++++++++++---- pageserver/src/tenant/throttle.rs | 9 +- proxy/src/rate_limiter/leaky_bucket.rs | 3 +- 3 files changed, 97 insertions(+), 26 deletions(-) diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs index 7e175a21c4..0728e18194 100644 --- a/libs/utils/src/leaky_bucket.rs +++ b/libs/utils/src/leaky_bucket.rs @@ -36,17 +36,6 @@ impl LeakyBucketConfig { } } -// impl From for LeakyBucketConfig { -// fn from(config: LeakyBucketConfig) -> Self { -// // seconds_per_request = 1/(request_per_second) -// let spr = config.rps.recip(); -// Self { -// time_cost: Duration::from_secs_f64(spr), -// bucket_width: Duration::from_secs_f64(config.max * spr), -// } -// } -// } - pub struct LeakyBucketState { /// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`. /// @@ -72,7 +61,7 @@ impl LeakyBucketState { pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool { // if self.end is after now, the bucket is not empty - config.quantize_instant(now) < self.end + config.quantize_instant(now) <= self.end } /// Immedaitely adds tokens to the bucket, if there is space. @@ -86,10 +75,11 @@ impl LeakyBucketState { ) -> Result<(), Instant> { let now = config.quantize_instant(now); - let start = self.end - config.bucket_width; - let n = config.time_cost.mul_f64(n); + let end_plus_n = self.end + n; + let start_plus_n = end_plus_n.saturating_sub(config.bucket_width); + // start end // | start+n | end+n // | / | / @@ -100,14 +90,99 @@ impl LeakyBucketState { // at now2, the bucket would be partially filled if we add n tokens. // at now3, the bucket would start completely empty before we add n tokens. - if self.end + n <= now { + if end_plus_n <= now { self.end = now + n; Ok(()) - } else if start + n <= now { - self.end += n; + } else if start_plus_n <= now { + self.end = end_plus_n; Ok(()) } else { - Err(config.epoch + start + n) + let mut ready_at = start_plus_n; + + // we need the ready_at.next_multiple_of(config.refill_rate) + if config.refill_rate > Duration::ZERO { + ready_at = config + .refill_rate + .mul_f64(ready_at.div_duration_f64(config.refill_rate).ceil()); + } + + Err(config.epoch + ready_at) + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time::Instant; + + use super::{LeakyBucketConfig, LeakyBucketState}; + + #[tokio::test(start_paused = true)] + async fn check() { + let config = LeakyBucketConfig { + epoch: Instant::now(), + // refill every 0.5 seconds. + refill_rate: Duration::from_millis(500), + // average 100rps + time_cost: Duration::from_millis(10), + // burst up to 100 requests + bucket_width: Duration::from_millis(1000), + }; + + let mut state = LeakyBucketState::new(Instant::now() - config.epoch); + + // supports burst + { + // should work for 100 requests this instant + for _ in 0..100 { + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + + // quantized refill + { + // after 499ms we should not drain any tokens. + tokio::time::advance(Duration::from_millis(499)).await; + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(1)); + + // after 500ms we should have drained 50 tokens. + tokio::time::advance(Duration::from_millis(1)).await; + for _ in 0..50 { + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + + // doesn't overfill + { + // after 1s we should have an empty bucket again. + tokio::time::advance(Duration::from_secs(1)).await; + assert!(state.bucket_is_empty(&config, Instant::now())); + + // after 1s more, we should not over count the tokens and allow more than 200 requests. + tokio::time::advance(Duration::from_secs(1)).await; + for _ in 0..100 { + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + + // supports sustained rate over a long period + { + tokio::time::advance(Duration::from_secs(1)).await; + + // should sustain 100rps + for _ in 0..2000 { + tokio::time::advance(Duration::from_millis(10)).await; + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } } } } diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index d7fb54a210..33ac602320 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -95,7 +95,7 @@ where }) .collect(); - // + // how frequently we drain a single token on average let time_cost = refill_interval / refill_amount.get() as u32; let bucket_width = time_cost * (max as u32); @@ -104,17 +104,14 @@ where let initial_tokens = (max - initial) as u32; let end = time_cost * initial_tokens; - // start a bit early to avoid certain underflow issues - let epoch_offset = 2 * bucket_width + refill_interval; - let rate_limiter = RateLimiter { config: LeakyBucketConfig { - epoch: tokio::time::Instant::now() - epoch_offset, + epoch: tokio::time::Instant::now(), refill_rate: refill_interval, time_cost, bucket_width, }, - state: Mutex::new(LeakyBucketState::new(end + epoch_offset)), + state: Mutex::new(LeakyBucketState::new(end)), queue: fair.then(Notify::new), }; diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 49ab3c967b..63116dd99f 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -84,10 +84,9 @@ impl From for utils::leaky_bucket::LeakyBucketConfig { let spr = config.rps.recip(); let bucket_width = Duration::from_secs_f64(config.max * spr); utils::leaky_bucket::LeakyBucketConfig { + epoch: Instant::now(), time_cost: Duration::from_secs_f64(spr), bucket_width, - // start a bit early to avoid certain underflow issues - epoch: Instant::now() - 2 * bucket_width, refill_rate: Duration::ZERO, } }