Compare commits

..

2 Commits

Author SHA1 Message Date
Conrad Ludgate
0579533ac8 remove dynamic limiter internal Box<dyn> 2025-04-02 12:45:04 +01:00
Conrad Ludgate
99c0da9607 proxy: simplify dynamic limiter impl 2025-04-02 12:40:45 +01:00

View File

@@ -65,23 +65,13 @@ pub struct RateLimiterConfig {
pub(crate) initial_limit: usize,
}
impl RateLimiterConfig {
pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
match self.algorithm {
RateLimitAlgorithm::Fixed => Box::new(Fixed),
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
}
}
}
pub(crate) struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
pub(crate) struct LimiterInner<L: ?Sized> {
limit: usize,
in_flight: usize,
alg: L,
}
impl LimiterInner {
impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
@@ -94,14 +84,13 @@ impl LimiterInner {
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available >= 1 {
self.available -= 1;
if self.in_flight < self.limit {
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available >= 1 {
if self.in_flight < self.limit {
ready.notify_one();
}
Some(())
} else {
None
@@ -116,11 +105,11 @@ impl LimiterInner {
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub(crate) struct DynamicLimiter {
config: RateLimiterConfig,
inner: Mutex<LimiterInner>,
pub(crate) struct DynamicLimiter<L: ?Sized = dyn LimitAlgorithm> {
disabled: bool,
// to notify when a token is available
ready: Notify,
inner: Mutex<LimiterInner<L>>,
}
/// A concurrency token, required to run a job.
@@ -140,37 +129,46 @@ struct LimiterState {
limit: usize,
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
impl<L> DynamicLimiter<L> {
pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
disabled: initial_limit == 0,
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
limit: initial_limit,
in_flight: 0,
alg,
}),
ready,
config,
})
}
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
let initial_limit = config.initial_limit;
match config.algorithm {
RateLimitAlgorithm::Fixed => DynamicLimiter::new_inner(initial_limit, Fixed),
RateLimitAlgorithm::Aimd { conf } => DynamicLimiter::new_inner(initial_limit, conf),
}
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
pub(crate) async fn acquire_timeout(
self: &Arc<Self>,
duration: Duration,
) -> Result<Token, Elapsed> {
tokio::time::timeout(duration, self.acquire()).await?
tokio::time::timeout(duration, self.acquire()).await
}
/// Try to acquire a concurrency [Token].
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
async fn acquire(self: &Arc<Self>) -> Token {
if self.disabled {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
Token::disabled()
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
@@ -178,7 +176,7 @@ impl DynamicLimiter {
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
break Token::new(self.clone());
}
notified.set(self.ready.notified());
}
@@ -200,7 +198,7 @@ impl DynamicLimiter {
} else {
tracing::debug!("outcome is {:?}", outcome);
}
if self.config.initial_limit == 0 {
if self.disabled {
return;
}
@@ -210,7 +208,6 @@ impl DynamicLimiter {
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}