From 99c0da9607a5323b909bb2d313f510332a753909 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 2 Apr 2025 12:40:45 +0100 Subject: [PATCH] proxy: simplify dynamic limiter impl --- proxy/src/rate_limiter/limit_algorithm.rs | 26 ++++++++++------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index f8eeb89f05..181b1626a7 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -76,7 +76,6 @@ impl RateLimiterConfig { pub(crate) struct LimiterInner { alg: Box, - available: usize, limit: usize, in_flight: usize, } @@ -94,14 +93,13 @@ impl LimiterInner { } fn take(&mut self, ready: &Notify) -> Option<()> { - if self.available >= 1 { - self.available -= 1; + if self.in_flight < self.limit { self.in_flight += 1; - // tell the next in the queue that there is a permit ready - if self.available >= 1 { + if self.in_flight < self.limit { ready.notify_one(); } + Some(()) } else { None @@ -117,7 +115,7 @@ impl LimiterInner { /// The limit will be automatically adjusted based on observed latency (delay) and/or failures /// caused by overload (loss). pub(crate) struct DynamicLimiter { - config: RateLimiterConfig, + disabled: bool, inner: Mutex, // to notify when a token is available ready: Notify, @@ -147,14 +145,13 @@ impl DynamicLimiter { ready.notify_one(); Arc::new(Self { + disabled: config.initial_limit == 0, inner: Mutex::new(LimiterInner { alg: config.create_rate_limit_algorithm(), - available: config.initial_limit, limit: config.initial_limit, in_flight: 0, }), ready, - config, }) } @@ -163,14 +160,14 @@ impl DynamicLimiter { self: &Arc, duration: Duration, ) -> Result { - tokio::time::timeout(duration, self.acquire()).await? + tokio::time::timeout(duration, self.acquire()).await } /// Try to acquire a concurrency [Token]. - async fn acquire(self: &Arc) -> Result { - if self.config.initial_limit == 0 { + async fn acquire(self: &Arc) -> Token { + if self.disabled { // If the rate limiter is disabled, we can always acquire a token. - Ok(Token::disabled()) + Token::disabled() } else { let mut notified = pin!(self.ready.notified()); let mut ready = notified.as_mut().enable(); @@ -178,7 +175,7 @@ impl DynamicLimiter { if ready { let mut inner = self.inner.lock(); if inner.take(&self.ready).is_some() { - break Ok(Token::new(self.clone())); + break Token::new(self.clone()); } notified.set(self.ready.notified()); } @@ -200,7 +197,7 @@ impl DynamicLimiter { } else { tracing::debug!("outcome is {:?}", outcome); } - if self.config.initial_limit == 0 { + if self.disabled { return; } @@ -210,7 +207,6 @@ impl DynamicLimiter { inner.in_flight -= 1; if inner.in_flight < inner.limit { - inner.available = inner.limit - inner.in_flight; // At least 1 permit is now available self.ready.notify_one(); }