diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 181b1626a7..908a046215 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -65,22 +65,13 @@ pub struct RateLimiterConfig { pub(crate) initial_limit: usize, } -impl RateLimiterConfig { - pub(crate) fn create_rate_limit_algorithm(self) -> Box { - match self.algorithm { - RateLimitAlgorithm::Fixed => Box::new(Fixed), - RateLimitAlgorithm::Aimd { conf } => Box::new(conf), - } - } -} - -pub(crate) struct LimiterInner { - alg: Box, +pub(crate) struct LimiterInner { limit: usize, in_flight: usize, + alg: L, } -impl LimiterInner { +impl LimiterInner { fn update_limit(&mut self, latency: Duration, outcome: Option) { if let Some(outcome) = outcome { let sample = Sample { @@ -114,11 +105,11 @@ impl LimiterInner { /// /// The limit will be automatically adjusted based on observed latency (delay) and/or failures /// caused by overload (loss). -pub(crate) struct DynamicLimiter { +pub(crate) struct DynamicLimiter { disabled: bool, - inner: Mutex, // to notify when a token is available ready: Notify, + inner: Mutex>, } /// A concurrency token, required to run a job. @@ -138,22 +129,32 @@ struct LimiterState { limit: usize, } -impl DynamicLimiter { - /// Create a limiter with a given limit control algorithm. - pub(crate) fn new(config: RateLimiterConfig) -> Arc { +impl DynamicLimiter { + pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc { let ready = Notify::new(); ready.notify_one(); Arc::new(Self { - disabled: config.initial_limit == 0, + disabled: initial_limit == 0, inner: Mutex::new(LimiterInner { - alg: config.create_rate_limit_algorithm(), - limit: config.initial_limit, + limit: initial_limit, in_flight: 0, + alg, }), ready, }) } +} + +impl DynamicLimiter { + /// Create a limiter with a given limit control algorithm. + pub(crate) fn new(config: RateLimiterConfig) -> Arc { + let initial_limit = config.initial_limit; + match config.algorithm { + RateLimitAlgorithm::Fixed => DynamicLimiter::new_inner(initial_limit, Fixed), + RateLimitAlgorithm::Aimd { conf } => DynamicLimiter::new_inner(initial_limit, conf), + } + } /// Try to acquire a concurrency [Token], waiting for `duration` if there are none available. pub(crate) async fn acquire_timeout(