add quantization

This commit is contained in:
Conrad Ludgate
2024-07-29 13:03:17 +01:00
parent 60c3e1347f
commit 40d239560f

View File

@@ -35,7 +35,6 @@ pub struct Throttle<M: Metric> {
pub struct Inner {
task_kinds: EnumSet<TaskKind>,
rate_limiter: Arc<RateLimiter>,
config: Config,
}
pub type Config = pageserver_api::models::ThrottleConfig;
@@ -79,7 +78,7 @@ where
refill_amount,
max,
fair,
} = &config;
} = config;
let task_kinds: EnumSet<TaskKind> = task_kinds
.iter()
.filter_map(|s| match TaskKind::from_str(s) {
@@ -98,15 +97,14 @@ where
task_kinds,
rate_limiter: Arc::new(
RateLimiterBuilder {
initial: *initial,
interval: *refill_interval,
initial,
refill_interval,
refill: refill_amount.get(),
max: *max,
fair: *fair,
max,
fair,
}
.build(),
),
config,
}
}
pub fn reconfigure(&self, config: Config) {
@@ -129,7 +127,7 @@ where
/// See [`Config::steady_rps`].
pub fn steady_rps(&self) -> f64 {
self.inner.load().config.steady_rps()
self.inner.load().rate_limiter.steady_rps()
}
pub async fn throttle(&self, ctx: &RequestContext, key_count: usize) -> Option<Duration> {
@@ -171,13 +169,17 @@ where
}
struct RateLimiter {
epoch: tokio::time::Instant,
/// "time cost" of a single request unit.
/// loosely represents how long it takes to handle a request unit in active CPU time.
time_cost: Duration,
bucket_width: Duration,
/// Bucket is represented by `start..end` where `start = end - config.bucket_width`.
interval: Duration,
/// Bucket is represented by `start..end` where `end = epoch + end` and `start = end - config.bucket_width`.
///
/// At any given time, `end - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
/// Adding `n` tokens to the bucket is done by moving `end` forward by `n * config.time_cost`.
@@ -191,7 +193,7 @@ struct RateLimiter {
///
/// This is inspired by the generic cell rate algorithm (GCRA) and works
/// exactly the same as a leaky-bucket.
end: Mutex<tokio::time::Instant>,
end: Mutex<Duration>,
queue: Option<Notify>,
}
@@ -204,7 +206,7 @@ struct RateLimiterBuilder {
/// Tokens to add every `per` duration.
refill: usize,
/// Interval to add tokens in milliseconds.
interval: Duration,
refill_interval: Duration,
/// If the rate limiter is fair or not.
fair: bool,
}
@@ -213,14 +215,16 @@ impl RateLimiterBuilder {
fn build(self) -> RateLimiter {
let queue = self.fair.then(Notify::new);
let time_cost = self.interval / self.refill as u32;
let time_cost = self.refill_interval / self.refill as u32;
let bucket_width = time_cost * (self.max as u32);
let initial_allow = time_cost * (self.initial as u32);
let end = tokio::time::Instant::now() + bucket_width - initial_allow;
let end = bucket_width - initial_allow;
RateLimiter {
epoch: tokio::time::Instant::now(),
time_cost,
bucket_width,
interval: self.refill_interval,
end: Mutex::new(end),
queue,
}
@@ -228,6 +232,10 @@ impl RateLimiterBuilder {
}
impl RateLimiter {
fn steady_rps(&self) -> f64 {
self.time_cost.as_secs_f64().recip()
}
/// returns true if not throttled
async fn acquire(&self, count: usize) -> bool {
let mut not_throttled = true;
@@ -252,6 +260,14 @@ impl RateLimiter {
loop {
let now = tokio::time::Instant::now();
let now = now - self.epoch;
// we only "add" new tokens on a fixed interval.
// truncate to the most recent multiple of self.interval.
let now = self
.interval
.mul_f64(now.div_duration_f64(self.interval).trunc());
let ready_at = {
// start end
// | start+n | end+n
@@ -279,7 +295,7 @@ impl RateLimiter {
};
not_throttled = false;
tokio::time::sleep_until(ready_at).await;
tokio::time::sleep_until(self.epoch + ready_at).await;
}
}
}