Compare commits

...

3 Commits

Author SHA1 Message Date
Conrad Ludgate
b0411e612a use flag-bearer 2025-04-02 12:51:27 +01:00
Conrad Ludgate
0579533ac8 remove dynamic limiter internal Box<dyn> 2025-04-02 12:45:04 +01:00
Conrad Ludgate
99c0da9607 proxy: simplify dynamic limiter impl 2025-04-02 12:40:45 +01:00
3 changed files with 83 additions and 71 deletions

28
Cargo.lock generated
View File

@@ -2248,6 +2248,17 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flag-bearer"
version = "0.1.0-rc.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8feaa1b7a5ad6e6dd7791d42d36c1a25004f1c25eae9ab7b904c864109d8260"
dependencies = [
"parking_lot 0.12.1",
"pin-list",
"pin-project-lite",
]
[[package]]
name = "flagset"
version = "0.4.6"
@@ -4554,6 +4565,16 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-list"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3c0987a7464afc0d593f13429732ef87e9a6c8e7909a1a22faeff7e1d2159d"
dependencies = [
"pin-project-lite",
"pinned-aliasable",
]
[[package]]
name = "pin-project"
version = "1.1.9"
@@ -4586,6 +4607,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pinned-aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0f9ae89bf0ed03b69ac1f3f7ea2e6e09b4fa5448011df2e67d581c2b850b7b"
[[package]]
name = "pkcs1"
version = "0.7.5"
@@ -5079,6 +5106,7 @@ dependencies = [
"ed25519-dalek",
"env_logger",
"fallible-iterator",
"flag-bearer",
"flate2",
"framed-websockets",
"futures",

View File

@@ -103,6 +103,7 @@ uuid.workspace = true
x509-cert.workspace = true
redis.workspace = true
zerocopy.workspace = true
flag-bearer = { version = "0.1.0-rc.4" }
# jwt stuff
jose-jwa = "0.1.2"

View File

@@ -1,10 +1,8 @@
//! Algorithms for controlling concurrency limits.
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use tokio::sync::Notify;
use flag_bearer::{Semaphore, SemaphoreState, Uncloseable};
use tokio::time::Instant;
use tokio::time::error::Elapsed;
@@ -65,23 +63,13 @@ pub struct RateLimiterConfig {
pub(crate) initial_limit: usize,
}
impl RateLimiterConfig {
pub(crate) fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
match self.algorithm {
RateLimitAlgorithm::Fixed => Box::new(Fixed),
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
}
}
}
pub(crate) struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
pub(crate) struct LimiterInner<L: ?Sized> {
limit: usize,
in_flight: usize,
alg: L,
}
impl LimiterInner {
impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
@@ -92,21 +80,26 @@ impl LimiterInner {
self.limit = self.alg.update(self.limit, sample);
}
}
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available >= 1 {
self.available -= 1;
impl<L: ?Sized> SemaphoreState for LimiterInner<L> {
type Params = ();
type Permit = ();
type Closeable = Uncloseable;
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
if self.in_flight < self.limit {
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available >= 1 {
ready.notify_one();
}
Some(())
Ok(p)
} else {
None
Err(p)
}
}
fn release(&mut self, _p: Self::Permit) {
self.in_flight -= 1;
}
}
/// Limits the number of concurrent jobs.
@@ -116,11 +109,9 @@ impl LimiterInner {
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub(crate) struct DynamicLimiter {
config: RateLimiterConfig,
inner: Mutex<LimiterInner>,
// to notify when a token is available
ready: Notify,
pub(crate) struct DynamicLimiter<L: ?Sized = dyn LimitAlgorithm> {
disabled: bool,
sem: Semaphore<LimiterInner<L>>,
}
/// A concurrency token, required to run a job.
@@ -140,22 +131,27 @@ struct LimiterState {
limit: usize,
}
impl<L> DynamicLimiter<L> {
pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc<Self> {
Arc::new(Self {
disabled: initial_limit == 0,
sem: Semaphore::new_fifo(LimiterInner {
limit: initial_limit,
in_flight: 0,
alg,
}),
})
}
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub(crate) fn new(config: RateLimiterConfig) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
in_flight: 0,
}),
ready,
config,
})
let initial_limit = config.initial_limit;
match config.algorithm {
RateLimitAlgorithm::Fixed => DynamicLimiter::new_inner(initial_limit, Fixed),
RateLimitAlgorithm::Aimd { conf } => DynamicLimiter::new_inner(initial_limit, conf),
}
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
@@ -163,27 +159,21 @@ impl DynamicLimiter {
self: &Arc<Self>,
duration: Duration,
) -> Result<Token, Elapsed> {
tokio::time::timeout(duration, self.acquire()).await?
tokio::time::timeout(duration, self.acquire()).await
}
/// Try to acquire a concurrency [Token].
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
async fn acquire(self: &Arc<Self>) -> Token {
if self.disabled {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
Token::disabled()
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
loop {
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
}
notified.set(self.ready.notified());
match self.sem.acquire(()).await {
Err(close) => close.never(),
Ok(permit) => {
permit.take();
Token::new(self.clone())
}
notified.as_mut().await;
ready = true;
}
}
}
@@ -200,27 +190,20 @@ impl DynamicLimiter {
} else {
tracing::debug!("outcome is {:?}", outcome);
}
if self.config.initial_limit == 0 {
if self.disabled {
return;
}
let mut inner = self.inner.lock();
inner.update_limit(start.elapsed(), outcome);
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}
self.sem.with_state(|s| {
s.update_limit(start.elapsed(), outcome);
s.release(());
});
}
/// The current state of the limiter.
#[cfg(test)]
fn state(&self) -> LimiterState {
let inner = self.inner.lock();
LimiterState { limit: inner.limit }
self.sem.with_state(|s| LimiterState { limit: s.limit })
}
}