From 28b85ca7110d90a700240663d7567c588fd7a0ca Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Mon, 29 Jul 2024 14:08:37 +0100
Subject: [PATCH] share impl between proxy and ps

---
 libs/utils/src/leaky_bucket.rs         | 113 ++++++++++++++++++
 libs/utils/src/lib.rs                  |   1 +
 pageserver/src/tenant/throttle.rs      | 157 ++++++++-----------------
 proxy/src/rate_limiter/leaky_bucket.rs | 112 +++++-------------
 4 files changed, 191 insertions(+), 192 deletions(-)
 create mode 100644 libs/utils/src/leaky_bucket.rs

diff --git a/libs/utils/src/leaky_bucket.rs b/libs/utils/src/leaky_bucket.rs
new file mode 100644
index 0000000000..7e175a21c4
--- /dev/null
+++ b/libs/utils/src/leaky_bucket.rs
@@ -0,0 +1,113 @@
+use std::time::Duration;
+
+use tokio::time::Instant;
+
+pub struct LeakyBucketConfig {
+    /// Leaky buckets can drain at a fixed interval rate.
+    /// We track all times as durations since this epoch so we can round down.
+    pub epoch: Instant,
+
+    /// How frequently we drain the bucket.
+    /// If equal to 0, we drain continuously over time.
+    /// If greater than 0, we drain at fixed intervals.
+    pub refill_rate: Duration,
+
+    /// "time cost" of a single request unit.
+    /// loosely represents how long it takes to handle a request unit in active CPU time.
+    pub time_cost: Duration,
+
+    /// total size of the bucket
+    pub bucket_width: Duration,
+}
+
+impl LeakyBucketConfig {
+    pub fn quantize_instant(&self, now: Instant) -> Duration {
+        let mut now = now - self.epoch;
+
+        if self.refill_rate > Duration::ZERO {
+            // we only "add" new tokens on a fixed interval.
+            // truncate to the most recent multiple of self.refill_rate.
+            now = self
+                .refill_rate
+                .mul_f64(now.div_duration_f64(self.refill_rate).trunc());
+        }
+
+        now
+    }
+}
+
+// impl From<LeakyBucketConfig> for LeakyBucketConfigInner {
+//     fn from(config: LeakyBucketConfig) -> Self {
+//         // seconds_per_request = 1/(request_per_second)
+//         let spr = config.rps.recip();
+//         Self {
+//             time_cost: Duration::from_secs_f64(spr),
+//             bucket_width: Duration::from_secs_f64(config.max * spr),
+//         }
+//     }
+// }
+
+pub struct LeakyBucketState {
+    /// Bucket is represented by `start..end`, where `end` is stored as a `Duration` since `config.epoch` and `start = end - config.bucket_width`.
+    ///
+    /// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
+    /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
+    /// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
+    /// Draining the bucket will happen naturally as `now` moves forward.
+    ///
+    /// Let `n` be some "time cost" for the request,
+    /// If now is after end, the bucket is empty and the end is reset to now,
+    /// If now is within the `bucket window + n`, we are within time budget.
+    /// If now is before the `bucket window + n`, we have run out of budget.
+    ///
+    /// This is inspired by the generic cell rate algorithm (GCRA) and works
+    /// exactly the same as a leaky-bucket.
+    pub end: Duration,
+}
+
+impl LeakyBucketState {
+    pub fn new(now: Duration) -> Self {
+        Self { end: now }
+    }
+
+    pub fn bucket_is_empty(&self, config: &LeakyBucketConfig, now: Instant) -> bool {
+        // if self.end is after now, the bucket is not empty
+        self.end <= config.quantize_instant(now)
+    }
+
+    /// Immediately adds tokens to the bucket, if there is space.
+    /// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
+    /// there will be space again.
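+    ///
+    /// For illustration (example numbers, not fixed by this API): with
+    /// `time_cost = 2ms` and `bucket_width = 4s` (i.e. 500 requests per second
+    /// with bursts of up to 2000 tokens), each accepted token moves `end`
+    /// forward by 2ms, and the `Err(instant)` returned once the bucket is full
+    /// is the time at which the rejected tokens would next fit.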
+    pub fn add_tokens(
+        &mut self,
+        config: &LeakyBucketConfig,
+        now: Instant,
+        n: f64,
+    ) -> Result<(), Instant> {
+        let now = config.quantize_instant(now);
+
+        let start = self.end - config.bucket_width;
+
+        let n = config.time_cost.mul_f64(n);
+
+        //       start          end
+        //       |  start+n     |  end+n
+        //       |  /           |  /
+        // ------{o-[---------o-}--]----o----
+        //   now1 ^      now2 ^         ^ now3
+        //
+        // at now1, the bucket would be completely filled if we add n tokens.
+        // at now2, the bucket would be partially filled if we add n tokens.
+        // at now3, the bucket would start completely empty before we add n tokens.
+
+        if self.end + n <= now {
+            self.end = now + n;
+            Ok(())
+        } else if start + n <= now {
+            self.end += n;
+            Ok(())
+        } else {
+            Err(config.epoch + start + n)
+        }
+    }
+}
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index a46d68ef33..112fac208a 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -71,6 +71,7 @@ pub mod postgres_client;
 
 pub mod tracing_span_assert;
 
+pub mod leaky_bucket;
 pub mod rate_limit;
 
 /// Simple once-barrier and a guard which keeps barrier awaiting.
diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs
index 342cbf31c3..d7fb54a210 100644
--- a/pageserver/src/tenant/throttle.rs
+++ b/pageserver/src/tenant/throttle.rs
@@ -11,6 +11,7 @@ use arc_swap::ArcSwap;
 use enumset::EnumSet;
 use tokio::sync::Notify;
 use tracing::{error, warn};
+use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState};
 
 use crate::{context::RequestContext, task_mgr::TaskKind};
 
@@ -93,18 +94,33 @@ where
             }
         })
         .collect();
+
+        //
+        let time_cost = refill_interval / refill_amount.get() as u32;
+        let bucket_width = time_cost * (max as u32);
+
+        // initial tracks how many tokens are available to put in the bucket
+        // we want how many tokens are currently in the bucket
+        let initial_tokens = (max - initial) as u32;
+        let end = time_cost * initial_tokens;
+
+        // start a bit early so that `end - bucket_width` can never underflow
+        let epoch_offset = 2 * bucket_width + refill_interval;
+
+        let rate_limiter = RateLimiter {
+            config: LeakyBucketConfig {
+                epoch: tokio::time::Instant::now() - epoch_offset,
+                refill_rate: refill_interval,
+                time_cost,
+                bucket_width,
+            },
+            state: Mutex::new(LeakyBucketState::new(end + epoch_offset)),
+            queue: fair.then(Notify::new),
+        };
+
         Inner {
             task_kinds,
-            rate_limiter: Arc::new(
-                RateLimiterBuilder {
-                    initial,
-                    refill_interval,
-                    refill: refill_amount.get(),
-                    max,
-                    fair,
-                }
-                .build(),
-            ),
+            rate_limiter: Arc::new(rate_limiter),
         }
     }
     pub fn reconfigure(&self, config: Config) {
@@ -137,7 +153,7 @@ where
         };
 
         let start = std::time::Instant::now();
-        let did_throttle = !inner.rate_limiter.acquire(key_count).await;
+        let did_throttle = inner.rate_limiter.acquire(key_count).await;
 
         self.count_accounted.fetch_add(1, Ordering::Relaxed);
         if did_throttle {
@@ -169,89 +185,33 @@ where
 }
 
 struct RateLimiter {
-    epoch: tokio::time::Instant,
-
-    /// "time cost" of a single request unit.
-    /// loosely represents how long it takes to handle a request unit in active CPU time.
-    time_cost: Duration,
-
-    bucket_width: Duration,
-
-    interval: Duration,
-
-    /// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`.
-    ///
-    /// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
-    /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
-    /// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
-    /// Draining the bucket will happen naturally as `now` moves forward.
-    ///
-    /// Let `n` be some "time cost" for the request,
-    /// If now is after end, the bucket is empty and the end is reset to now,
-    /// If now is within the `bucket window + n`, we are within time budget.
-    /// If now is before the `bucket window + n`, we have run out of budget.
-    ///
-    /// This is inspired by the generic cell rate algorithm (GCRA) and works
-    /// exactly the same as a leaky-bucket.
-    end: Mutex<Duration>,
+    config: LeakyBucketConfig,
+    state: Mutex<LeakyBucketState>,
+    /// if this rate limiter is fair,
+    /// a queue is used to provide this fair ordering.
     queue: Option<Notify>,
 }
 
-struct RateLimiterBuilder {
-    /// The max number of tokens.
-    max: usize,
-    /// The initial count of tokens.
-    initial: usize,
-    /// Tokens to add every `per` duration.
-    refill: usize,
-    /// Interval to add tokens in milliseconds.
-    refill_interval: Duration,
-    /// If the rate limiter is fair or not.
-    fair: bool,
-}
-
-impl RateLimiterBuilder {
-    fn build(self) -> RateLimiter {
-        let queue = self.fair.then(Notify::new);
-
-        let time_cost = self.refill_interval / self.refill as u32;
-        let bucket_width = time_cost * (self.max as u32);
-        let initial_allow = time_cost * (self.initial as u32);
-        let end = bucket_width - initial_allow;
-
-        RateLimiter {
-            epoch: tokio::time::Instant::now(),
-            time_cost,
-            bucket_width,
-            interval: self.refill_interval,
-            end: Mutex::new(end),
-            queue,
-        }
-    }
-}
-
 impl RateLimiter {
     fn steady_rps(&self) -> f64 {
-        self.time_cost.as_secs_f64().recip()
+        self.config.time_cost.as_secs_f64().recip()
     }
 
-    /// returns true if not throttled
+    /// returns true if we did throttle
     async fn acquire(&self, count: usize) -> bool {
-        let mut not_throttled = true;
-
-        let n = self.time_cost.mul_f64(count as f64);
+        let mut throttled = false;
 
         // wait until we are the first in the queue
         if let Some(queue) = &self.queue {
             let mut notified = std::pin::pin!(queue.notified());
             if !notified.as_mut().enable() {
-                not_throttled = false;
+                throttled = true;
                 notified.await;
             }
         }
 
-        // notify the next waiter in the queue
+        // notify the next waiter in the queue when we are done.
         scopeguard::defer! {
             if let Some(queue) = &self.queue {
                 queue.notify_one();
@@ -260,42 +220,19 @@ impl RateLimiter {
         loop {
             let now = tokio::time::Instant::now();
-            let now = now - self.epoch;
-            // we only "add" new tokens on a fixed interval.
-            // truncate to the most recent multiple of self.interval.
-            let now = self
-                .interval
-                .mul_f64(now.div_duration_f64(self.interval).trunc());
-
-            let ready_at = {
-                //       start          end
-                //       |  start+n     |  end+n
-                //       |  /           |  /
-                // ------{o-[---------o-}--]----o----
-                //   now1 ^      now2 ^         ^ now3
-                //
-                // at now1, the bucket would be completely filled if we add n tokens.
-                // at now2, the bucket would be partially filled if we add n tokens.
-                // at now3, the bucket would start completely empty before we add n tokens.
-
-                let mut end = self.end.lock().unwrap();
-                let start = *end - self.bucket_width;
-                let ready_at = start + n;
-
-                if *end + n <= now {
-                    *end = now + n;
-                    return not_throttled;
-                } else if ready_at <= now {
-                    *end += n;
-                    return not_throttled;
+            let res = self
+                .state
+                .lock()
+                .unwrap()
+                .add_tokens(&self.config, now, count as f64);
+            match res {
+                Ok(()) => return throttled,
+                Err(ready_at) => {
+                    throttled = true;
+                    tokio::time::sleep_until(ready_at).await;
                 }
-
-                ready_at
-            };
-
-            not_throttled = false;
-            tokio::time::sleep_until(self.epoch + ready_at).await;
+            }
         }
     }
 }
diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs
index c69f941c80..49ab3c967b 100644
--- a/proxy/src/rate_limiter/leaky_bucket.rs
+++ b/proxy/src/rate_limiter/leaky_bucket.rs
@@ -9,6 +9,7 @@ use dashmap::DashMap;
 use rand::{thread_rng, Rng};
 use tokio::time::Instant;
 use tracing::info;
+use utils::leaky_bucket::LeakyBucketState;
 
 use crate::intern::EndpointIdInt;
 
@@ -17,7 +18,7 @@
 pub type EndpointRateLimiter = LeakyBucketRateLimiter<EndpointIdInt>;
 
 pub struct LeakyBucketRateLimiter<K> {
     map: DashMap<K, LeakyBucketState>,
-    config: LeakyBucketConfigInner,
+    config: utils::leaky_bucket::LeakyBucketConfig,
     access_count: AtomicUsize,
 }
@@ -46,9 +47,9 @@ impl<K: Hash + Eq> LeakyBucketRateLimiter<K> {
         let mut entry = self
             .map
             .entry(key)
-            .or_insert_with(|| LeakyBucketState::new(now));
+            .or_insert_with(|| LeakyBucketState::new(now - self.config.epoch));
 
-        entry.check(&self.config, now, n as f64)
+        entry.add_tokens(&self.config, now, n as f64).is_ok()
     }
 
     fn do_gc(&self, now: Instant) {
@@ -60,7 +61,7 @@
         let shard = thread_rng().gen_range(0..n);
         self.map.shards()[shard]
             .write()
-            .retain(|_, value| value.get().should_retain(now));
+            .retain(|_, value| !value.get().bucket_is_empty(&self.config, now));
     }
 }
 
@@ -77,75 +78,17 @@ impl LeakyBucketConfig {
     }
 }
 
-struct LeakyBucketConfigInner {
-    /// "time cost" of a single request unit.
-    /// loosely represents how long it takes to handle a request unit in active CPU time.
-    time_cost: Duration,
-    bucket_width: Duration,
-}
-
-impl From<LeakyBucketConfig> for LeakyBucketConfigInner {
+impl From<LeakyBucketConfig> for utils::leaky_bucket::LeakyBucketConfig {
     fn from(config: LeakyBucketConfig) -> Self {
         // seconds_per_request = 1/(request_per_second)
         let spr = config.rps.recip();
-        Self {
+        let bucket_width = Duration::from_secs_f64(config.max * spr);
+        utils::leaky_bucket::LeakyBucketConfig {
             time_cost: Duration::from_secs_f64(spr),
-            bucket_width: Duration::from_secs_f64(config.max * spr),
-        }
-    }
-}
-
-struct LeakyBucketState {
-    /// Bucket is represented by `start..end` where `start = end - config.bucket_width`.
-    ///
-    /// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
-    /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
-    /// If `now < start`, the bucket is considered filled and cannot accept any more tokens.
-    /// Draining the bucket will happen naturally as `now` moves forward.
-    ///
-    /// Let `n` be some "time cost" for the request,
-    /// If now is after end, the bucket is empty and the end is reset to now,
-    /// If now is within the `bucket window + n`, we are within time budget.
-    /// If now is before the `bucket window + n`, we have run out of budget.
-    ///
-    /// This is inspired by the generic cell rate algorithm (GCRA) and works
-    /// exactly the same as a leaky-bucket.
-    end: Instant,
-}
-
-impl LeakyBucketState {
-    fn new(now: Instant) -> Self {
-        Self { end: now }
-    }
-
-    fn should_retain(&self, now: Instant) -> bool {
-        // if self.end is after now, the bucket is not empty
-        now < self.end
-    }
-
-    fn check(&mut self, config: &LeakyBucketConfigInner, now: Instant, n: f64) -> bool {
-        let start = self.end - config.bucket_width;
-
-        let n = config.time_cost.mul_f64(n);
-
-        //       start          end
-        //       |  start+n     |  end+n
-        //       |  /           |  /
-        // ------{o-[---------o-}--]----o----
-        //   now1 ^      now2 ^         ^ now3
-        //
-        // at now1, the bucket would be completely filled if we add n tokens.
-        // at now2, the bucket would be partially filled if we add n tokens.
-        // at now3, the bucket would start completely empty before we add n tokens.
-
-        if self.end + n <= now {
-            self.end = now + n;
-            true
-        } else if start + n <= now {
-            self.end += n;
-            true
-        } else {
-            false
+            bucket_width,
+            // start a bit early so that `end - bucket_width` can never underflow
+            epoch: Instant::now() - 2 * bucket_width,
+            refill_rate: Duration::ZERO,
         }
     }
 }
 
@@ -155,51 +98,56 @@ mod tests {
     use std::time::Duration;
 
     use tokio::time::Instant;
+    use utils::leaky_bucket::LeakyBucketState;
 
-    use super::{LeakyBucketConfig, LeakyBucketConfigInner, LeakyBucketState};
+    use super::LeakyBucketConfig;
 
     #[tokio::test(start_paused = true)]
    async fn check() {
-        let config: LeakyBucketConfigInner = LeakyBucketConfig::new(500.0, 2000.0).into();
+        let config: utils::leaky_bucket::LeakyBucketConfig =
+            LeakyBucketConfig::new(500.0, 2000.0).into();
         assert_eq!(config.time_cost, Duration::from_millis(2));
         assert_eq!(config.bucket_width, Duration::from_secs(4));
 
-        let mut bucket = LeakyBucketState::new(Instant::now());
+        let mut bucket = LeakyBucketState::new(Instant::now() - config.epoch);
 
         // should work for 2000 requests this second
         for _ in 0..2000 {
-            assert!(bucket.check(&config, Instant::now(), 1.0));
+            assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
         }
-        assert!(!bucket.check(&config, Instant::now(), 1.0));
-        assert_eq!(bucket.end - Instant::now(), config.bucket_width);
+        assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
+        assert_eq!(
+            bucket.end - (Instant::now() - config.epoch),
+            config.bucket_width
+        );
 
         // in 1ms we should drain 0.5 tokens.
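         // (time_cost is 2ms, so 1ms of wall time frees only half a token:
         // the first 1ms step below must still fail, the second must succeed)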
         // make sure we don't lose any tokens
         tokio::time::advance(Duration::from_millis(1)).await;
-        assert!(!bucket.check(&config, Instant::now(), 1.0));
+        assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
         tokio::time::advance(Duration::from_millis(1)).await;
-        assert!(bucket.check(&config, Instant::now(), 1.0));
+        assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
 
         // in 10ms we should drain 5 tokens
         tokio::time::advance(Duration::from_millis(10)).await;
         for _ in 0..5 {
-            assert!(bucket.check(&config, Instant::now(), 1.0));
+            assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
         }
-        assert!(!bucket.check(&config, Instant::now(), 1.0));
+        assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
 
         // in 10s we should drain 5000 tokens
         // but cap is only 2000
         tokio::time::advance(Duration::from_secs(10)).await;
         for _ in 0..2000 {
-            assert!(bucket.check(&config, Instant::now(), 1.0));
+            assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
         }
-        assert!(!bucket.check(&config, Instant::now(), 1.0));
+        assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
 
         // should sustain 500rps
         for _ in 0..2000 {
             tokio::time::advance(Duration::from_millis(10)).await;
             for _ in 0..5 {
-                assert!(bucket.check(&config, Instant::now(), 1.0));
+                assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
             }
         }
     }
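
A minimal usage sketch of the shared types (a sketch, not part of the patch: it assumes a tokio runtime and reuses the illustrative 500 rps / 2000-token-burst numbers from the test above):

    use std::time::Duration;
    use tokio::time::Instant;
    use utils::leaky_bucket::{LeakyBucketConfig, LeakyBucketState};

    async fn admit_one() {
        // 500 rps: one token costs 2ms of "time"; bursts of up to 2000 tokens.
        let time_cost = Duration::from_millis(2);
        let bucket_width = time_cost * 2000;

        let config = LeakyBucketConfig {
            // start a bit early so that `end - bucket_width` can never underflow
            epoch: Instant::now() - 2 * bucket_width,
            // Duration::ZERO drains continuously; a non-zero value drains in fixed steps
            refill_rate: Duration::ZERO,
            time_cost,
            bucket_width,
        };

        // an empty bucket: `end` is "now", stored relative to the epoch
        let mut state = LeakyBucketState::new(Instant::now() - config.epoch);

        match state.add_tokens(&config, Instant::now(), 1.0) {
            Ok(()) => { /* request admitted */ }
            Err(ready_at) => {
                // the bucket is full: sleep until the token would fit, then retry
                tokio::time::sleep_until(ready_at).await;
            }
        }
    }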