From 300a43db5c3ecd156a1196654bae6a700d1c0525 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 29 Jul 2024 14:50:47 +0100 Subject: [PATCH] rename --- libs/utils/src/leaky_bucket.rs | 53 ++++++++++++-------------- pageserver/src/tenant/throttle.rs | 6 +-- proxy/src/rate_limiter/leaky_bucket.rs | 6 +-- 3 files changed, 30 insertions(+), 35 deletions(-) diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs index 0728e18194..661518ab07 100644 --- a/libs/utils/src/leaky_bucket.rs +++ b/libs/utils/src/leaky_bucket.rs @@ -8,31 +8,33 @@ pub struct LeakyBucketConfig { pub epoch: Instant, /// How frequently we drain the bucket. - /// If equal to 0, we drain constantly over time. + /// If equal to 0, we drain continuously over time. /// If greater than 0, we drain at fixed intervals. - pub refill_rate: Duration, + pub drain_interval: Duration, /// "time cost" of a single request unit. - /// loosely represents how long it takes to handle a request unit in active CPU time. - pub time_cost: Duration, + /// should loosely represents how long it takes to handle a request unit in active resource time. + pub cost: Duration, /// total size of the bucket pub bucket_width: Duration, } impl LeakyBucketConfig { - pub fn quantize_instant(&self, now: Instant) -> Duration { - let mut now = now - self.epoch; - - if self.refill_rate > Duration::ZERO { - // we only "add" new tokens on a fixed interval. - // truncate to the most recent multiple of self.interval. - now = self - .refill_rate - .mul_f64(now.div_duration_f64(self.refill_rate).trunc()); + fn prev_multiple_of_drain(&self, mut dur: Duration) -> Duration { + if self.drain_interval > Duration::ZERO { + let n = dur.div_duration_f64(self.drain_interval).floor(); + dur = self.drain_interval.mul_f64(n); } + dur + } - now + fn next_multiple_of_drain(&self, mut dur: Duration) -> Duration { + if self.drain_interval > Duration::ZERO { + let n = dur.div_duration_f64(self.drain_interval).ceil(); + dur = self.drain_interval.mul_f64(n); + } + dur } } @@ -61,7 +63,7 @@ impl LeakyBucketState { pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool { // if self.end is after now, the bucket is not empty - config.quantize_instant(now) <= self.end + config.prev_multiple_of_drain(now - config.epoch) <= self.end } /// Immedaitely adds tokens to the bucket, if there is space. @@ -73,9 +75,10 @@ impl LeakyBucketState { now: Instant, n: f64, ) -> Result<(), Instant> { - let now = config.quantize_instant(now); + // round down to the last time we would have drained the bucket. + let now = config.prev_multiple_of_drain(now - config.epoch); - let n = config.time_cost.mul_f64(n); + let n = config.cost.mul_f64(n); let end_plus_n = self.end + n; let start_plus_n = end_plus_n.saturating_sub(config.bucket_width); @@ -97,15 +100,7 @@ impl LeakyBucketState { self.end = end_plus_n; Ok(()) } else { - let mut ready_at = start_plus_n; - - // we need the ready_at.next_multiple_of(config.refill_rate) - if config.refill_rate > Duration::ZERO { - ready_at = config - .refill_rate - .mul_f64(ready_at.div_duration_f64(config.refill_rate).ceil()); - } - + let ready_at = config.next_multiple_of_drain(start_plus_n); Err(config.epoch + ready_at) } } @@ -123,10 +118,10 @@ mod tests { async fn check() { let config = LeakyBucketConfig { epoch: Instant::now(), - // refill every 0.5 seconds. - refill_rate: Duration::from_millis(500), + // drain the bucket every 0.5 seconds. + drain_interval: Duration::from_millis(500), // average 100rps - time_cost: Duration::from_millis(10), + cost: Duration::from_millis(10), // burst up to 100 requests bucket_width: Duration::from_millis(1000), }; diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 33ac602320..ce4fa49ac8 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -107,8 +107,8 @@ where let rate_limiter = RateLimiter { config: LeakyBucketConfig { epoch: tokio::time::Instant::now(), - refill_rate: refill_interval, - time_cost, + drain_interval: refill_interval, + cost: time_cost, bucket_width, }, state: Mutex::new(LeakyBucketState::new(end)), @@ -192,7 +192,7 @@ struct RateLimiter { impl RateLimiter { fn steady_rps(&self) -> f64 { - self.config.time_cost.as_secs_f64().recip() + self.config.cost.as_secs_f64().recip() } /// returns true if we did throttle diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 63116dd99f..3ff4c2aced 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -85,9 +85,9 @@ impl From for utils::leaky_bucket::LeakyBucketConfig { let bucket_width = Duration::from_secs_f64(config.max * spr); utils::leaky_bucket::LeakyBucketConfig { epoch: Instant::now(), - time_cost: Duration::from_secs_f64(spr), + cost: Duration::from_secs_f64(spr), bucket_width, - refill_rate: Duration::ZERO, + drain_interval: Duration::ZERO, } } } @@ -105,7 +105,7 @@ mod tests { async fn check() { let config: utils::leaky_bucket::LeakyBucketConfig = LeakyBucketConfig::new(500.0, 2000.0).into(); - assert_eq!(config.time_cost, Duration::from_millis(2)); + assert_eq!(config.cost, Duration::from_millis(2)); assert_eq!(config.bucket_width, Duration::from_secs(4)); let mut bucket = LeakyBucketState::new(Instant::now() - config.epoch);