diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs
index a2f3649e9f..342cbf31c3 100644
--- a/pageserver/src/tenant/throttle.rs
+++ b/pageserver/src/tenant/throttle.rs
@@ -35,7 +35,6 @@ pub struct Throttle {
 pub struct Inner {
     task_kinds: EnumSet<TaskKind>,
     rate_limiter: Arc<RateLimiter>,
-    config: Config,
 }
 
 pub type Config = pageserver_api::models::ThrottleConfig;
@@ -79,7 +78,7 @@ where
             refill_amount,
             max,
             fair,
-        } = &config;
+        } = config;
         let task_kinds: EnumSet<TaskKind> = task_kinds
             .iter()
             .filter_map(|s| match TaskKind::from_str(s) {
@@ -98,15 +97,14 @@ where
             task_kinds,
             rate_limiter: Arc::new(
                 RateLimiterBuilder {
-                    initial: *initial,
-                    interval: *refill_interval,
+                    initial,
+                    refill_interval,
                     refill: refill_amount.get(),
-                    max: *max,
-                    fair: *fair,
+                    max,
+                    fair,
                 }
                 .build(),
             ),
-            config,
         }
     }
     pub fn reconfigure(&self, config: Config) {
@@ -129,7 +127,7 @@ where
 
     /// See [`Config::steady_rps`].
     pub fn steady_rps(&self) -> f64 {
-        self.inner.load().config.steady_rps()
+        self.inner.load().rate_limiter.steady_rps()
     }
 
     pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
@@ -171,13 +169,17 @@ where
 }
 
 struct RateLimiter {
+    epoch: tokio::time::Instant,
+
     /// "time cost" of a single request unit.
     /// loosely represents how long it takes to handle a request unit in active CPU time.
     time_cost: Duration,
 
     bucket_width: Duration,
 
-    /// Bucket is represented by `start..end` where `start = end - config.bucket_width`.
+    interval: Duration,
+
+    /// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`.
     ///
     /// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
     /// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
@@ -191,7 +193,7 @@ struct RateLimiter {
     ///
     /// This is inspired by the generic cell rate algorithm (GCRA) and works
    /// exactly the same as a leaky-bucket.
-    end: Mutex<tokio::time::Instant>,
+    end: Mutex<Duration>,
 
     queue: Option<Notify>,
 }
@@ -204,7 +206,7 @@ struct RateLimiterBuilder {
     /// Tokens to add every `per` duration.
     refill: usize,
     /// Interval to add tokens in milliseconds.
-    interval: Duration,
+    refill_interval: Duration,
     /// If the rate limiter is fair or not.
     fair: bool,
 }
@@ -213,14 +215,16 @@ impl RateLimiterBuilder {
     fn build(self) -> RateLimiter {
         let queue = self.fair.then(Notify::new);
 
-        let time_cost = self.interval / self.refill as u32;
+        let time_cost = self.refill_interval / self.refill as u32;
         let bucket_width = time_cost * (self.max as u32);
         let initial_allow = time_cost * (self.initial as u32);
-        let end = tokio::time::Instant::now() + bucket_width - initial_allow;
+        let end = bucket_width - initial_allow;
 
         RateLimiter {
+            epoch: tokio::time::Instant::now(),
             time_cost,
             bucket_width,
+            interval: self.refill_interval,
             end: Mutex::new(end),
             queue,
         }
@@ -228,6 +232,10 @@ impl RateLimiterBuilder {
 }
 
 impl RateLimiter {
+    fn steady_rps(&self) -> f64 {
+        self.time_cost.as_secs_f64().recip()
+    }
+
     /// returns true if not throttled
     async fn acquire(&self, count: usize) -> bool {
         let mut not_throttled = true;
@@ -252,6 +260,14 @@ impl RateLimiter {
 
         loop {
             let now = tokio::time::Instant::now();
+            let now = now - self.epoch;
+
+            // we only "add" new tokens on a fixed interval.
+            // truncate to the most recent multiple of self.interval.
+            let now = self
+                .interval
+                .mul_f64(now.div_duration_f64(self.interval).trunc());
+
             let ready_at = {
                 // start          end
                 //   |  start+n   |   end+n
@@ -279,7 +295,7 @@ impl RateLimiter {
             };
 
             not_throttled = false;
-            tokio::time::sleep_until(ready_at).await;
+            tokio::time::sleep_until(self.epoch + ready_at).await;
         }
     }
 }
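To make the new accounting easier to follow, here is a minimal, single-threaded sketch of the epoch-relative, fixed-interval GCRA/leaky-bucket scheme the diff moves to. It is not the pageserver code: ToyRateLimiter, try_acquire, and the numbers in main are made up for illustration, it uses std::time instead of tokio::time, and it drops the async fairness queue; only the Duration-offset bookkeeping for `end` and the truncation of `now` to the most recent multiple of the refill interval mirror the patch.

use std::time::{Duration, Instant};

struct ToyRateLimiter {
    epoch: Instant,
    /// "time cost" of one request unit: refill_interval / refill.
    time_cost: Duration,
    /// time_cost * max: how far `end` may run ahead of `now` before we throttle.
    bucket_width: Duration,
    /// tokens are only released on multiples of this interval.
    interval: Duration,
    /// end of the `start..end` bucket window, stored as an offset from `epoch`.
    end: Duration,
}

impl ToyRateLimiter {
    /// `initial` must be <= `max` for this sketch.
    fn new(initial: u32, refill: u32, refill_interval: Duration, max: u32) -> Self {
        let time_cost = refill_interval / refill;
        let bucket_width = time_cost * max;
        ToyRateLimiter {
            epoch: Instant::now(),
            time_cost,
            bucket_width,
            interval: refill_interval,
            // start with `initial` tokens already available
            end: bucket_width - time_cost * initial,
        }
    }

    /// steady-state rate: one request unit per `time_cost`.
    fn steady_rps(&self) -> f64 {
        self.time_cost.as_secs_f64().recip()
    }

    /// Try to take `count` units. On success the bucket window advances;
    /// otherwise returns the epoch-relative deadline to sleep until before retrying.
    fn try_acquire(&mut self, count: u32) -> Result<(), Duration> {
        let now = Instant::now() - self.epoch;
        // tokens are only "added" on a fixed interval:
        // truncate `now` to the most recent multiple of `self.interval`.
        let now = self
            .interval
            .mul_f64(now.div_duration_f64(self.interval).trunc());

        let new_end = self.end.max(now) + self.time_cost * count;
        if new_end - now <= self.bucket_width {
            // the request fits in the bucket: commit the shifted window.
            self.end = new_end;
            Ok(())
        } else {
            // bucket full: ready once `now` reaches the shifted window's start.
            Err(new_end - self.bucket_width)
        }
    }
}

fn main() {
    // 100 tokens per 100ms => steady 1000 rps, burst capacity 2000, 500 available up front.
    let mut rl = ToyRateLimiter::new(500, 100, Duration::from_millis(100), 2000);
    println!("steady rps: {}", rl.steady_rps());
    match rl.try_acquire(300) {
        Ok(()) => println!("admitted immediately"),
        Err(ready_at) => println!("throttled; retry at epoch + {ready_at:?}"),
    }
}

Storing `end` as a Duration from a fixed `epoch`, rather than as an Instant, is what keeps the truncation arithmetic simple: `now` can be divided by the interval as a plain Duration and scaled back with `mul_f64`, and an actual Instant is only reconstructed as `epoch + ready_at` at the point where the caller sleeps.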