mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
Follow up to #9803 See https://github.com/neondatabase/cloud/issues/14378 In collaboration with @cloneable and @awarus, we sifted through logs and simply demoted some logs to debug. This is not at all finished and there are more logs to review, but we ran out of time in the session we organised. In any slightly more nuanced cases, we didn't touch the log, instead leaving a TODO comment. I've also slightly refactored the sql-over-http body read/length reject code. I can split that into a separate PR. It just felt natural after I switched to `read_body_with_limit` as we discussed during the meet.
271 lines
7.9 KiB
Rust
271 lines
7.9 KiB
Rust
use super::{LimitAlgorithm, Outcome, Sample};
|
|
|
|
/// Loss-based congestion avoidance.
|
|
///
|
|
/// Additive-increase, multiplicative decrease.
|
|
///
|
|
/// Adds available currency when:
|
|
/// 1. no load-based errors are observed, and
|
|
/// 2. the utilisation of the current limit is high.
|
|
///
|
|
/// Reduces available concurrency by a factor when load-based errors are detected.
|
|
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
|
|
pub(crate) struct Aimd {
|
|
/// Minimum limit for AIMD algorithm.
|
|
pub(crate) min: usize,
|
|
/// Maximum limit for AIMD algorithm.
|
|
pub(crate) max: usize,
|
|
/// Decrease AIMD decrease by value in case of error.
|
|
pub(crate) dec: f32,
|
|
/// Increase AIMD increase by value in case of success.
|
|
pub(crate) inc: usize,
|
|
/// A threshold below which the limit won't be increased.
|
|
pub(crate) utilisation: f32,
|
|
}
|
|
|
|
impl LimitAlgorithm for Aimd {
|
|
fn update(&self, old_limit: usize, sample: Sample) -> usize {
|
|
match sample.outcome {
|
|
Outcome::Success => {
|
|
let utilisation = sample.in_flight as f32 / old_limit as f32;
|
|
|
|
if utilisation > self.utilisation {
|
|
let limit = old_limit + self.inc;
|
|
let new_limit = limit.clamp(self.min, self.max);
|
|
if new_limit > old_limit {
|
|
tracing::info!(old_limit, new_limit, "limit increased");
|
|
} else {
|
|
tracing::debug!(old_limit, new_limit, "limit clamped at max");
|
|
}
|
|
|
|
new_limit
|
|
} else {
|
|
old_limit
|
|
}
|
|
}
|
|
Outcome::Overload => {
|
|
let new_limit = old_limit as f32 * self.dec;
|
|
|
|
// Floor instead of round, so the limit reduces even with small numbers.
|
|
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
|
|
let new_limit = new_limit.floor() as usize;
|
|
|
|
let new_limit = new_limit.clamp(self.min, self.max);
|
|
if new_limit < old_limit {
|
|
tracing::info!(old_limit, new_limit, "limit decreased");
|
|
} else {
|
|
tracing::debug!(old_limit, new_limit, "limit clamped at min");
|
|
}
|
|
new_limit
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::time::Duration;
|
|
|
|
use super::*;
|
|
use crate::rate_limiter::limit_algorithm::{
|
|
DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
|
|
};
|
|
|
|
#[tokio::test(start_paused = true)]
|
|
async fn increase_decrease() {
|
|
let config = RateLimiterConfig {
|
|
initial_limit: 1,
|
|
algorithm: RateLimitAlgorithm::Aimd {
|
|
conf: Aimd {
|
|
min: 1,
|
|
max: 2,
|
|
inc: 10,
|
|
dec: 0.5,
|
|
utilisation: 0.8,
|
|
},
|
|
},
|
|
};
|
|
|
|
let limiter = DynamicLimiter::new(config);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
token.release(Outcome::Success);
|
|
|
|
assert_eq!(limiter.state().limit(), 2);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
token.release(Outcome::Success);
|
|
assert_eq!(limiter.state().limit(), 2);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
token.release(Outcome::Overload);
|
|
assert_eq!(limiter.state().limit(), 1);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
token.release(Outcome::Overload);
|
|
assert_eq!(limiter.state().limit(), 1);
|
|
}
|
|
|
|
#[tokio::test(start_paused = true)]
|
|
async fn should_decrease_limit_on_overload() {
|
|
let config = RateLimiterConfig {
|
|
initial_limit: 10,
|
|
algorithm: RateLimitAlgorithm::Aimd {
|
|
conf: Aimd {
|
|
min: 1,
|
|
max: 1500,
|
|
inc: 10,
|
|
dec: 0.5,
|
|
utilisation: 0.8,
|
|
},
|
|
},
|
|
};
|
|
|
|
let limiter = DynamicLimiter::new(config);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(100))
|
|
.await
|
|
.unwrap();
|
|
token.release(Outcome::Overload);
|
|
|
|
assert_eq!(limiter.state().limit(), 5, "overload: decrease");
|
|
}
|
|
|
|
#[tokio::test(start_paused = true)]
|
|
async fn acquire_timeout_times_out() {
|
|
let config = RateLimiterConfig {
|
|
initial_limit: 1,
|
|
algorithm: RateLimitAlgorithm::Aimd {
|
|
conf: Aimd {
|
|
min: 1,
|
|
max: 2,
|
|
inc: 10,
|
|
dec: 0.5,
|
|
utilisation: 0.8,
|
|
},
|
|
},
|
|
};
|
|
|
|
let limiter = DynamicLimiter::new(config);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
let now = tokio::time::Instant::now();
|
|
limiter
|
|
.acquire_timeout(Duration::from_secs(1))
|
|
.await
|
|
.err()
|
|
.unwrap();
|
|
|
|
assert!(now.elapsed() >= Duration::from_secs(1));
|
|
|
|
token.release(Outcome::Success);
|
|
|
|
assert_eq!(limiter.state().limit(), 2);
|
|
}
|
|
|
|
#[tokio::test(start_paused = true)]
|
|
async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
|
|
let config = RateLimiterConfig {
|
|
initial_limit: 4,
|
|
algorithm: RateLimitAlgorithm::Aimd {
|
|
conf: Aimd {
|
|
min: 1,
|
|
max: 1500,
|
|
inc: 1,
|
|
dec: 0.5,
|
|
utilisation: 0.5,
|
|
},
|
|
},
|
|
};
|
|
|
|
let limiter = DynamicLimiter::new(config);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
let _token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
let _token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
|
|
token.release(Outcome::Success);
|
|
assert_eq!(limiter.state().limit(), 5, "success: increase");
|
|
}
|
|
|
|
#[tokio::test(start_paused = true)]
|
|
async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
|
|
let config = RateLimiterConfig {
|
|
initial_limit: 4,
|
|
algorithm: RateLimitAlgorithm::Aimd {
|
|
conf: Aimd {
|
|
min: 1,
|
|
max: 1500,
|
|
inc: 10,
|
|
dec: 0.5,
|
|
utilisation: 0.5,
|
|
},
|
|
},
|
|
};
|
|
|
|
let limiter = DynamicLimiter::new(config);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
|
|
token.release(Outcome::Success);
|
|
assert_eq!(
|
|
limiter.state().limit(),
|
|
4,
|
|
"success: ignore when < half limit"
|
|
);
|
|
}
|
|
|
|
#[tokio::test(start_paused = true)]
|
|
async fn should_not_change_limit_when_no_outcome() {
|
|
let config = RateLimiterConfig {
|
|
initial_limit: 10,
|
|
algorithm: RateLimitAlgorithm::Aimd {
|
|
conf: Aimd {
|
|
min: 1,
|
|
max: 1500,
|
|
inc: 10,
|
|
dec: 0.5,
|
|
utilisation: 0.5,
|
|
},
|
|
},
|
|
};
|
|
|
|
let limiter = DynamicLimiter::new(config);
|
|
|
|
let token = limiter
|
|
.acquire_timeout(Duration::from_millis(1))
|
|
.await
|
|
.unwrap();
|
|
drop(token);
|
|
assert_eq!(limiter.state().limit(), 10, "ignore");
|
|
}
|
|
}
|