diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs index a43c93ec59..3e1eaef42f 100644 --- a/libs/utils/src/leaky_bucket.rs +++ b/libs/utils/src/leaky_bucket.rs @@ -22,6 +22,7 @@ //! Another explaination can be found here: use std::{ + fmt::Debug, sync::Mutex, task::{Poll, Waker}, time::Duration, @@ -31,20 +32,38 @@ use pin_list::{Node, NodeData, PinList}; use tokio::time::Instant; pub struct LeakyBucketConfig { + pub epoch: Instant, + /// This is the "time cost" of a single request unit. /// Should loosely represent how long it takes to handle a request unit in active resource time. /// Loosely speaking this is the inverse of the steady-rate requests-per-second - pub cost: Duration, + pub cost: Ns, /// total size of the bucket - pub bucket_width: Duration, + pub bucket_width: Ns, } impl LeakyBucketConfig { pub fn new(rps: f64, bucket_size: f64) -> Self { let cost = Duration::from_secs_f64(rps.recip()); let bucket_width = cost.mul_f64(bucket_size); - Self { cost, bucket_width } + Self { + epoch: Instant::now(), + cost: Ns(cost.as_nanos() as u64), + bucket_width: Ns(bucket_width.as_nanos() as u64), + } + } + + pub fn to_epoch(&self, t: Instant) -> InstantNs { + InstantNs((t - self.epoch).into()) + } + + pub fn from_epoch(&self, t: InstantNs) -> Instant { + self.epoch + t + } + + pub fn now(&self) -> InstantNs { + self.to_epoch(Instant::now()) } } @@ -63,17 +82,17 @@ pub struct LeakyBucketState { /// /// This is inspired by the generic cell rate algorithm (GCRA) and works /// exactly the same as a leaky-bucket. - pub empty_at: Instant, + pub empty_at: InstantNs, } impl LeakyBucketState { pub fn with_initial_tokens(config: &LeakyBucketConfig, initial_tokens: f64) -> Self { LeakyBucketState { - empty_at: Instant::now() + config.cost.mul_f64(initial_tokens), + empty_at: InstantNs(config.cost * initial_tokens), } } - pub fn bucket_is_empty(&self, now: Instant) -> bool { + pub fn bucket_is_empty(&self, now: InstantNs) -> bool { // if self.end is after now, the bucket is not empty self.empty_at <= now } @@ -95,8 +114,22 @@ impl LeakyBucketState { started: Instant, n: f64, ) -> Result<(), Instant> { - let now = Instant::now(); + self.add_tokens_fast( + config.bucket_width, + config.to_epoch(started), + config.to_epoch(Instant::now()), + config.cost * n, + ) + .map_err(|ready_at| config.from_epoch(ready_at)) + } + pub fn add_tokens_fast( + &mut self, + bucket_width: Ns, + started: InstantNs, + now: InstantNs, + n: Ns, + ) -> Result<(), InstantNs> { // invariant: started <= now debug_assert!(started <= now); @@ -108,9 +141,8 @@ impl LeakyBucketState { empty_at = started; } - let n = config.cost.mul_f64(n); let new_empty_at = empty_at + n; - let allow_at = new_empty_at.checked_sub(config.bucket_width); + let allow_at = new_empty_at.checked_sub(bucket_width); // empty_at // allow_at | new_empty_at @@ -131,6 +163,73 @@ impl LeakyBucketState { } } +// u64 nanoseconds allows for 584 years +#[derive(PartialEq, PartialOrd, Copy, Clone)] +pub struct Ns(u64); + +impl Debug for Ns { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Duration::from_nanos(self.0).fmt(f) + } +} + +impl Ns { + fn as_secs_f64(self) -> f64 { + (self.0 as f64) / 1_000_000_000.0 + } +} + +impl From for Ns { + fn from(value: Duration) -> Self { + Self(value.as_nanos() as u64) + } +} + +impl std::ops::Mul for Ns { + type Output = Ns; + fn mul(self, rhs: f64) -> Ns { + Ns((self.0 as f64 * rhs) as u64) + } +} + +impl std::ops::Mul for Ns { + type Output = Ns; + fn mul(self, rhs: u64) -> Ns { + Ns(self.0 * rhs) + } +} + +// ns since some epoch +#[derive(PartialEq, PartialOrd, Copy, Clone)] +pub struct InstantNs(Ns); + +impl std::ops::Add for InstantNs { + type Output = InstantNs; + fn add(self, rhs: Ns) -> InstantNs { + Self(Ns(self.0 .0 + rhs.0)) + } +} + +impl std::ops::Sub for InstantNs { + type Output = Ns; + fn sub(self, rhs: InstantNs) -> Ns { + Ns(self.0 .0 - rhs.0 .0) + } +} + +impl InstantNs { + fn checked_sub(self, rhs: Ns) -> Option { + self.0 .0.checked_sub(rhs.0).map(Ns).map(Self) + } +} + +impl std::ops::Add for Instant { + type Output = Instant; + fn add(self, rhs: InstantNs) -> Instant { + self + Duration::from_nanos(rhs.0 .0) + } +} + pub struct RateLimiter { config: LeakyBucketConfig, queue: Mutex, @@ -151,8 +250,8 @@ impl RateLimiter { limiter: self, sleep_counter: 0, state: None, - count, - start: tokio::time::Instant::now(), + count: self.config.cost * (count as u64), + start: self.config.to_epoch(Instant::now()), } } } @@ -160,10 +259,12 @@ impl RateLimiter { type RateLimitQueue = dyn pin_list::Types< Id = pin_list::id::Checked, // the waker that lets us wake the next in the queue - Protected = (Waker, Instant, usize), + // the instant is our acquire start time + // the duration is our GCRA token cost + Protected = (Waker, InstantNs, Ns), // the token that gives us access to the rate limit state, along with when it should be ready. // if None, then we were granted access already by the leader - Removed = Option<(LeakyBucketState, Instant)>, + Removed = Option<(LeakyBucketState, InstantNs)>, // the sleep count at the start of the enqueue Unprotected = u64, >; @@ -172,11 +273,11 @@ pin_project_lite::pin_project! { struct Enqueued<'a> { #[pin] entry: Node, - state: Option<(LeakyBucketState, Instant)>, + state: Option<(LeakyBucketState, InstantNs)>, sleep_counter: u64, limiter: &'a RateLimiter, - start: Instant, - count: usize, + start: InstantNs, + count: Ns, } impl<'a> PinnedDrop for Enqueued<'a> { @@ -204,18 +305,21 @@ pin_project_lite::pin_project! { }; let mut cursor = q.queue.cursor_front_mut(); + let now = this.limiter.config.to_epoch(Instant::now()); loop { match cursor.protected() { - Some((_waker, start, count)) => { - match state.add_tokens(&this.limiter.config, *start, *count as f64) { + Some((_waker, start, n)) => { + match state.add_tokens_fast(this.limiter.config.bucket_width, *start, now, *n) { Ok(()) => { - cursor.remove_current(None) + let (waker, _, _) = cursor.remove_current(None) .map_err(|_| {}).expect("we have just checked that the current node is in the list"); + waker.wake(); }, // next in the queue has to sleep Err(ready_at) => { - cursor.remove_current(Some((state, ready_at))) + let (waker, _, _) = cursor.remove_current(Some((state, ready_at))) .map_err(|_| {}).expect("we have just checked that the current node is in the list"); + waker.wake(); break; } } @@ -314,13 +418,15 @@ impl RateLimiter { return start_count < *entry.sleep_counter; }; + let mut now = self.config.to_epoch(Instant::now()); loop { - if *ready_at > Instant::now() { + if *ready_at > now { *entry.sleep_counter += 1; - tokio::time::sleep_until(*ready_at).await; + tokio::time::sleep_until(self.config.from_epoch(*ready_at)).await; + now = self.config.to_epoch(Instant::now()); } - match state.add_tokens(&self.config, *entry.start, count as f64) { + match state.add_tokens_fast(self.config.bucket_width, *entry.start, now, *entry.count) { Ok(()) => return start_count < *entry.sleep_counter, // we might hit this branch if we were the first in the queue // and the limit happened to be exhausted already @@ -340,16 +446,10 @@ mod tests { #[tokio::test(start_paused = true)] async fn check() { - let config = LeakyBucketConfig { - // average 100rps - cost: Duration::from_millis(10), - // burst up to 100 requests - bucket_width: Duration::from_millis(1000), - }; - - let mut state = LeakyBucketState { - empty_at: Instant::now(), - }; + // average 100rps + // burst up to 100 requests + let config = LeakyBucketConfig::new(100.0, 100.0); + let mut state = LeakyBucketState::with_initial_tokens(&config, 0.0); // supports burst { @@ -365,7 +465,7 @@ mod tests { { // after 1s we should have an empty bucket again. tokio::time::advance(Duration::from_secs(1)).await; - assert!(state.bucket_is_empty(Instant::now())); + assert!(state.bucket_is_empty(config.now())); // after 1s more, we should not over count the tokens and allow more than 200 requests. tokio::time::advance(Duration::from_secs(1)).await; @@ -392,7 +492,7 @@ mod tests { { // start the bucket completely empty tokio::time::advance(Duration::from_secs(5)).await; - assert!(state.bucket_is_empty(Instant::now())); + assert!(state.bucket_is_empty(config.now())); // requesting 200 tokens of space should take 200*cost = 2s // but we already have 1s available, so we wait 1s from start. diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index bf4d85f2e4..83b21874d3 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -6,9 +6,8 @@ use std::{ use ahash::RandomState; use dashmap::DashMap; use rand::{thread_rng, Rng}; -use tokio::time::Instant; use tracing::info; -use utils::leaky_bucket::LeakyBucketState; +use utils::leaky_bucket::{InstantNs, LeakyBucketState}; use crate::intern::EndpointIdInt; @@ -37,7 +36,7 @@ impl LeakyBucketRateLimiter { /// Check that number of connections to the endpoint is below `max_rps` rps. pub(crate) fn check(&self, key: K, n: u32) -> bool { - let now = Instant::now(); + let now = self.config.now(); if self.access_count.fetch_add(1, Ordering::AcqRel) % 2048 == 0 { self.do_gc(now); @@ -48,10 +47,17 @@ impl LeakyBucketRateLimiter { .entry(key) .or_insert_with(|| LeakyBucketState { empty_at: now }); - entry.add_tokens(&self.config, now, n as f64).is_ok() + entry + .add_tokens_fast( + self.config.bucket_width, + now, + now, + self.config.cost * (n as u64), + ) + .is_ok() } - fn do_gc(&self, now: Instant) { + fn do_gc(&self, now: InstantNs) { info!( "cleaning up bucket rate limiter, current size = {}", self.map.len() @@ -90,7 +96,7 @@ mod tests { use std::time::Duration; use tokio::time::Instant; - use utils::leaky_bucket::LeakyBucketState; + use utils::leaky_bucket::{LeakyBucketState, Ns}; use super::LeakyBucketConfig; @@ -98,11 +104,11 @@ mod tests { async fn check() { let config: utils::leaky_bucket::LeakyBucketConfig = LeakyBucketConfig::new(500.0, 2000.0).into(); - assert_eq!(config.cost, Duration::from_millis(2)); - assert_eq!(config.bucket_width, Duration::from_secs(4)); + assert_eq!(config.cost, Ns::from(Duration::from_millis(2))); + assert_eq!(config.bucket_width, Ns::from(Duration::from_secs(4))); let mut bucket = LeakyBucketState { - empty_at: Instant::now(), + empty_at: config.now(), }; // should work for 2000 requests this second @@ -110,7 +116,7 @@ mod tests { bucket.add_tokens(&config, Instant::now(), 1.0).unwrap(); } bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err(); - assert_eq!(bucket.empty_at - Instant::now(), config.bucket_width); + assert_eq!(bucket.empty_at - config.now(), config.bucket_width); // in 1ms we should drain 0.5 tokens. // make sure we don't lose any tokens