Files
neon/libs/utils/src/leaky_bucket.rs
Conrad Ludgate a644f01b6a proxy+pageserver: shared leaky bucket impl (#8539)
In proxy I switched to a leaky-bucket impl using the GCRA algorithm. I
figured I could share the code with pageserver and remove the
leaky_bucket crate dependency with some very basic tokio timers and
queues for fairness.

The underlying algorithm should be fairly clear how it works from the
comments I have left in the code.

---

In benchmarking pageserver, @problame found that the new implementation
fixes a getpage throughput discontinuity in pageserver under the
`pagebench get-page-latest-lsn` benchmark with the clickbench dataset
(`test_perf_olap.py`).
The discontinuity is that for any of `--num-clients={2,3,4}`, getpage
throughput remains 10k.
With `--num-clients=5` and greater, getpage throughput then jumps to the
configured 20k rate limit.
With the changes in this PR, the discontinuity is gone, and we scale
throughput linearly to `--num-clients` until the configured rate limit.

More context in
https://github.com/neondatabase/cloud/issues/16886#issuecomment-2315257641.

closes https://github.com/neondatabase/cloud/issues/16886

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-08-29 11:26:52 +00:00

281 lines
9.7 KiB
Rust

//! This module implements the Generic Cell Rate Algorithm for a simplified
//! version of the Leaky Bucket rate limiting system.
//!
//! # Leaky Bucket
//!
//! If the bucket is full, no new requests are allowed and are throttled/errored.
//! If the bucket is partially full/empty, new requests are added to the bucket in
//! terms of "tokens".
//!
//! Over time, tokens are removed from the bucket, naturally allowing new requests at a steady rate.
//!
//! The bucket size tunes the burst support. The drain rate tunes the steady-rate requests per second.
//!
//! # [GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)
//!
//! GCRA is a continuous rate leaky-bucket impl that stores minimal state and requires
//! no background jobs to drain tokens, as the design utilises timestamps to drain automatically over time.
//!
//! We store an "empty_at" timestamp as the only state. As time progresses, we will naturally approach
//! the empty state. The full-bucket state is calculated from `empty_at - config.bucket_width`.
//!
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
use std::{sync::Mutex, time::Duration};
use tokio::{sync::Notify, time::Instant};
pub struct LeakyBucketConfig {
/// This is the "time cost" of a single request unit.
/// Should loosely represent how long it takes to handle a request unit in active resource time.
/// Loosely speaking this is the inverse of the steady-rate requests-per-second
pub cost: Duration,
/// total size of the bucket
pub bucket_width: Duration,
}
impl LeakyBucketConfig {
pub fn new(rps: f64, bucket_size: f64) -> Self {
let cost = Duration::from_secs_f64(rps.recip());
let bucket_width = cost.mul_f64(bucket_size);
Self { cost, bucket_width }
}
}
pub struct LeakyBucketState {
/// Bucket is represented by `allow_at..empty_at` where `allow_at = empty_at - config.bucket_width`.
///
/// At any given time, `empty_at - now` represents the number of tokens in the bucket, multiplied by the "time_cost".
/// Adding `n` tokens to the bucket is done by moving `empty_at` forward by `n * config.time_cost`.
/// If `now < allow_at`, the bucket is considered filled and cannot accept any more tokens.
/// Draining the bucket will happen naturally as `now` moves forward.
///
/// Let `n` be some "time cost" for the request,
/// If now is after empty_at, the bucket is empty and the empty_at is reset to now,
/// If now is within the `bucket window + n`, we are within time budget.
/// If now is before the `bucket window + n`, we have run out of budget.
///
/// This is inspired by the generic cell rate algorithm (GCRA) and works
/// exactly the same as a leaky-bucket.
pub empty_at: Instant,
}
impl LeakyBucketState {
pub fn with_initial_tokens(config: &LeakyBucketConfig, initial_tokens: f64) -> Self {
LeakyBucketState {
empty_at: Instant::now() + config.cost.mul_f64(initial_tokens),
}
}
pub fn bucket_is_empty(&self, now: Instant) -> bool {
// if self.end is after now, the bucket is not empty
self.empty_at <= now
}
/// Immediately adds tokens to the bucket, if there is space.
///
/// In a scenario where you are waiting for available rate,
/// rather than just erroring immediately, `started` corresponds to when this waiting started.
///
/// `n` is the number of tokens that will be filled in the bucket.
///
/// # Errors
///
/// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
/// there will be space again.
pub fn add_tokens(
&mut self,
config: &LeakyBucketConfig,
started: Instant,
n: f64,
) -> Result<(), Instant> {
let now = Instant::now();
// invariant: started <= now
debug_assert!(started <= now);
// If the bucket was empty when we started our search,
// we should update the `empty_at` value accordingly.
// this prevents us from having negative tokens in the bucket.
let mut empty_at = self.empty_at;
if empty_at < started {
empty_at = started;
}
let n = config.cost.mul_f64(n);
let new_empty_at = empty_at + n;
let allow_at = new_empty_at.checked_sub(config.bucket_width);
// empty_at
// allow_at | new_empty_at
// / | /
// -------o-[---------o-|--]---------
// now1 ^ now2 ^
//
// at now1, the bucket would be completely filled if we add n tokens.
// at now2, the bucket would be partially filled if we add n tokens.
match allow_at {
Some(allow_at) if now < allow_at => Err(allow_at),
_ => {
self.empty_at = new_empty_at;
Ok(())
}
}
}
}
pub struct RateLimiter {
pub config: LeakyBucketConfig,
pub state: Mutex<LeakyBucketState>,
/// a queue to provide this fair ordering.
pub queue: Notify,
}
struct Requeue<'a>(&'a Notify);
impl Drop for Requeue<'_> {
fn drop(&mut self) {
self.0.notify_one();
}
}
impl RateLimiter {
pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
RateLimiter {
state: Mutex::new(LeakyBucketState::with_initial_tokens(
&config,
initial_tokens,
)),
config,
queue: {
let queue = Notify::new();
queue.notify_one();
queue
},
}
}
pub fn steady_rps(&self) -> f64 {
self.config.cost.as_secs_f64().recip()
}
/// returns true if we did throttle
pub async fn acquire(&self, count: usize) -> bool {
let mut throttled = false;
let start = tokio::time::Instant::now();
// wait until we are the first in the queue
let mut notified = std::pin::pin!(self.queue.notified());
if !notified.as_mut().enable() {
throttled = true;
notified.await;
}
// notify the next waiter in the queue when we are done.
let _guard = Requeue(&self.queue);
loop {
let res = self
.state
.lock()
.unwrap()
.add_tokens(&self.config, start, count as f64);
match res {
Ok(()) => return throttled,
Err(ready_at) => {
throttled = true;
tokio::time::sleep_until(ready_at).await;
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::time::Instant;
use super::{LeakyBucketConfig, LeakyBucketState};
#[tokio::test(start_paused = true)]
async fn check() {
let config = LeakyBucketConfig {
// average 100rps
cost: Duration::from_millis(10),
// burst up to 100 requests
bucket_width: Duration::from_millis(1000),
};
let mut state = LeakyBucketState {
empty_at: Instant::now(),
};
// supports burst
{
// should work for 100 requests this instant
for _ in 0..100 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(10));
}
// doesn't overfill
{
// after 1s we should have an empty bucket again.
tokio::time::advance(Duration::from_secs(1)).await;
assert!(state.bucket_is_empty(Instant::now()));
// after 1s more, we should not over count the tokens and allow more than 200 requests.
tokio::time::advance(Duration::from_secs(1)).await;
for _ in 0..100 {
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(10));
}
// supports sustained rate over a long period
{
tokio::time::advance(Duration::from_secs(1)).await;
// should sustain 100rps
for _ in 0..2000 {
tokio::time::advance(Duration::from_millis(10)).await;
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
}
}
// supports requesting more tokens than can be stored in the bucket
// we just wait a little bit longer upfront.
{
// start the bucket completely empty
tokio::time::advance(Duration::from_secs(5)).await;
assert!(state.bucket_is_empty(Instant::now()));
// requesting 200 tokens of space should take 200*cost = 2s
// but we already have 1s available, so we wait 1s from start.
let start = Instant::now();
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_secs(1));
tokio::time::advance(Duration::from_millis(500)).await;
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
tokio::time::advance(Duration::from_millis(500)).await;
state.add_tokens(&config, start, 200.0).unwrap();
// bucket should be completely full now
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
assert_eq!(ready - Instant::now(), Duration::from_millis(10));
}
}
}