Merge pull request #7980 from neondatabase/rc/proxy/2024-06-06

Proxy release 2024-06-06
This commit is contained in:
Anna Khanova
2024-06-06 13:14:40 +02:00
committed by GitHub
106 changed files with 4955 additions and 2246 deletions

View File

@@ -452,7 +452,7 @@ pub struct ApiLocks<K> {
#[derive(Debug, thiserror::Error)]
pub enum ApiLockError {
#[error("permit could not be acquired")]
#[error("timeout acquiring resource permit")]
TimeoutError(#[from] tokio::time::error::Elapsed),
}
@@ -504,7 +504,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
.clone()
}
};
let permit = semaphore.acquire_deadline(now + self.timeout).await;
let permit = semaphore.acquire_timeout(self.timeout).await;
self.metrics
.semaphore_acquire_seconds

View File

@@ -3,7 +3,7 @@ use parking_lot::Mutex;
use std::{pin::pin, sync::Arc, time::Duration};
use tokio::{
sync::Notify,
time::{error::Elapsed, timeout_at, Instant},
time::{error::Elapsed, Instant},
};
use self::aimd::Aimd;
@@ -80,7 +80,7 @@ pub struct LimiterInner {
}
impl LimiterInner {
fn update(&mut self, latency: Duration, outcome: Option<Outcome>) {
fn update_limit(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
latency,
@@ -92,12 +92,12 @@ impl LimiterInner {
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available > 1 {
if self.available >= 1 {
self.available -= 1;
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available > 1 {
if self.available >= 1 {
ready.notify_one();
}
Some(())
@@ -157,16 +157,12 @@ impl DynamicLimiter {
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
///
/// Returns `None` if there are none available after `duration`.
pub async fn acquire_timeout(self: &Arc<Self>, duration: Duration) -> Result<Token, Elapsed> {
self.acquire_deadline(Instant::now() + duration).await
tokio::time::timeout(duration, self.acquire()).await?
}
/// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available.
///
/// Returns `None` if there are none available after `deadline`.
pub async fn acquire_deadline(self: &Arc<Self>, deadline: Instant) -> Result<Token, Elapsed> {
/// Try to acquire a concurrency [Token].
async fn acquire(self: &Arc<Self>) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
@@ -174,22 +170,16 @@ impl DynamicLimiter {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
loop {
let mut limit = None;
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
}
limit = Some(inner.limit);
}
match timeout_at(deadline, notified.as_mut()).await {
Ok(()) => ready = true,
Err(e) => {
let limit = limit.unwrap_or_else(|| self.inner.lock().limit);
tracing::info!(limit, "could not acquire token in time");
break Err(e);
} else {
notified.set(self.ready.notified());
}
}
notified.as_mut().await;
ready = true;
}
}
}
@@ -208,14 +198,14 @@ impl DynamicLimiter {
let mut inner = self.inner.lock();
inner.update(start.elapsed(), outcome);
inner.update_limit(start.elapsed(), outcome);
inner.in_flight -= 1;
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}
inner.in_flight -= 1;
}
/// The current state of the limiter.

View File

@@ -51,7 +51,9 @@ impl LimitAlgorithm for Aimd {
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
let limit = limit.floor() as usize;
limit.clamp(self.min, self.max)
let limit = limit.clamp(self.min, self.max);
tracing::info!(limit, "limit decreased");
limit
}
}
}
@@ -67,6 +69,53 @@ mod tests {
use super::*;
#[tokio::test(start_paused = true)]
async fn increase_decrease() {
let config = RateLimiterConfig {
initial_limit: 1,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 2,
inc: 10,
dec: 0.5,
utilisation: 0.8,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Success);
assert_eq!(limiter.state().limit(), 2);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Success);
assert_eq!(limiter.state().limit(), 2);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Overload);
assert_eq!(limiter.state().limit(), 1);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Overload);
assert_eq!(limiter.state().limit(), 1);
}
#[tokio::test(start_paused = true)]
async fn should_decrease_limit_on_overload() {
let config = RateLimiterConfig {
@@ -85,7 +134,7 @@ mod tests {
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.acquire_timeout(Duration::from_millis(100))
.await
.unwrap();
token.release(Outcome::Overload);
@@ -93,6 +142,41 @@ mod tests {
assert_eq!(limiter.state().limit(), 5, "overload: decrease");
}
#[tokio::test(start_paused = true)]
async fn acquire_timeout_times_out() {
let config = RateLimiterConfig {
initial_limit: 1,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 2,
inc: 10,
dec: 0.5,
utilisation: 0.8,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
let now = tokio::time::Instant::now();
limiter
.acquire_timeout(Duration::from_secs(1))
.await
.err()
.unwrap();
assert!(now.elapsed() >= Duration::from_secs(1));
token.release(Outcome::Success);
assert_eq!(limiter.state().limit(), 2);
}
#[tokio::test(start_paused = true)]
async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
let config = RateLimiterConfig {