From 54f767e93f26bbcd45689f319c3201d134f606dc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 28 Aug 2024 12:24:14 +0000 Subject: [PATCH] hacky squash-merge of conrad's leaky_bucket impl; https://github.com/neondatabase/neon/pull/8539/files#diff-c39dcc3c2c783a8600f9e0603124693444fb6f0fe99e3d057474fd94e264f69c commit d6d9ad2a573da5b1d38f466e1bb12b4483d9b732 Author: Conrad Ludgate Date: Tue Jul 30 22:01:48 2024 +0100 fix fix test commit 490f475fab3467a68de2434babc5f472a4aea2f9 Author: Conrad Ludgate Date: Tue Jul 30 15:31:33 2024 +0100 fix limitation of requesting more than max tokens commit 2dd85f5a84a3534690f9d2512f910c9324ae7178 Author: Conrad Ludgate Date: Tue Jul 30 10:46:27 2024 +0100 Update libs/utils/src/leaky_bucket.rs Co-authored-by: Joonas Koivunen commit c766021d5fae623f66698cfd67420f978ba682ed Author: Conrad Ludgate Date: Mon Jul 29 16:12:44 2024 +0100 fix fair queue commit 300a43db5c3ecd156a1196654bae6a700d1c0525 Author: Conrad Ludgate Date: Mon Jul 29 14:50:47 2024 +0100 rename commit d73475cc8de2fa5f5cf3ea3367ca8eb63d6ee014 Author: Conrad Ludgate Date: Mon Jul 29 14:37:19 2024 +0100 add more tests commit 28b85ca7110d90a700240663d7567c588fd7a0ca Author: Conrad Ludgate Date: Mon Jul 29 14:08:37 2024 +0100 share impl between proxy and ps commit 40d239560f72d99573a55f0f0729f06d3f95a1db Author: Conrad Ludgate Date: Mon Jul 29 13:03:17 2024 +0100 add quantization commit 60c3e1347f915fb42e5231d0637d457a3fc9a2d8 Author: Conrad Ludgate Date: Mon Jul 29 13:03:03 2024 +0100 replace leaky-bucket crate with gcra impl commit ef7e96fb4e4f7739db202a587b20725d705cba07 Author: Conrad Ludgate Date: Mon Jul 29 11:41:44 2024 +0100 tweak comments commit 54c5196f75bbcd4502aa6e686d2721d41d7d5c98 Author: Conrad Ludgate Date: Sun Jul 28 23:00:21 2024 +0100 proxy: improve performance of leaky-bucket --- Cargo.lock | 13 -- Cargo.toml | 1 - libs/utils/Cargo.toml | 1 - libs/utils/src/leaky_bucket.rs | 225 +++++++++++++++++++++++++ libs/utils/src/lib.rs | 1 + pageserver/Cargo.toml | 1 - pageserver/src/tenant/throttle.rs | 113 ++++++++++--- proxy/src/rate_limiter.rs | 10 +- proxy/src/rate_limiter/leaky_bucket.rs | 101 +++++------ 9 files changed, 359 insertions(+), 107 deletions(-) create mode 100644 libs/utils/src/leaky_bucket.rs diff --git a/Cargo.lock b/Cargo.lock index 441ca1ff86..248b691966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2944,17 +2944,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "leaky-bucket" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853" -dependencies = [ - "parking_lot 0.12.1", - "tokio", - "tracing", -] - [[package]] name = "libc" version = "0.2.150" @@ -3707,7 +3696,6 @@ dependencies = [ "humantime-serde", "hyper 0.14.26", "itertools 0.10.5", - "leaky-bucket", "md5", "metrics", "nix 0.27.1", @@ -6969,7 +6957,6 @@ dependencies = [ "humantime", "hyper 0.14.26", "jsonwebtoken", - "leaky-bucket", "metrics", "nix 0.27.1", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index e038c0b4ff..56951c9942 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,6 @@ ipnet = "2.9.0" itertools = "0.10" jsonwebtoken = "9" lasso = "0.7" -leaky-bucket = "1.0.1" libc = "0.2" md5 = "0.7.0" measured = { version = "0.0.22", features=["lasso"] } diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 6e593eeac1..777fb95ece 100644 
--- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -26,7 +26,6 @@ hyper = { workspace = true, features = ["full"] } fail.workspace = true futures = { workspace = true} jsonwebtoken.workspace = true -leaky-bucket.workspace = true nix.workspace = true once_cell.workspace = true pin-project-lite.workspace = true diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs new file mode 100644 index 0000000000..509190a449 --- /dev/null +++ b/libs/utils/src/leaky_bucket.rs @@ -0,0 +1,225 @@ +use std::time::Duration; + +use tokio::time::Instant; + +pub struct LeakyBucketConfig { + /// Leaky buckets can drain at a fixed interval rate. + /// We track all times as durations since this epoch so we can round down. + pub epoch: Instant, + + /// How frequently we drain the bucket. + /// If equal to 0, we drain continuously over time. + /// If greater than 0, we drain at fixed intervals. + pub drain_interval: Duration, + + /// The "time cost" of a single request unit. + /// It should loosely represent how long it takes to handle a request unit in active resource time. + pub cost: Duration, + + /// Total size of the bucket. + pub bucket_width: Duration, +} + +impl LeakyBucketConfig { + fn prev_multiple_of_drain(&self, mut dur: Duration) -> Duration { + if self.drain_interval > Duration::ZERO { + let n = dur.div_duration_f64(self.drain_interval).floor(); + dur = self.drain_interval.mul_f64(n); + } + dur + } + + fn next_multiple_of_drain(&self, mut dur: Duration) -> Duration { + if self.drain_interval > Duration::ZERO { + let n = dur.div_duration_f64(self.drain_interval).ceil(); + dur = self.drain_interval.mul_f64(n); + } + dur + } +} + +pub struct LeakyBucketState { + /// The bucket is represented by the window `start..end`, where the stored `end` is a duration since `config.epoch` (the absolute end instant is `epoch + end`) and `start = end - config.bucket_width`. + /// + /// At any given time, `end - now` equals the number of tokens currently in the bucket multiplied by `config.cost` (the "time cost"). + /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.cost`. + /// If `now < start`, the bucket is considered full and cannot accept any more tokens. + /// Draining the bucket happens naturally as `now` moves forward. + /// + /// Let `n` be the time cost of a request: + /// if `now` is after `end`, the bucket is empty and `end` is reset to `now`; + /// if `now` is within the `bucket window + n`, we are within the time budget; + /// if `now` is before the `bucket window + n`, we have run out of budget. + /// + /// This is inspired by the generic cell rate algorithm (GCRA) and works + /// exactly the same as a leaky bucket. + pub end: Duration, +} + +impl LeakyBucketState { + pub fn new(now: Duration) -> Self { + Self { end: now } + } + + pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool { + // if self.end is after now, the bucket is not empty + self.end <= config.prev_multiple_of_drain(now - config.epoch) + } + + /// Immediately adds tokens to the bucket, if there is space. + /// + /// In a scenario where you are waiting for available rate + /// rather than erroring immediately, `started` corresponds to when this waiting started. + /// + /// `n` is the number of tokens that will be added to the bucket. + /// + /// # Errors + /// + /// If there is not enough space, no tokens are added. Instead, an error is returned with the time when + /// there will be space again.
+ pub fn add_tokens( + &mut self, + config: &LeakyBucketConfig, + started: Instant, + n: f64, + ) -> Result<(), Instant> { + let now = Instant::now(); + + // round down to the last time we would have drained the bucket. + let now = config.prev_multiple_of_drain(now - config.epoch); + let started = config.prev_multiple_of_drain(started - config.epoch); + + // invariant: started <= now + debug_assert!(started <= now); + + // If the bucket was empty when we started our search, bump the end up accordingly. + let mut end = self.end; + if end < started { + end = started; + } + + let n = config.cost.mul_f64(n); + let end_plus_n = end + n; + let start_plus_n = end_plus_n.saturating_sub(config.bucket_width); + + // start end + // | start+n | end+n + // | / | / + // ------{o-[---------o-}--]----o---- + // now1 ^ now2 ^ ^ now3 + // + // at now1, the bucket would be completely filled if we add n tokens. + // at now2, the bucket would be partially filled if we add n tokens. + // at now3, the bucket would start completely empty before we add n tokens. + + if start_plus_n <= now { + self.end = end_plus_n; + Ok(()) + } else { + let ready_at = config.next_multiple_of_drain(start_plus_n); + Err(config.epoch + ready_at) + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time::Instant; + + use super::{LeakyBucketConfig, LeakyBucketState}; + + #[tokio::test(start_paused = true)] + async fn check() { + let config = LeakyBucketConfig { + epoch: Instant::now(), + // drain the bucket every 0.5 seconds. + drain_interval: Duration::from_millis(500), + // average 100rps + cost: Duration::from_millis(10), + // burst up to 100 requests + bucket_width: Duration::from_millis(1000), + }; + + let mut state = LeakyBucketState::new(Instant::now() - config.epoch); + + // supports burst + { + // should work for 100 requests this instant + for _ in 0..100 { + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + + // quantized refill + { + // after 499ms we should not drain any tokens. + tokio::time::advance(Duration::from_millis(499)).await; + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(1)); + + // after 500ms we should have drained 50 tokens. + tokio::time::advance(Duration::from_millis(1)).await; + for _ in 0..50 { + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + + // doesn't overfill + { + // after 1s we should have an empty bucket again. + tokio::time::advance(Duration::from_secs(1)).await; + assert!(state.bucket_is_empty(&config, Instant::now())); + + // after 1s more, we should not over count the tokens and allow more than 200 requests. 
+ tokio::time::advance(Duration::from_secs(1)).await; + for _ in 0..100 { + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + + // supports sustained rate over a long period + { + tokio::time::advance(Duration::from_secs(1)).await; + + // should sustain 100rps + for _ in 0..2000 { + tokio::time::advance(Duration::from_millis(10)).await; + state.add_tokens(&config, Instant::now(), 1.0).unwrap(); + } + } + + // supports requesting more tokens than can be stored in the bucket + // we just wait a little bit longer upfront. + { + // start the bucket completely empty + tokio::time::advance(Duration::from_secs(5)).await; + assert!(state.bucket_is_empty(&config, Instant::now())); + + // requesting 200 tokens of space should take 200*cost = 2s + // but we already have 1s available, so we wait 1s from start. + let start = Instant::now(); + + let ready = state.add_tokens(&config, start, 200.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_secs(1)); + + tokio::time::advance(Duration::from_millis(500)).await; + let ready = state.add_tokens(&config, start, 200.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + + tokio::time::advance(Duration::from_millis(500)).await; + state.add_tokens(&config, start, 200.0).unwrap(); + + // bucket should be completely full now + let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!(ready - Instant::now(), Duration::from_millis(500)); + } + } +} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index f4fc0ba57b..218dd468b1 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -71,6 +71,7 @@ pub mod postgres_client; pub mod tracing_span_assert; +pub mod leaky_bucket; pub mod rate_limit; /// Simple once-barrier and a guard which keeps barrier awaiting. 
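Aside: the new `utils::leaky_bucket` module above is meant to be driven by callers that retry on the `Err(ready_at)` result of `add_tokens`, which is what the pageserver throttle does further down in this patch. A minimal sketch of that calling pattern, assuming the workspace `utils` crate is available as a dependency; the `wait_for_tokens` helper and the 100 rps / 100-token-burst numbers are illustrative and not part of the patch:

use std::time::Duration;

use tokio::time::Instant;
use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState};

// Illustrative helper (not in the patch): block until `n` tokens fit in the
// bucket, retrying at the `ready_at` instant that `add_tokens` reports.
async fn wait_for_tokens(config: &LeakyBucketConfig, state: &mut LeakyBucketState, n: f64) {
    // `started` is captured once so that a long wait keeps its original position in time.
    let started = Instant::now();
    loop {
        match state.add_tokens(config, started, n) {
            Ok(()) => return,
            Err(ready_at) => tokio::time::sleep_until(ready_at).await,
        }
    }
}

#[tokio::main]
async fn main() {
    let config = LeakyBucketConfig {
        epoch: Instant::now(),
        // drain continuously rather than at fixed intervals
        drain_interval: Duration::ZERO,
        // 10ms per token, i.e. roughly 100 requests per second on average
        cost: Duration::from_millis(10),
        // up to 100 tokens (1000ms / 10ms) may be consumed as a burst
        bucket_width: Duration::from_millis(1000),
    };
    // start with an empty bucket: `end` is "now" expressed relative to the epoch
    let mut state = LeakyBucketState::new(Instant::now() - config.epoch);

    for _ in 0..300 {
        wait_for_tokens(&config, &mut state, 1.0).await;
        // ... do one request unit of work ...
    }
}

This single-caller sketch has no contention; the pageserver integration below wraps the state in a `Mutex` and uses an optional `tokio::sync::Notify` queue to keep concurrent waiters fair.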
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0e748ee3db..7276a14c41 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -36,7 +36,6 @@ humantime.workspace = true humantime-serde.workspace = true hyper.workspace = true itertools.workspace = true -leaky-bucket.workspace = true md5.workspace = true nix.workspace = true # hack to get the number of worker threads tokio uses diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index f3f3d5e3ae..eae3fe5517 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -9,7 +9,9 @@ use std::{ use arc_swap::ArcSwap; use enumset::EnumSet; +use tokio::sync::Notify; use tracing::{error, warn}; +use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState}; use crate::{context::RequestContext, task_mgr::TaskKind}; @@ -33,8 +35,7 @@ pub struct Throttle { pub struct Inner { task_kinds: EnumSet, - rate_limiter: Arc, - config: Config, + rate_limiter: Arc, } pub type Config = pageserver_api::models::ThrottleConfig; @@ -78,7 +79,7 @@ where refill_amount, max, fair, - } = &config; + } = config; let task_kinds: EnumSet = task_kinds .iter() .filter_map(|s| match TaskKind::from_str(s) { @@ -93,18 +94,34 @@ where } }) .collect(); + + // how frequently we drain a single token on average + let time_cost = refill_interval / refill_amount.get() as u32; + let bucket_width = time_cost * (max as u32); + + // initial tracks how many tokens are available to put in the bucket + // we want how many tokens are currently in the bucket + let initial_tokens = (max - initial) as u32; + let end = time_cost * initial_tokens; + + let rate_limiter = RateLimiter { + config: LeakyBucketConfig { + epoch: tokio::time::Instant::now(), + drain_interval: refill_interval, + cost: time_cost, + bucket_width, + }, + state: Mutex::new(LeakyBucketState::new(end)), + queue: fair.then(|| { + let queue = Notify::new(); + queue.notify_one(); + queue + }), + }; + Inner { task_kinds, - rate_limiter: Arc::new( - leaky_bucket::RateLimiter::builder() - .initial(*initial) - .interval(*refill_interval) - .refill(refill_amount.get()) - .max(*max) - .fair(*fair) - .build(), - ), - config, + rate_limiter: Arc::new(rate_limiter), } } pub fn reconfigure(&self, config: Config) { @@ -127,7 +144,7 @@ where /// See [`Config::steady_rps`]. pub fn steady_rps(&self) -> f64 { - self.inner.load().config.steady_rps() + self.inner.load().rate_limiter.steady_rps() } pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option { @@ -136,18 +153,9 @@ where return None; }; let start = std::time::Instant::now(); - let mut did_throttle = false; - let acquire = inner.rate_limiter.acquire(key_count); - // turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate - let acquire = tokio::task::unconstrained(acquire); - let mut acquire = std::pin::pin!(acquire); - std::future::poll_fn(|cx| { - use std::future::Future; - let poll = acquire.as_mut().poll(cx); - did_throttle = did_throttle || poll.is_pending(); - poll - }) - .await; + + let did_throttle = inner.rate_limiter.acquire(key_count).await; + self.count_accounted.fetch_add(1, Ordering::Relaxed); if did_throttle { self.count_throttled.fetch_add(1, Ordering::Relaxed); @@ -176,3 +184,56 @@ where } } } + +struct RateLimiter { + config: LeakyBucketConfig, + state: Mutex, + + /// if this rate limiter is fair, + /// provide a queue to provide this fair ordering. 
+ queue: Option, +} + +impl RateLimiter { + fn steady_rps(&self) -> f64 { + self.config.cost.as_secs_f64().recip() + } + + /// returns true if we did throttle + async fn acquire(&self, count: usize) -> bool { + let mut throttled = false; + + let start = tokio::time::Instant::now(); + + // wait until we are the first in the queue + if let Some(queue) = &self.queue { + let mut notified = std::pin::pin!(queue.notified()); + if !notified.as_mut().enable() { + throttled = true; + notified.await; + } + } + + // notify the next waiter in the queue when we are done. + scopeguard::defer! { + if let Some(queue) = &self.queue { + queue.notify_one(); + } + }; + + loop { + let res = self + .state + .lock() + .unwrap() + .add_tokens(&self.config, start, count as f64); + match res { + Ok(()) => return throttled, + Err(ready_at) => { + throttled = true; + tokio::time::sleep_until(ready_at).await; + } + } + } + } +} diff --git a/proxy/src/rate_limiter.rs b/proxy/src/rate_limiter.rs index e5f5867998..9f0e6eacf8 100644 --- a/proxy/src/rate_limiter.rs +++ b/proxy/src/rate_limiter.rs @@ -8,9 +8,9 @@ pub(crate) use limit_algorithm::aimd::Aimd; pub(crate) use limit_algorithm::{ DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token, }; -pub(crate) use limiter::GlobalRateLimiter; - -pub use leaky_bucket::{ - EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter, LeakyBucketState, +pub use limiter::{ + BucketRateLimiter, GlobalRateLimiter, RateBucketInfo, + WakeComputeRateLimiter, }; -pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter}; + +use leaky_bucket::EndpointRateLimiter; diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index fa8cb75256..f2acf5cb19 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -1,6 +1,7 @@ use std::{ hash::Hash, sync::atomic::{AtomicUsize, Ordering}, + time::Duration, }; use ahash::RandomState; @@ -8,6 +9,7 @@ use dashmap::DashMap; use rand::{thread_rng, Rng}; use tokio::time::Instant; use tracing::info; +use utils::leaky_bucket::LeakyBucketState; use crate::intern::EndpointIdInt; @@ -16,7 +18,7 @@ pub type EndpointRateLimiter = LeakyBucketRateLimiter; pub struct LeakyBucketRateLimiter { map: DashMap, - config: LeakyBucketConfig, + config: utils::leaky_bucket::LeakyBucketConfig, access_count: AtomicUsize, } @@ -29,7 +31,7 @@ impl LeakyBucketRateLimiter { pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self { Self { map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards), - config, + config: config.into(), access_count: AtomicUsize::new(0), } } @@ -42,12 +44,12 @@ impl LeakyBucketRateLimiter { self.do_gc(now); } - let mut entry = self.map.entry(key).or_insert_with(|| LeakyBucketState { - time: now, - filled: 0.0, - }); + let mut entry = self + .map + .entry(key) + .or_insert_with(|| LeakyBucketState::new(now - self.config.epoch)); - entry.check(&self.config, now, n as f64) + entry.add_tokens(&self.config, now, n as f64).is_ok() } fn do_gc(&self, now: Instant) { @@ -59,7 +61,7 @@ impl LeakyBucketRateLimiter { let shard = thread_rng().gen_range(0..n); self.map.shards()[shard] .write() - .retain(|_, value| !value.get_mut().update(&self.config, now)); + .retain(|_, value| !value.get().bucket_is_empty(&self.config, now)); } } @@ -68,12 +70,6 @@ pub struct LeakyBucketConfig { pub max: f64, } -pub struct LeakyBucketState { - filled: f64, - time: Instant, -} - -#[cfg(test)] impl LeakyBucketConfig { pub(crate) fn new(rps: 
f64, max: f64) -> Self { assert!(rps > 0.0, "rps must be positive"); @@ -82,41 +78,18 @@ impl LeakyBucketConfig { } } -impl LeakyBucketState { - pub(crate) fn new() -> Self { - Self { - filled: 0.0, - time: Instant::now(), +impl From for utils::leaky_bucket::LeakyBucketConfig { + fn from(config: LeakyBucketConfig) -> Self { + // seconds_per_request = 1/(request_per_second) + let spr = config.rps.recip(); + let bucket_width = Duration::from_secs_f64(config.max * spr); + utils::leaky_bucket::LeakyBucketConfig { + epoch: Instant::now(), + cost: Duration::from_secs_f64(spr), + bucket_width, + drain_interval: Duration::ZERO, } } - - /// updates the timer and returns true if the bucket is empty - fn update(&mut self, info: &LeakyBucketConfig, now: Instant) -> bool { - let drain = now.duration_since(self.time); - let drain = drain.as_secs_f64() * info.rps; - - self.filled = (self.filled - drain).clamp(0.0, info.max); - self.time = now; - - self.filled == 0.0 - } - - pub(crate) fn check(&mut self, info: &LeakyBucketConfig, now: Instant, n: f64) -> bool { - self.update(info, now); - - if self.filled + n > info.max { - return false; - } - self.filled += n; - - true - } -} - -impl Default for LeakyBucketState { - fn default() -> Self { - Self::new() - } } #[cfg(test)] @@ -125,48 +98,56 @@ mod tests { use std::time::Duration; use tokio::time::Instant; + use utils::leaky_bucket::LeakyBucketState; - use super::{LeakyBucketConfig, LeakyBucketState}; + use super::LeakyBucketConfig; #[tokio::test(start_paused = true)] async fn check() { - let info = LeakyBucketConfig::new(500.0, 2000.0); - let mut bucket = LeakyBucketState::new(); + let config: utils::leaky_bucket::LeakyBucketConfig = + LeakyBucketConfig::new(500.0, 2000.0).into(); + assert_eq!(config.cost, Duration::from_millis(2)); + assert_eq!(config.bucket_width, Duration::from_secs(4)); + + let mut bucket = LeakyBucketState::new(Instant::now() - config.epoch); // should work for 2000 requests this second for _ in 0..2000 { - assert!(bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } - assert!(!bucket.check(&info, Instant::now(), 1.0)); - assert_eq!(bucket.filled, 2000.0); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); + assert_eq!( + bucket.end - (Instant::now() - config.epoch), + config.bucket_width + ); // in 1ms we should drain 0.5 tokens. 
// make sure we don't lose any tokens tokio::time::advance(Duration::from_millis(1)).await; - assert!(!bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); tokio::time::advance(Duration::from_millis(1)).await; - assert!(bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); // in 10ms we should drain 5 tokens tokio::time::advance(Duration::from_millis(10)).await; for _ in 0..5 { - assert!(bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } - assert!(!bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); // in 10s we should drain 5000 tokens // but cap is only 2000 tokio::time::advance(Duration::from_secs(10)).await; for _ in 0..2000 { - assert!(bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } - assert!(!bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); // should sustain 500rps for _ in 0..2000 { tokio::time::advance(Duration::from_millis(10)).await; for _ in 0..5 { - assert!(bucket.check(&info, Instant::now(), 1.0)); + bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } } }
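For reference, the proxy-side `From<LeakyBucketConfig>` conversion above maps the `(rps, max)` pair onto the shared config as `cost = 1/rps` and `bucket_width = max / rps`, with `drain_interval = 0` for continuous draining. A small standalone sketch of that arithmetic (the `cost_and_width` helper is hypothetical, not part of the patch); it reproduces the values asserted in the proxy test, i.e. 500 rps with a 2000-token burst gives a 2ms token cost and a 4s bucket width:

use std::time::Duration;

// Hypothetical helper mirroring the arithmetic of the `From<LeakyBucketConfig>`
// impl in proxy/src/rate_limiter/leaky_bucket.rs.
fn cost_and_width(rps: f64, max: f64) -> (Duration, Duration) {
    // seconds per request = 1 / (requests per second)
    let spr = rps.recip();
    (
        Duration::from_secs_f64(spr),       // "time cost" of a single token
        Duration::from_secs_f64(max * spr), // time needed to drain a full bucket
    )
}

fn main() {
    let (cost, width) = cost_and_width(500.0, 2000.0);
    assert_eq!(cost, Duration::from_millis(2));
    assert_eq!(width, Duration::from_secs(4));
    println!("cost = {cost:?}, bucket_width = {width:?}");
}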