mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
hacky squash-merge of conrad's leaky_bucket impl; https://github.com/neondatabase/neon/pull/8539/files#diff-c39dcc3c2c783a8600f9e0603124693444fb6f0fe99e3d057474fd94e264f69c
commitd6d9ad2a57Author: Conrad Ludgate <conradludgate@gmail.com> Date: Tue Jul 30 22:01:48 2024 +0100 fix fix test commit490f475fabAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Tue Jul 30 15:31:33 2024 +0100 fix limitation of requesting more than max tokens commit2dd85f5a84Author: Conrad Ludgate <conrad@neon.tech> Date: Tue Jul 30 10:46:27 2024 +0100 Update libs/utils/src/leaky_bucket.rs Co-authored-by: Joonas Koivunen <joonas@neon.tech> commitc766021d5fAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 16:12:44 2024 +0100 fix fair queue commit300a43db5cAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 14:50:47 2024 +0100 rename commitd73475cc8dAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 14:37:19 2024 +0100 add more tests commit28b85ca711Author: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 14:08:37 2024 +0100 share impl between proxy and ps commit40d239560fAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 13:03:17 2024 +0100 add quantization commit60c3e1347fAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 13:03:03 2024 +0100 replace leaky-bucket crate with gcra impl commitef7e96fb4eAuthor: Conrad Ludgate <conradludgate@gmail.com> Date: Mon Jul 29 11:41:44 2024 +0100 tweak comments commit54c5196f75Author: Conrad Ludgate <conradludgate@gmail.com> Date: Sun Jul 28 23:00:21 2024 +0100 proxy: improve performance of leaky-bucket
This commit is contained in:
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -2944,17 +2944,6 @@ version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "leaky-bucket"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.150"
|
||||
@@ -3707,7 +3696,6 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper 0.14.26",
|
||||
"itertools 0.10.5",
|
||||
"leaky-bucket",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
@@ -6969,7 +6957,6 @@ dependencies = [
|
||||
"humantime",
|
||||
"hyper 0.14.26",
|
||||
"jsonwebtoken",
|
||||
"leaky-bucket",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"once_cell",
|
||||
|
||||
@@ -107,7 +107,6 @@ ipnet = "2.9.0"
|
||||
itertools = "0.10"
|
||||
jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
leaky-bucket = "1.0.1"
|
||||
libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
measured = { version = "0.0.22", features=["lasso"] }
|
||||
|
||||
@@ -26,7 +26,6 @@ hyper = { workspace = true, features = ["full"] }
|
||||
fail.workspace = true
|
||||
futures = { workspace = true}
|
||||
jsonwebtoken.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
|
||||
225
libs/utils/src/leaky_bucket.rs
Normal file
225
libs/utils/src/leaky_bucket.rs
Normal file
@@ -0,0 +1,225 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::Instant;
|
||||
|
||||
pub struct LeakyBucketConfig {
|
||||
/// Leaky buckets can drain at a fixed interval rate.
|
||||
/// We track all times as durations since this epoch so we can round down.
|
||||
pub epoch: Instant,
|
||||
|
||||
/// How frequently we drain the bucket.
|
||||
/// If equal to 0, we drain continuously over time.
|
||||
/// If greater than 0, we drain at fixed intervals.
|
||||
pub drain_interval: Duration,
|
||||
|
||||
/// "time cost" of a single request unit.
|
||||
/// should loosely represent how long it takes to handle a request unit in active resource time.
|
||||
pub cost: Duration,
|
||||
|
||||
/// total size of the bucket
|
||||
pub bucket_width: Duration,
|
||||
}
|
||||
|
||||
impl LeakyBucketConfig {
|
||||
fn prev_multiple_of_drain(&self, mut dur: Duration) -> Duration {
|
||||
if self.drain_interval > Duration::ZERO {
|
||||
let n = dur.div_duration_f64(self.drain_interval).floor();
|
||||
dur = self.drain_interval.mul_f64(n);
|
||||
}
|
||||
dur
|
||||
}
|
||||
|
||||
fn next_multiple_of_drain(&self, mut dur: Duration) -> Duration {
|
||||
if self.drain_interval > Duration::ZERO {
|
||||
let n = dur.div_duration_f64(self.drain_interval).ceil();
|
||||
dur = self.drain_interval.mul_f64(n);
|
||||
}
|
||||
dur
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LeakyBucketState {
|
||||
/// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`.
|
||||
///
|
||||
/// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
|
||||
/// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
|
||||
/// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
|
||||
/// Draining the bucket will happen naturally as `now` moves forward.
|
||||
///
|
||||
/// Let `n` be some "time cost" for the request,
|
||||
/// If now is after end, the bucket is empty and the end is reset to now,
|
||||
/// If now is within the `bucket window + n`, we are within time budget.
|
||||
/// If now is before the `bucket window + n`, we have run out of budget.
|
||||
///
|
||||
/// This is inspired by the generic cell rate algorithm (GCRA) and works
|
||||
/// exactly the same as a leaky-bucket.
|
||||
pub end: Duration,
|
||||
}
|
||||
|
||||
impl LeakyBucketState {
|
||||
pub fn new(now: Duration) -> Self {
|
||||
Self { end: now }
|
||||
}
|
||||
|
||||
pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool {
|
||||
// if self.end is after now, the bucket is not empty
|
||||
self.end <= config.prev_multiple_of_drain(now - config.epoch)
|
||||
}
|
||||
|
||||
/// Immediately adds tokens to the bucket, if there is space.
|
||||
///
|
||||
/// In a scenario where you are waiting for available rate,
|
||||
/// rather than just erroring immediately, `started` corresponds to when this waiting started.
|
||||
///
|
||||
/// `n` is the number of tokens that will be filled in the bucket.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
|
||||
/// there will be space again.
|
||||
pub fn add_tokens(
|
||||
&mut self,
|
||||
config: &LeakyBucketConfig,
|
||||
started: Instant,
|
||||
n: f64,
|
||||
) -> Result<(), Instant> {
|
||||
let now = Instant::now();
|
||||
|
||||
// round down to the last time we would have drained the bucket.
|
||||
let now = config.prev_multiple_of_drain(now - config.epoch);
|
||||
let started = config.prev_multiple_of_drain(started - config.epoch);
|
||||
|
||||
// invariant: started <= now
|
||||
debug_assert!(started <= now);
|
||||
|
||||
// If the bucket was empty when we started our search, bump the end up accordingly.
|
||||
let mut end = self.end;
|
||||
if end < started {
|
||||
end = started;
|
||||
}
|
||||
|
||||
let n = config.cost.mul_f64(n);
|
||||
let end_plus_n = end + n;
|
||||
let start_plus_n = end_plus_n.saturating_sub(config.bucket_width);
|
||||
|
||||
// start end
|
||||
// | start+n | end+n
|
||||
// | / | /
|
||||
// ------{o-[---------o-}--]----o----
|
||||
// now1 ^ now2 ^ ^ now3
|
||||
//
|
||||
// at now1, the bucket would be completely filled if we add n tokens.
|
||||
// at now2, the bucket would be partially filled if we add n tokens.
|
||||
// at now3, the bucket would start completely empty before we add n tokens.
|
||||
|
||||
if start_plus_n <= now {
|
||||
self.end = end_plus_n;
|
||||
Ok(())
|
||||
} else {
|
||||
let ready_at = config.next_multiple_of_drain(start_plus_n);
|
||||
Err(config.epoch + ready_at)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::Instant;
|
||||
|
||||
use super::{LeakyBucketConfig, LeakyBucketState};
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn check() {
|
||||
let config = LeakyBucketConfig {
|
||||
epoch: Instant::now(),
|
||||
// drain the bucket every 0.5 seconds.
|
||||
drain_interval: Duration::from_millis(500),
|
||||
// average 100rps
|
||||
cost: Duration::from_millis(10),
|
||||
// burst up to 100 requests
|
||||
bucket_width: Duration::from_millis(1000),
|
||||
};
|
||||
|
||||
let mut state = LeakyBucketState::new(Instant::now() - config.epoch);
|
||||
|
||||
// supports burst
|
||||
{
|
||||
// should work for 100 requests this instant
|
||||
for _ in 0..100 {
|
||||
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
}
|
||||
|
||||
// quantized refill
|
||||
{
|
||||
// after 499ms we should not drain any tokens.
|
||||
tokio::time::advance(Duration::from_millis(499)).await;
|
||||
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(1));
|
||||
|
||||
// after 500ms we should have drained 50 tokens.
|
||||
tokio::time::advance(Duration::from_millis(1)).await;
|
||||
for _ in 0..50 {
|
||||
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
}
|
||||
|
||||
// doesn't overfill
|
||||
{
|
||||
// after 1s we should have an empty bucket again.
|
||||
tokio::time::advance(Duration::from_secs(1)).await;
|
||||
assert!(state.bucket_is_empty(&config, Instant::now()));
|
||||
|
||||
// after 1s more, we should not over count the tokens and allow more than 200 requests.
|
||||
tokio::time::advance(Duration::from_secs(1)).await;
|
||||
for _ in 0..100 {
|
||||
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
}
|
||||
|
||||
// supports sustained rate over a long period
|
||||
{
|
||||
tokio::time::advance(Duration::from_secs(1)).await;
|
||||
|
||||
// should sustain 100rps
|
||||
for _ in 0..2000 {
|
||||
tokio::time::advance(Duration::from_millis(10)).await;
|
||||
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// supports requesting more tokens than can be stored in the bucket
|
||||
// we just wait a little bit longer upfront.
|
||||
{
|
||||
// start the bucket completely empty
|
||||
tokio::time::advance(Duration::from_secs(5)).await;
|
||||
assert!(state.bucket_is_empty(&config, Instant::now()));
|
||||
|
||||
// requesting 200 tokens of space should take 200*cost = 2s
|
||||
// but we already have 1s available, so we wait 1s from start.
|
||||
let start = Instant::now();
|
||||
|
||||
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_secs(1));
|
||||
|
||||
tokio::time::advance(Duration::from_millis(500)).await;
|
||||
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
|
||||
tokio::time::advance(Duration::from_millis(500)).await;
|
||||
state.add_tokens(&config, start, 200.0).unwrap();
|
||||
|
||||
// bucket should be completely full now
|
||||
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,6 +71,7 @@ pub mod postgres_client;
|
||||
|
||||
pub mod tracing_span_assert;
|
||||
|
||||
pub mod leaky_bucket;
|
||||
pub mod rate_limit;
|
||||
|
||||
/// Simple once-barrier and a guard which keeps barrier awaiting.
|
||||
|
||||
@@ -36,7 +36,6 @@ humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
leaky-bucket.workspace = true
|
||||
md5.workspace = true
|
||||
nix.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
|
||||
@@ -9,7 +9,9 @@ use std::{
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use enumset::EnumSet;
|
||||
use tokio::sync::Notify;
|
||||
use tracing::{error, warn};
|
||||
use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState};
|
||||
|
||||
use crate::{context::RequestContext, task_mgr::TaskKind};
|
||||
|
||||
@@ -33,8 +35,7 @@ pub struct Throttle<M: Metric> {
|
||||
|
||||
pub struct Inner {
|
||||
task_kinds: EnumSet<TaskKind>,
|
||||
rate_limiter: Arc<leaky_bucket::RateLimiter>,
|
||||
config: Config,
|
||||
rate_limiter: Arc<RateLimiter>,
|
||||
}
|
||||
|
||||
pub type Config = pageserver_api::models::ThrottleConfig;
|
||||
@@ -78,7 +79,7 @@ where
|
||||
refill_amount,
|
||||
max,
|
||||
fair,
|
||||
} = &config;
|
||||
} = config;
|
||||
let task_kinds: EnumSet<TaskKind> = task_kinds
|
||||
.iter()
|
||||
.filter_map(|s| match TaskKind::from_str(s) {
|
||||
@@ -93,18 +94,34 @@ where
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// how frequently we drain a single token on average
|
||||
let time_cost = refill_interval / refill_amount.get() as u32;
|
||||
let bucket_width = time_cost * (max as u32);
|
||||
|
||||
// initial tracks how many tokens are available to put in the bucket
|
||||
// we want how many tokens are currently in the bucket
|
||||
let initial_tokens = (max - initial) as u32;
|
||||
let end = time_cost * initial_tokens;
|
||||
|
||||
let rate_limiter = RateLimiter {
|
||||
config: LeakyBucketConfig {
|
||||
epoch: tokio::time::Instant::now(),
|
||||
drain_interval: refill_interval,
|
||||
cost: time_cost,
|
||||
bucket_width,
|
||||
},
|
||||
state: Mutex::new(LeakyBucketState::new(end)),
|
||||
queue: fair.then(|| {
|
||||
let queue = Notify::new();
|
||||
queue.notify_one();
|
||||
queue
|
||||
}),
|
||||
};
|
||||
|
||||
Inner {
|
||||
task_kinds,
|
||||
rate_limiter: Arc::new(
|
||||
leaky_bucket::RateLimiter::builder()
|
||||
.initial(*initial)
|
||||
.interval(*refill_interval)
|
||||
.refill(refill_amount.get())
|
||||
.max(*max)
|
||||
.fair(*fair)
|
||||
.build(),
|
||||
),
|
||||
config,
|
||||
rate_limiter: Arc::new(rate_limiter),
|
||||
}
|
||||
}
|
||||
pub fn reconfigure(&self, config: Config) {
|
||||
@@ -127,7 +144,7 @@ where
|
||||
|
||||
/// See [`Config::steady_rps`].
|
||||
pub fn steady_rps(&self) -> f64 {
|
||||
self.inner.load().config.steady_rps()
|
||||
self.inner.load().rate_limiter.steady_rps()
|
||||
}
|
||||
|
||||
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
|
||||
@@ -136,18 +153,9 @@ where
|
||||
return None;
|
||||
};
|
||||
let start = std::time::Instant::now();
|
||||
let mut did_throttle = false;
|
||||
let acquire = inner.rate_limiter.acquire(key_count);
|
||||
// turn off runtime-induced preemption (aka coop) so our `did_throttle` is accurate
|
||||
let acquire = tokio::task::unconstrained(acquire);
|
||||
let mut acquire = std::pin::pin!(acquire);
|
||||
std::future::poll_fn(|cx| {
|
||||
use std::future::Future;
|
||||
let poll = acquire.as_mut().poll(cx);
|
||||
did_throttle = did_throttle || poll.is_pending();
|
||||
poll
|
||||
})
|
||||
.await;
|
||||
|
||||
let did_throttle = inner.rate_limiter.acquire(key_count).await;
|
||||
|
||||
self.count_accounted.fetch_add(1, Ordering::Relaxed);
|
||||
if did_throttle {
|
||||
self.count_throttled.fetch_add(1, Ordering::Relaxed);
|
||||
@@ -176,3 +184,56 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct RateLimiter {
|
||||
config: LeakyBucketConfig,
|
||||
state: Mutex<LeakyBucketState>,
|
||||
|
||||
/// if this rate limiter is fair,
|
||||
/// provide a queue to provide this fair ordering.
|
||||
queue: Option<Notify>,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
fn steady_rps(&self) -> f64 {
|
||||
self.config.cost.as_secs_f64().recip()
|
||||
}
|
||||
|
||||
/// returns true if we did throttle
|
||||
async fn acquire(&self, count: usize) -> bool {
|
||||
let mut throttled = false;
|
||||
|
||||
let start = tokio::time::Instant::now();
|
||||
|
||||
// wait until we are the first in the queue
|
||||
if let Some(queue) = &self.queue {
|
||||
let mut notified = std::pin::pin!(queue.notified());
|
||||
if !notified.as_mut().enable() {
|
||||
throttled = true;
|
||||
notified.await;
|
||||
}
|
||||
}
|
||||
|
||||
// notify the next waiter in the queue when we are done.
|
||||
scopeguard::defer! {
|
||||
if let Some(queue) = &self.queue {
|
||||
queue.notify_one();
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
let res = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.add_tokens(&self.config, start, count as f64);
|
||||
match res {
|
||||
Ok(()) => return throttled,
|
||||
Err(ready_at) => {
|
||||
throttled = true;
|
||||
tokio::time::sleep_until(ready_at).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,9 @@ pub(crate) use limit_algorithm::aimd::Aimd;
|
||||
pub(crate) use limit_algorithm::{
|
||||
DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
|
||||
};
|
||||
pub(crate) use limiter::GlobalRateLimiter;
|
||||
|
||||
pub use leaky_bucket::{
|
||||
EndpointRateLimiter, LeakyBucketConfig, LeakyBucketRateLimiter, LeakyBucketState,
|
||||
pub use limiter::{
|
||||
BucketRateLimiter, GlobalRateLimiter, RateBucketInfo,
|
||||
WakeComputeRateLimiter,
|
||||
};
|
||||
pub use limiter::{BucketRateLimiter, RateBucketInfo, WakeComputeRateLimiter};
|
||||
|
||||
use leaky_bucket::EndpointRateLimiter;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
hash::Hash,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use ahash::RandomState;
|
||||
@@ -8,6 +9,7 @@ use dashmap::DashMap;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
use utils::leaky_bucket::LeakyBucketState;
|
||||
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
@@ -16,7 +18,7 @@ pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;
|
||||
|
||||
pub struct LeakyBucketRateLimiter<Key> {
|
||||
map: DashMap<Key, LeakyBucketState, RandomState>,
|
||||
config: LeakyBucketConfig,
|
||||
config: utils::leaky_bucket::LeakyBucketConfig,
|
||||
access_count: AtomicUsize,
|
||||
}
|
||||
|
||||
@@ -29,7 +31,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
|
||||
pub fn new_with_shards(config: LeakyBucketConfig, shards: usize) -> Self {
|
||||
Self {
|
||||
map: DashMap::with_hasher_and_shard_amount(RandomState::new(), shards),
|
||||
config,
|
||||
config: config.into(),
|
||||
access_count: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
@@ -42,12 +44,12 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
|
||||
self.do_gc(now);
|
||||
}
|
||||
|
||||
let mut entry = self.map.entry(key).or_insert_with(|| LeakyBucketState {
|
||||
time: now,
|
||||
filled: 0.0,
|
||||
});
|
||||
let mut entry = self
|
||||
.map
|
||||
.entry(key)
|
||||
.or_insert_with(|| LeakyBucketState::new(now - self.config.epoch));
|
||||
|
||||
entry.check(&self.config, now, n as f64)
|
||||
entry.add_tokens(&self.config, now, n as f64).is_ok()
|
||||
}
|
||||
|
||||
fn do_gc(&self, now: Instant) {
|
||||
@@ -59,7 +61,7 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
|
||||
let shard = thread_rng().gen_range(0..n);
|
||||
self.map.shards()[shard]
|
||||
.write()
|
||||
.retain(|_, value| !value.get_mut().update(&self.config, now));
|
||||
.retain(|_, value| !value.get().bucket_is_empty(&self.config, now));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,12 +70,6 @@ pub struct LeakyBucketConfig {
|
||||
pub max: f64,
|
||||
}
|
||||
|
||||
pub struct LeakyBucketState {
|
||||
filled: f64,
|
||||
time: Instant,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl LeakyBucketConfig {
|
||||
pub(crate) fn new(rps: f64, max: f64) -> Self {
|
||||
assert!(rps > 0.0, "rps must be positive");
|
||||
@@ -82,41 +78,18 @@ impl LeakyBucketConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl LeakyBucketState {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
filled: 0.0,
|
||||
time: Instant::now(),
|
||||
impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
|
||||
fn from(config: LeakyBucketConfig) -> Self {
|
||||
// seconds_per_request = 1/(request_per_second)
|
||||
let spr = config.rps.recip();
|
||||
let bucket_width = Duration::from_secs_f64(config.max * spr);
|
||||
utils::leaky_bucket::LeakyBucketConfig {
|
||||
epoch: Instant::now(),
|
||||
cost: Duration::from_secs_f64(spr),
|
||||
bucket_width,
|
||||
drain_interval: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
/// updates the timer and returns true if the bucket is empty
|
||||
fn update(&mut self, info: &LeakyBucketConfig, now: Instant) -> bool {
|
||||
let drain = now.duration_since(self.time);
|
||||
let drain = drain.as_secs_f64() * info.rps;
|
||||
|
||||
self.filled = (self.filled - drain).clamp(0.0, info.max);
|
||||
self.time = now;
|
||||
|
||||
self.filled == 0.0
|
||||
}
|
||||
|
||||
pub(crate) fn check(&mut self, info: &LeakyBucketConfig, now: Instant, n: f64) -> bool {
|
||||
self.update(info, now);
|
||||
|
||||
if self.filled + n > info.max {
|
||||
return false;
|
||||
}
|
||||
self.filled += n;
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LeakyBucketState {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -125,48 +98,56 @@ mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::Instant;
|
||||
use utils::leaky_bucket::LeakyBucketState;
|
||||
|
||||
use super::{LeakyBucketConfig, LeakyBucketState};
|
||||
use super::LeakyBucketConfig;
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn check() {
|
||||
let info = LeakyBucketConfig::new(500.0, 2000.0);
|
||||
let mut bucket = LeakyBucketState::new();
|
||||
let config: utils::leaky_bucket::LeakyBucketConfig =
|
||||
LeakyBucketConfig::new(500.0, 2000.0).into();
|
||||
assert_eq!(config.cost, Duration::from_millis(2));
|
||||
assert_eq!(config.bucket_width, Duration::from_secs(4));
|
||||
|
||||
let mut bucket = LeakyBucketState::new(Instant::now() - config.epoch);
|
||||
|
||||
// should work for 2000 requests this second
|
||||
for _ in 0..2000 {
|
||||
assert!(bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
assert!(!bucket.check(&info, Instant::now(), 1.0));
|
||||
assert_eq!(bucket.filled, 2000.0);
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(
|
||||
bucket.end - (Instant::now() - config.epoch),
|
||||
config.bucket_width
|
||||
);
|
||||
|
||||
// in 1ms we should drain 0.5 tokens.
|
||||
// make sure we don't lose any tokens
|
||||
tokio::time::advance(Duration::from_millis(1)).await;
|
||||
assert!(!bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
tokio::time::advance(Duration::from_millis(1)).await;
|
||||
assert!(bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
|
||||
// in 10ms we should drain 5 tokens
|
||||
tokio::time::advance(Duration::from_millis(10)).await;
|
||||
for _ in 0..5 {
|
||||
assert!(bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
assert!(!bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
|
||||
// in 10s we should drain 5000 tokens
|
||||
// but cap is only 2000
|
||||
tokio::time::advance(Duration::from_secs(10)).await;
|
||||
for _ in 0..2000 {
|
||||
assert!(bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
assert!(!bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
|
||||
// should sustain 500rps
|
||||
for _ in 0..2000 {
|
||||
tokio::time::advance(Duration::from_millis(10)).await;
|
||||
for _ in 0..5 {
|
||||
assert!(bucket.check(&info, Instant::now(), 1.0));
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user