From b0411e612a9bd2b5a632f26895abbff66dfefb31 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 1 Apr 2025 14:21:35 +0100 Subject: [PATCH] use flag-bearer --- Cargo.lock | 28 ++++++++++ proxy/Cargo.toml | 1 + proxy/src/rate_limiter/limit_algorithm.rs | 66 +++++++++-------------- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 194ad90d52..58b402d38e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2248,6 +2248,17 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flag-bearer" +version = "0.1.0-rc.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8feaa1b7a5ad6e6dd7791d42d36c1a25004f1c25eae9ab7b904c864109d8260" +dependencies = [ + "parking_lot 0.12.1", + "pin-list", + "pin-project-lite", +] + [[package]] name = "flagset" version = "0.4.6" @@ -4554,6 +4565,16 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-list" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3c0987a7464afc0d593f13429732ef87e9a6c8e7909a1a22faeff7e1d2159d" +dependencies = [ + "pin-project-lite", + "pinned-aliasable", +] + [[package]] name = "pin-project" version = "1.1.9" @@ -4586,6 +4607,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pinned-aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0f9ae89bf0ed03b69ac1f3f7ea2e6e09b4fa5448011df2e67d581c2b850b7b" + [[package]] name = "pkcs1" version = "0.7.5" @@ -5079,6 +5106,7 @@ dependencies = [ "ed25519-dalek", "env_logger", "fallible-iterator", + "flag-bearer", "flate2", "framed-websockets", "futures", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 2cec510d82..ec9edf994c 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -103,6 +103,7 @@ uuid.workspace = true x509-cert.workspace = true redis.workspace = true zerocopy.workspace = true +flag-bearer = { version = "0.1.0-rc.4" } # jwt stuff jose-jwa = "0.1.2" diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 908a046215..59e53b2720 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -1,10 +1,8 @@ //! Algorithms for controlling concurrency limits. -use std::pin::pin; use std::sync::Arc; use std::time::Duration; -use parking_lot::Mutex; -use tokio::sync::Notify; +use flag_bearer::{Semaphore, SemaphoreState, Uncloseable}; use tokio::time::Instant; use tokio::time::error::Elapsed; @@ -82,20 +80,26 @@ impl LimiterInner { self.limit = self.alg.update(self.limit, sample); } } +} - fn take(&mut self, ready: &Notify) -> Option<()> { +impl SemaphoreState for LimiterInner { + type Params = (); + type Permit = (); + type Closeable = Uncloseable; + + fn acquire(&mut self, p: Self::Params) -> Result { if self.in_flight < self.limit { self.in_flight += 1; - if self.in_flight < self.limit { - ready.notify_one(); - } - - Some(()) + Ok(p) } else { - None + Err(p) } } + + fn release(&mut self, _p: Self::Permit) { + self.in_flight -= 1; + } } /// Limits the number of concurrent jobs. @@ -107,9 +111,7 @@ impl LimiterInner { /// caused by overload (loss). pub(crate) struct DynamicLimiter { disabled: bool, - // to notify when a token is available - ready: Notify, - inner: Mutex>, + sem: Semaphore>, } /// A concurrency token, required to run a job. @@ -131,17 +133,13 @@ struct LimiterState { impl DynamicLimiter { pub(crate) fn new_inner(initial_limit: usize, alg: L) -> Arc { - let ready = Notify::new(); - ready.notify_one(); - Arc::new(Self { disabled: initial_limit == 0, - inner: Mutex::new(LimiterInner { + sem: Semaphore::new_fifo(LimiterInner { limit: initial_limit, in_flight: 0, alg, }), - ready, }) } } @@ -170,18 +168,12 @@ impl DynamicLimiter { // If the rate limiter is disabled, we can always acquire a token. Token::disabled() } else { - let mut notified = pin!(self.ready.notified()); - let mut ready = notified.as_mut().enable(); - loop { - if ready { - let mut inner = self.inner.lock(); - if inner.take(&self.ready).is_some() { - break Token::new(self.clone()); - } - notified.set(self.ready.notified()); + match self.sem.acquire(()).await { + Err(close) => close.never(), + Ok(permit) => { + permit.take(); + Token::new(self.clone()) } - notified.as_mut().await; - ready = true; } } } @@ -202,22 +194,16 @@ impl DynamicLimiter { return; } - let mut inner = self.inner.lock(); - - inner.update_limit(start.elapsed(), outcome); - - inner.in_flight -= 1; - if inner.in_flight < inner.limit { - // At least 1 permit is now available - self.ready.notify_one(); - } + self.sem.with_state(|s| { + s.update_limit(start.elapsed(), outcome); + s.release(()); + }); } /// The current state of the limiter. #[cfg(test)] fn state(&self) -> LimiterState { - let inner = self.inner.lock(); - LimiterState { limit: inner.limit } + self.sem.with_state(|s| LimiterState { limit: s.limit }) } }