mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
utils: leaky bucket should only report throttled if the notify queue is blocked on sleep (#9072)
## Problem Seems that PS might be too eager in reporting throttled tasks ## Summary of changes Introduce a sleep counter. If the sleep counter increases, then the acquire tasks was throttled.
This commit is contained in:
@@ -21,7 +21,13 @@
|
||||
//!
|
||||
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
|
||||
|
||||
use std::{sync::Mutex, time::Duration};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicU64, Ordering},
|
||||
Mutex,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::{sync::Notify, time::Instant};
|
||||
|
||||
@@ -128,6 +134,7 @@ impl LeakyBucketState {
|
||||
|
||||
pub struct RateLimiter {
|
||||
pub config: LeakyBucketConfig,
|
||||
pub sleep_counter: AtomicU64,
|
||||
pub state: Mutex<LeakyBucketState>,
|
||||
/// a queue to provide this fair ordering.
|
||||
pub queue: Notify,
|
||||
@@ -144,6 +151,7 @@ impl Drop for Requeue<'_> {
|
||||
impl RateLimiter {
|
||||
pub fn with_initial_tokens(config: LeakyBucketConfig, initial_tokens: f64) -> Self {
|
||||
RateLimiter {
|
||||
sleep_counter: AtomicU64::new(0),
|
||||
state: Mutex::new(LeakyBucketState::with_initial_tokens(
|
||||
&config,
|
||||
initial_tokens,
|
||||
@@ -163,15 +171,16 @@ impl RateLimiter {
|
||||
|
||||
/// returns true if we did throttle
|
||||
pub async fn acquire(&self, count: usize) -> bool {
|
||||
let mut throttled = false;
|
||||
|
||||
let start = tokio::time::Instant::now();
|
||||
|
||||
let start_count = self.sleep_counter.load(Ordering::Acquire);
|
||||
let mut end_count = start_count;
|
||||
|
||||
// wait until we are the first in the queue
|
||||
let mut notified = std::pin::pin!(self.queue.notified());
|
||||
if !notified.as_mut().enable() {
|
||||
throttled = true;
|
||||
notified.await;
|
||||
end_count = self.sleep_counter.load(Ordering::Acquire);
|
||||
}
|
||||
|
||||
// notify the next waiter in the queue when we are done.
|
||||
@@ -184,9 +193,22 @@ impl RateLimiter {
|
||||
.unwrap()
|
||||
.add_tokens(&self.config, start, count as f64);
|
||||
match res {
|
||||
Ok(()) => return throttled,
|
||||
Ok(()) => return end_count > start_count,
|
||||
Err(ready_at) => {
|
||||
throttled = true;
|
||||
struct Increment<'a>(&'a AtomicU64);
|
||||
|
||||
impl Drop for Increment<'_> {
|
||||
fn drop(&mut self) {
|
||||
self.0.fetch_add(1, Ordering::AcqRel);
|
||||
}
|
||||
}
|
||||
|
||||
// increment the counter after we finish sleeping (or cancel this task).
|
||||
// this ensures that tasks that have already started the acquire will observe
|
||||
// the new sleep count when they are allowed to resume on the notify.
|
||||
let _inc = Increment(&self.sleep_counter);
|
||||
end_count += 1;
|
||||
|
||||
tokio::time::sleep_until(ready_at).await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user