From 490f475fab3467a68de2434babc5f472a4aea2f9 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 30 Jul 2024 15:31:33 +0100 Subject: [PATCH] fix limitation of requesting more than max tokens --- libs/utils/src/leaky_bucket.rs | 58 ++++++++++++++++++++++---- pageserver/src/tenant/throttle.rs | 6 +-- proxy/src/rate_limiter/leaky_bucket.rs | 18 ++++---- 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs index 96cacb25c5..418024c9dc 100644 --- a/libs/utils/src/leaky_bucket.rs +++ b/libs/utils/src/leaky_bucket.rs @@ -66,21 +66,40 @@ impl LeakyBucketState { config.prev_multiple_of_drain(now - config.epoch) <= self.end } - /// Immedaitely adds tokens to the bucket, if there is space. + /// Immediately adds tokens to the bucket, if there is space. + /// + /// In a scenario where you are waiting for available rate, + /// rather than just erroring immediately, `started` corresponds to when this waiting started. + /// + /// `n` is the number of tokens that will be filled in the bucket. + /// + /// # Errors + /// /// If there is not enough space, no tokens are added. Instead, an error is returned with the time when /// there will be space again. pub fn add_tokens( &mut self, config: &LeakyBucketConfig, - now: Instant, + started: Instant, n: f64, ) -> Result<(), Instant> { + let now = Instant::now(); + // round down to the last time we would have drained the bucket. let now = config.prev_multiple_of_drain(now - config.epoch); + let started = config.prev_multiple_of_drain(started - config.epoch); + + // invariant: started <= now + debug_assert!(started <= now); + + // If the bucket was empty when we started our search, bump the end up accordingly. + let mut end = self.end; + if end < started { + end = started; + } let n = config.cost.mul_f64(n); - - let end_plus_n = self.end + n; + let end_plus_n = end + n; let start_plus_n = end_plus_n.saturating_sub(config.bucket_width); // start end @@ -93,10 +112,7 @@ impl LeakyBucketState { // at now2, the bucket would be partially filled if we add n tokens. // at now3, the bucket would start completely empty before we add n tokens. - if end_plus_n <= now { - self.end = now + n; - Ok(()) - } else if start_plus_n <= now { + if start_plus_n <= now { self.end = end_plus_n; Ok(()) } else { @@ -179,5 +195,31 @@ mod tests { state.add_tokens(&config, Instant::now(), 1.0).unwrap(); } } + + // supports requesting more tokens than can be stored in the bucket + // we just wait a little bit longer upfront. + { + // start the bucket completely empty + tokio::time::advance(Duration::from_secs(5)).await; + assert!(state.bucket_is_empty(&config, Instant::now())); + + // requesting 200 tokens of space should take 200*cost = 2s + // but we already have 1s available, so we wait 1s from start. + let start = Instant::now(); + + let ready = state.add_tokens(&config, start, 200.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_secs(1)); + + tokio::time::advance(Duration::from_millis(500)).await; + let ready = state.add_tokens(&config, start, 200.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + + tokio::time::advance(Duration::from_millis(500)).await; + state.add_tokens(&config, start, 200.0).unwrap(); + + // bucket should be completely full now + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } } } diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 220458757d..eae3fe5517 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -203,6 +203,8 @@ impl RateLimiter { async fn acquire(&self, count: usize) -> bool { let mut throttled = false; + let start = tokio::time::Instant::now(); + // wait until we are the first in the queue if let Some(queue) = &self.queue { let mut notified = std::pin::pin!(queue.notified()); @@ -220,13 +222,11 @@ impl RateLimiter { }; loop { - let now = tokio::time::Instant::now(); - let res = self .state .lock() .unwrap() - .add_tokens(&self.config, now, count as f64); + .add_tokens(&self.config, start, count as f64); match res { Ok(()) => return throttled, Err(ready_at) => { diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 3ff4c2aced..38b9145f89 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -112,9 +112,9 @@ mod tests { // should work for 2000 requests this second for _ in 0..2000 { - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); assert_eq!( bucket.end - (Instant::now() - config.epoch), config.bucket_width @@ -123,30 +123,30 @@ mod tests { // in 1ms we should drain 0.5 tokens. // make sure we don't lose any tokens tokio::time::advance(Duration::from_millis(1)).await; - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); tokio::time::advance(Duration::from_millis(1)).await; - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); // in 10ms we should drain 5 tokens tokio::time::advance(Duration::from_millis(10)).await; for _ in 0..5 { - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); // in 10s we should drain 5000 tokens // but cap is only 2000 tokio::time::advance(Duration::from_secs(10)).await; for _ in 0..2000 { - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); // should sustain 500rps for _ in 0..2000 { tokio::time::advance(Duration::from_millis(10)).await; for _ in 0..5 { - assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok()); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } } }