proxy: simplify dynamic limiter impl

This commit is contained in:
Conrad Ludgate
2025-04-02 12:40:45 +01:00
parent c179d098ef
commit 99c0da9607

View File

@@ -76,7 +76,6 @@ impl RateLimiterConfig {
pub(crate) struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
limit: usize,
in_flight: usize,
}
@@ -94,14 +93,13 @@ impl LimiterInner {
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available >= 1 {
self.available -= 1;
if self.in_flight < self.limit {
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available >= 1 {
if self.in_flight < self.limit {
ready.notify_one();
}
Some(())
} else {
None
@@ -117,7 +115,7 @@ impl LimiterInner {
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub(crate) struct DynamicLimiter {
config: RateLimiterConfig,
disabled: bool,
inner: Mutex<LimiterInner>,
// to notify when a token is available
ready: Notify,
@@ -147,14 +145,13 @@ impl DynamicLimiter {
ready.notify_one();
Arc::new(Self {
disabled: config.initial_limit == 0,
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
in_flight: 0,
}),
ready,
config,
})
}
@@ -163,14 +160,14 @@ impl DynamicLimiter {
self: &Arc<Self>,
duration: Duration,
) -> Result<Token, Elapsed> {
tokio::time::timeout(duration, self.acquire()).await?
tokio::time::timeout(duration, self.acquire()).await
}
/// Try to acquire a concurrency [Token].
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
async fn acquire(self: &Arc<Self>) -> Token {
if self.disabled {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
Token::disabled()
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
@@ -178,7 +175,7 @@ impl DynamicLimiter {
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
break Token::new(self.clone());
}
notified.set(self.ready.notified());
}
@@ -200,7 +197,7 @@ impl DynamicLimiter {
} else {
tracing::debug!("outcome is {:?}", outcome);
}
if self.config.initial_limit == 0 {
if self.disabled {
return;
}
@@ -210,7 +207,6 @@ impl DynamicLimiter {
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}