From 00032c9d9fff0dab5c69e612166c10e5245b43a4 Mon Sep 17 00:00:00 2001 From: Anna Khanova <32508607+khanova@users.noreply.github.com> Date: Tue, 4 Jun 2024 06:07:54 +0200 Subject: [PATCH] [proxy] Fix dynamic rate limiter (#7950) ## Problem There was a bug in dynamic rate limiter, which exhausted CPU in proxy and proxy wasn't able to accept any connections. ## Summary of changes 1. `if self.available > 1` -> `if self.available >= 1` 2. remove `timeout_at` to use just timeout 3. remove potential infinite loops which can exhaust CPUs. --- proxy/src/console/provider.rs | 4 +- proxy/src/rate_limiter/limit_algorithm.rs | 38 +++----- .../src/rate_limiter/limit_algorithm/aimd.rs | 88 ++++++++++++++++++- 3 files changed, 102 insertions(+), 28 deletions(-) diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 4d074f98a5..634ec9042c 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -452,7 +452,7 @@ pub struct ApiLocks { #[derive(Debug, thiserror::Error)] pub enum ApiLockError { - #[error("permit could not be acquired")] + #[error("timeout acquiring resource permit")] TimeoutError(#[from] tokio::time::error::Elapsed), } @@ -504,7 +504,7 @@ impl ApiLocks { .clone() } }; - let permit = semaphore.acquire_deadline(now + self.timeout).await; + let permit = semaphore.acquire_timeout(self.timeout).await; self.metrics .semaphore_acquire_seconds diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 072fdb80b0..3842ce269e 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -3,7 +3,7 @@ use parking_lot::Mutex; use std::{pin::pin, sync::Arc, time::Duration}; use tokio::{ sync::Notify, - time::{error::Elapsed, timeout_at, Instant}, + time::{error::Elapsed, Instant}, }; use self::aimd::Aimd; @@ -80,7 +80,7 @@ pub struct LimiterInner { } impl LimiterInner { - fn update(&mut self, latency: Duration, outcome: Option) { + fn update_limit(&mut self, latency: Duration, outcome: Option) { if let Some(outcome) = outcome { let sample = Sample { latency, @@ -92,12 +92,12 @@ impl LimiterInner { } fn take(&mut self, ready: &Notify) -> Option<()> { - if self.available > 1 { + if self.available >= 1 { self.available -= 1; self.in_flight += 1; // tell the next in the queue that there is a permit ready - if self.available > 1 { + if self.available >= 1 { ready.notify_one(); } Some(()) @@ -157,16 +157,12 @@ impl DynamicLimiter { } /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available. - /// - /// Returns `None` if there are none available after `duration`. pub async fn acquire_timeout(self: &Arc, duration: Duration) -> Result { - self.acquire_deadline(Instant::now() + duration).await + tokio::time::timeout(duration, self.acquire()).await? } - /// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available. - /// - /// Returns `None` if there are none available after `deadline`. - pub async fn acquire_deadline(self: &Arc, deadline: Instant) -> Result { + /// Try to acquire a concurrency [Token]. + async fn acquire(self: &Arc) -> Result { if self.config.initial_limit == 0 { // If the rate limiter is disabled, we can always acquire a token. Ok(Token::disabled()) @@ -174,22 +170,16 @@ impl DynamicLimiter { let mut notified = pin!(self.ready.notified()); let mut ready = notified.as_mut().enable(); loop { - let mut limit = None; if ready { let mut inner = self.inner.lock(); if inner.take(&self.ready).is_some() { break Ok(Token::new(self.clone())); - } - limit = Some(inner.limit); - } - match timeout_at(deadline, notified.as_mut()).await { - Ok(()) => ready = true, - Err(e) => { - let limit = limit.unwrap_or_else(|| self.inner.lock().limit); - tracing::info!(limit, "could not acquire token in time"); - break Err(e); + } else { + notified.set(self.ready.notified()); } } + notified.as_mut().await; + ready = true; } } } @@ -208,14 +198,14 @@ impl DynamicLimiter { let mut inner = self.inner.lock(); - inner.update(start.elapsed(), outcome); + inner.update_limit(start.elapsed(), outcome); + + inner.in_flight -= 1; if inner.in_flight < inner.limit { inner.available = inner.limit - inner.in_flight; // At least 1 permit is now available self.ready.notify_one(); } - - inner.in_flight -= 1; } /// The current state of the limiter. diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs index 370d4be802..ccc9c42420 100644 --- a/proxy/src/rate_limiter/limit_algorithm/aimd.rs +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -51,7 +51,9 @@ impl LimitAlgorithm for Aimd { // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1 let limit = limit.floor() as usize; - limit.clamp(self.min, self.max) + let limit = limit.clamp(self.min, self.max); + tracing::info!(limit, "limit decreased"); + limit } } } @@ -67,6 +69,53 @@ mod tests { use super::*; + #[tokio::test(start_paused = true)] + async fn increase_decrease() { + let config = RateLimiterConfig { + initial_limit: 1, + algorithm: RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 1, + max: 2, + inc: 10, + dec: 0.5, + utilisation: 0.8, + }, + }, + }; + + let limiter = DynamicLimiter::new(config); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + token.release(Outcome::Success); + + assert_eq!(limiter.state().limit(), 2); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + token.release(Outcome::Success); + assert_eq!(limiter.state().limit(), 2); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + token.release(Outcome::Overload); + assert_eq!(limiter.state().limit(), 1); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + token.release(Outcome::Overload); + assert_eq!(limiter.state().limit(), 1); + } + #[tokio::test(start_paused = true)] async fn should_decrease_limit_on_overload() { let config = RateLimiterConfig { @@ -85,7 +134,7 @@ mod tests { let limiter = DynamicLimiter::new(config); let token = limiter - .acquire_timeout(Duration::from_millis(1)) + .acquire_timeout(Duration::from_millis(100)) .await .unwrap(); token.release(Outcome::Overload); @@ -93,6 +142,41 @@ mod tests { assert_eq!(limiter.state().limit(), 5, "overload: decrease"); } + #[tokio::test(start_paused = true)] + async fn acquire_timeout_times_out() { + let config = RateLimiterConfig { + initial_limit: 1, + algorithm: RateLimitAlgorithm::Aimd { + conf: Aimd { + min: 1, + max: 2, + inc: 10, + dec: 0.5, + utilisation: 0.8, + }, + }, + }; + + let limiter = DynamicLimiter::new(config); + + let token = limiter + .acquire_timeout(Duration::from_millis(1)) + .await + .unwrap(); + let now = tokio::time::Instant::now(); + limiter + .acquire_timeout(Duration::from_secs(1)) + .await + .err() + .unwrap(); + + assert!(now.elapsed() >= Duration::from_secs(1)); + + token.release(Outcome::Success); + + assert_eq!(limiter.state().limit(), 2); + } + #[tokio::test(start_paused = true)] async fn should_increase_limit_on_success_when_using_gt_util_threshold() { let config = RateLimiterConfig {