use flag-bearer

This commit is contained in:
Conrad Ludgate
2025-04-01 14:21:35 +01:00
parent 0579533ac8
commit b0411e612a
3 changed files with 55 additions and 40 deletions

28
Cargo.lock generated
View File

@@ -2248,6 +2248,17 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flag-bearer"
version = "0.1.0-rc.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8feaa1b7a5ad6e6dd7791d42d36c1a25004f1c25eae9ab7b904c864109d8260"
dependencies = [
"parking_lot 0.12.1",
"pin-list",
"pin-project-lite",
]
[[package]]
name = "flagset"
version = "0.4.6"
@@ -4554,6 +4565,16 @@ dependencies = [
"siphasher",
]
[[package]]
name = "pin-list"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3c0987a7464afc0d593f13429732ef87e9a6c8e7909a1a22faeff7e1d2159d"
dependencies = [
"pin-project-lite",
"pinned-aliasable",
]
[[package]]
name = "pin-project"
version = "1.1.9"
@@ -4586,6 +4607,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pinned-aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d0f9ae89bf0ed03b69ac1f3f7ea2e6e09b4fa5448011df2e67d581c2b850b7b"
[[package]]
name = "pkcs1"
version = "0.7.5"
@@ -5079,6 +5106,7 @@ dependencies = [
"ed25519-dalek",
"env_logger",
"fallible-iterator",
"flag-bearer",
"flate2",
"framed-websockets",
"futures",

View File

@@ -103,6 +103,7 @@ uuid.workspace = true
x509-cert.workspace = true
redis.workspace = true
zerocopy.workspace = true
flag-bearer = { version = "0.1.0-rc.4" }
# jwt stuff
jose-jwa = "0.1.2"

View File

@@ -1,10 +1,8 @@
//! Algorithms for controlling concurrency limits.
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use tokio::sync::Notify;
use flag_bearer::{Semaphore, SemaphoreState, Uncloseable};
use tokio::time::Instant;
use tokio::time::error::Elapsed;
@@ -82,20 +80,26 @@ impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
self.limit = self.alg.update(self.limit, sample);
}
}
}
fn take(&mut self, ready: &Notify) -> Option<()> {
impl<L: ?Sized> SemaphoreState for LimiterInner<L> {
type Params = ();
type Permit = ();
type Closeable = Uncloseable;
fn acquire(&mut self, p: Self::Params) -> Result<Self::Permit, Self::Params> {
if self.in_flight < self.limit {
self.in_flight += 1;
if self.in_flight < self.limit {
ready.notify_one();
}
Some(())
Ok(p)
} else {
None
Err(p)
}
}
fn release(&mut self, _p: Self::Permit) {
self.in_flight -= 1;
}
}
/// Limits the number of concurrent jobs.
@@ -107,9 +111,7 @@ impl<L: ?Sized + LimitAlgorithm> LimiterInner<L> {
/// caused by overload (loss).
pub(crate) struct DynamicLimiter<L: ?Sized = dyn LimitAlgorithm> {
disabled: bool,
// to notify when a token is available
ready: Notify,
inner: Mutex<LimiterInner<L>>,
sem: Semaphore<LimiterInner<L>>,
}
/// A concurrency token, required to run a job.
@@ -131,17 +133,13 @@ struct LimiterState {
impl<L> DynamicLimiter<L> {
pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
disabled: initial_limit == 0,
inner: Mutex::new(LimiterInner {
sem: Semaphore::new_fifo(LimiterInner {
limit: initial_limit,
in_flight: 0,
alg,
}),
ready,
})
}
}
@@ -170,18 +168,12 @@ impl DynamicLimiter {
// If the rate limiter is disabled, we can always acquire a token.
Token::disabled()
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
loop {
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Token::new(self.clone());
}
notified.set(self.ready.notified());
match self.sem.acquire(()).await {
Err(close) => close.never(),
Ok(permit) => {
permit.take();
Token::new(self.clone())
}
notified.as_mut().await;
ready = true;
}
}
}
@@ -202,22 +194,16 @@ impl DynamicLimiter {
return;
}
let mut inner = self.inner.lock();
inner.update_limit(start.elapsed(), outcome);
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
// At least 1 permit is now available
self.ready.notify_one();
}
self.sem.with_state(|s| {
s.update_limit(start.elapsed(), outcome);
s.release(());
});
}
/// The current state of the limiter.
#[cfg(test)]
fn state(&self) -> LimiterState {
let inner = self.inner.lock();
LimiterState { limit: inner.limit }
self.sem.with_state(|s| LimiterState { limit: s.limit })
}
}