mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
fix limitation of requesting more than max tokens
This commit is contained in:
@@ -66,21 +66,40 @@ impl LeakyBucketState {
|
||||
config.prev_multiple_of_drain(now - config.epoch) <= self.end
|
||||
}
|
||||
|
||||
/// Immedaitely adds tokens to the bucket, if there is space.
|
||||
/// Immediately adds tokens to the bucket, if there is space.
|
||||
///
|
||||
/// In a scenario where you are waiting for available rate,
|
||||
/// rather than just erroring immediately, `started` corresponds to when this waiting started.
|
||||
///
|
||||
/// `n` is the number of tokens that will be filled in the bucket.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If there is not enough space, no tokens are added. Instead, an error is returned with the time when
|
||||
/// there will be space again.
|
||||
pub fn add_tokens(
|
||||
&mut self,
|
||||
config: &LeakyBucketConfig,
|
||||
now: Instant,
|
||||
started: Instant,
|
||||
n: f64,
|
||||
) -> Result<(), Instant> {
|
||||
let now = Instant::now();
|
||||
|
||||
// round down to the last time we would have drained the bucket.
|
||||
let now = config.prev_multiple_of_drain(now - config.epoch);
|
||||
let started = config.prev_multiple_of_drain(started - config.epoch);
|
||||
|
||||
// invariant: started <= now
|
||||
debug_assert!(started <= now);
|
||||
|
||||
// If the bucket was empty when we started our search, bump the end up accordingly.
|
||||
let mut end = self.end;
|
||||
if end < started {
|
||||
end = started;
|
||||
}
|
||||
|
||||
let n = config.cost.mul_f64(n);
|
||||
|
||||
let end_plus_n = self.end + n;
|
||||
let end_plus_n = end + n;
|
||||
let start_plus_n = end_plus_n.saturating_sub(config.bucket_width);
|
||||
|
||||
// start end
|
||||
@@ -93,10 +112,7 @@ impl LeakyBucketState {
|
||||
// at now2, the bucket would be partially filled if we add n tokens.
|
||||
// at now3, the bucket would start completely empty before we add n tokens.
|
||||
|
||||
if end_plus_n <= now {
|
||||
self.end = now + n;
|
||||
Ok(())
|
||||
} else if start_plus_n <= now {
|
||||
if start_plus_n <= now {
|
||||
self.end = end_plus_n;
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -179,5 +195,31 @@ mod tests {
|
||||
state.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// supports requesting more tokens than can be stored in the bucket
|
||||
// we just wait a little bit longer upfront.
|
||||
{
|
||||
// start the bucket completely empty
|
||||
tokio::time::advance(Duration::from_secs(5)).await;
|
||||
assert!(state.bucket_is_empty(&config, Instant::now()));
|
||||
|
||||
// requesting 200 tokens of space should take 200*cost = 2s
|
||||
// but we already have 1s available, so we wait 1s from start.
|
||||
let start = Instant::now();
|
||||
|
||||
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_secs(1));
|
||||
|
||||
tokio::time::advance(Duration::from_millis(500)).await;
|
||||
let ready = state.add_tokens(&config, start, 200.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
|
||||
tokio::time::advance(Duration::from_millis(500)).await;
|
||||
state.add_tokens(&config, start, 200.0).unwrap();
|
||||
|
||||
// bucket should be completely full now
|
||||
let ready = state.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(ready - Instant::now(), Duration::from_millis(500));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -203,6 +203,8 @@ impl RateLimiter {
|
||||
async fn acquire(&self, count: usize) -> bool {
|
||||
let mut throttled = false;
|
||||
|
||||
let start = tokio::time::Instant::now();
|
||||
|
||||
// wait until we are the first in the queue
|
||||
if let Some(queue) = &self.queue {
|
||||
let mut notified = std::pin::pin!(queue.notified());
|
||||
@@ -220,13 +222,11 @@ impl RateLimiter {
|
||||
};
|
||||
|
||||
loop {
|
||||
let now = tokio::time::Instant::now();
|
||||
|
||||
let res = self
|
||||
.state
|
||||
.lock()
|
||||
.unwrap()
|
||||
.add_tokens(&self.config, now, count as f64);
|
||||
.add_tokens(&self.config, start, count as f64);
|
||||
match res {
|
||||
Ok(()) => return throttled,
|
||||
Err(ready_at) => {
|
||||
|
||||
@@ -112,9 +112,9 @@ mod tests {
|
||||
|
||||
// should work for 2000 requests this second
|
||||
for _ in 0..2000 {
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
assert_eq!(
|
||||
bucket.end - (Instant::now() - config.epoch),
|
||||
config.bucket_width
|
||||
@@ -123,30 +123,30 @@ mod tests {
|
||||
// in 1ms we should drain 0.5 tokens.
|
||||
// make sure we don't lose any tokens
|
||||
tokio::time::advance(Duration::from_millis(1)).await;
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
tokio::time::advance(Duration::from_millis(1)).await;
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
|
||||
// in 10ms we should drain 5 tokens
|
||||
tokio::time::advance(Duration::from_millis(10)).await;
|
||||
for _ in 0..5 {
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
|
||||
// in 10s we should drain 5000 tokens
|
||||
// but cap is only 2000
|
||||
tokio::time::advance(Duration::from_secs(10)).await;
|
||||
for _ in 0..2000 {
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_err());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap_err();
|
||||
|
||||
// should sustain 500rps
|
||||
for _ in 0..2000 {
|
||||
tokio::time::advance(Duration::from_millis(10)).await;
|
||||
for _ in 0..5 {
|
||||
assert!(bucket.add_tokens(&config, Instant::now(), 1.0).is_ok());
|
||||
bucket.add_tokens(&config, Instant::now(), 1.0).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user