proxy: reintroduce dynamic limiter for compute lock (#7737)

## Problem

Computes that are healthy can manage many connection attempts at a time.
Unhealthy computes cannot. We initially handled this with a fixed
concurrency limit, but it seems this inhibits pgbench.

## Summary of changes

Support AIMD for connect_to_compute lock to allow varying the
concurrency limit based on compute health
This commit is contained in:
Conrad Ludgate
2024-05-29 11:17:05 +01:00
committed by GitHub
parent 14df69d0e3
commit c8cebecabf
11 changed files with 563 additions and 40 deletions

View File

@@ -38,6 +38,7 @@ hmac.workspace = true
hostname.workspace = true
http.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }

View File

@@ -557,14 +557,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let config::ConcurrencyLockOptions {
shards,
permits,
limiter,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(console::locks::ApiLocks::new(
"wake_compute_lock",
permits,
limiter,
shards,
timeout,
epoch,
@@ -603,14 +603,19 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let config::ConcurrencyLockOptions {
shards,
permits,
limiter,
epoch,
timeout,
} = args.connect_compute_lock.parse()?;
info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)");
info!(
?limiter,
shards,
?epoch,
"Using NodeLocks (connect_compute)"
);
let connect_compute_locks = console::locks::ApiLocks::new(
"connect_compute_lock",
permits,
limiter,
shards,
timeout,
epoch,

View File

@@ -1,7 +1,7 @@
use crate::{
auth::{self, backend::AuthRateLimiter},
console::locks::ApiLocks,
rate_limiter::RateBucketInfo,
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
scram::threadpool::ThreadPool,
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
Host,
@@ -580,14 +580,18 @@ impl RetryConfig {
}
/// Helper for cmdline cache options parsing.
#[derive(serde::Deserialize)]
pub struct ConcurrencyLockOptions {
/// The number of shards the lock map should have
pub shards: usize,
/// The number of allowed concurrent requests for each endpoitn
pub permits: usize,
#[serde(flatten)]
pub limiter: RateLimiterConfig,
/// Garbage collection epoch
#[serde(deserialize_with = "humantime_serde::deserialize")]
pub epoch: Duration,
/// Lock timeout
#[serde(deserialize_with = "humantime_serde::deserialize")]
pub timeout: Duration,
}
@@ -596,13 +600,18 @@ impl ConcurrencyLockOptions {
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
/// Default options for [`crate::console::provider::ApiLocks`].
pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
"shards=64,permits=10,epoch=10m,timeout=10ms";
"shards=64,permits=100,epoch=10m,timeout=10ms";
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
/// Parse lock options passed via cmdline.
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
fn parse(options: &str) -> anyhow::Result<Self> {
let options = options.trim();
if options.starts_with('{') && options.ends_with('}') {
return Ok(serde_json::from_str(options)?);
}
let mut shards = None;
let mut permits = None;
let mut epoch = None;
@@ -629,9 +638,13 @@ impl ConcurrencyLockOptions {
shards = Some(2);
}
let permits = permits.context("missing `permits`")?;
let out = Self {
shards: shards.context("missing `shards`")?,
permits: permits.context("missing `permits`")?,
limiter: RateLimiterConfig {
algorithm: RateLimitAlgorithm::Fixed,
initial_limit: permits,
},
epoch: epoch.context("missing `epoch`")?,
timeout: timeout.context("missing `timeout`")?,
};
@@ -657,6 +670,8 @@ impl FromStr for ConcurrencyLockOptions {
#[cfg(test)]
mod tests {
use crate::rate_limiter::Aimd;
use super::*;
#[test]
@@ -684,36 +699,68 @@ mod tests {
fn test_parse_lock_options() -> anyhow::Result<()> {
let ConcurrencyLockOptions {
epoch,
permits,
limiter,
shards,
timeout,
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
assert_eq!(epoch, Duration::from_secs(10 * 60));
assert_eq!(timeout, Duration::from_secs(1));
assert_eq!(shards, 32);
assert_eq!(permits, 4);
assert_eq!(limiter.initial_limit, 4);
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
let ConcurrencyLockOptions {
epoch,
permits,
limiter,
shards,
timeout,
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
assert_eq!(epoch, Duration::from_secs(60));
assert_eq!(timeout, Duration::from_millis(100));
assert_eq!(shards, 16);
assert_eq!(permits, 8);
assert_eq!(limiter.initial_limit, 8);
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
let ConcurrencyLockOptions {
epoch,
permits,
limiter,
shards,
timeout,
} = "permits=0".parse()?;
assert_eq!(epoch, Duration::ZERO);
assert_eq!(timeout, Duration::ZERO);
assert_eq!(shards, 2);
assert_eq!(permits, 0);
assert_eq!(limiter.initial_limit, 0);
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
Ok(())
}
#[test]
fn test_parse_json_lock_options() -> anyhow::Result<()> {
let ConcurrencyLockOptions {
epoch,
limiter,
shards,
timeout,
} = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#
.parse()?;
assert_eq!(epoch, Duration::from_secs(10 * 60));
assert_eq!(timeout, Duration::from_secs(1));
assert_eq!(shards, 32);
assert_eq!(limiter.initial_limit, 44);
assert_eq!(
limiter.algorithm,
RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 5,
max: 500,
dec: 0.9,
inc: 10,
utilisation: 0.8
}
},
);
Ok(())
}

View File

@@ -15,11 +15,11 @@ use crate::{
error::ReportableError,
intern::ProjectIdInt,
metrics::ApiLockMetrics,
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
scram, EndpointCacheKey,
};
use dashmap::DashMap;
use std::{hash::Hash, sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::info;
@@ -443,8 +443,8 @@ impl ApiCaches {
/// Various caches for [`console`](super).
pub struct ApiLocks<K> {
name: &'static str,
node_locks: DashMap<K, Arc<Semaphore>>,
permits: usize,
node_locks: DashMap<K, Arc<DynamicLimiter>>,
config: RateLimiterConfig,
timeout: Duration,
epoch: std::time::Duration,
metrics: &'static ApiLockMetrics,
@@ -452,8 +452,6 @@ pub struct ApiLocks<K> {
#[derive(Debug, thiserror::Error)]
pub enum ApiLockError {
#[error("lock was closed")]
AcquireError(#[from] tokio::sync::AcquireError),
#[error("permit could not be acquired")]
TimeoutError(#[from] tokio::time::error::Elapsed),
}
@@ -461,7 +459,6 @@ pub enum ApiLockError {
impl ReportableError for ApiLockError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service,
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
}
}
@@ -470,7 +467,7 @@ impl ReportableError for ApiLockError {
impl<K: Hash + Eq + Clone> ApiLocks<K> {
pub fn new(
name: &'static str,
permits: usize,
config: RateLimiterConfig,
shards: usize,
timeout: Duration,
epoch: std::time::Duration,
@@ -479,7 +476,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
Ok(Self {
name,
node_locks: DashMap::with_shard_amount(shards),
permits,
config,
timeout,
epoch,
metrics,
@@ -487,8 +484,10 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
}
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
if self.permits == 0 {
return Ok(WakeComputePermit { permit: None });
if self.config.initial_limit == 0 {
return Ok(WakeComputePermit {
permit: Token::disabled(),
});
}
let now = Instant::now();
let semaphore = {
@@ -500,24 +499,22 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
.entry(key.clone())
.or_insert_with(|| {
self.metrics.semaphores_registered.inc();
Arc::new(Semaphore::new(self.permits))
DynamicLimiter::new(self.config)
})
.clone()
}
};
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
let permit = semaphore.acquire_deadline(now + self.timeout).await;
self.metrics
.semaphore_acquire_seconds
.observe(now.elapsed().as_secs_f64());
Ok(WakeComputePermit {
permit: Some(permit??),
})
Ok(WakeComputePermit { permit: permit? })
}
pub async fn garbage_collect_worker(&self) {
if self.permits == 0 {
if self.config.initial_limit == 0 {
return;
}
let mut interval =
@@ -547,12 +544,21 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
}
pub struct WakeComputePermit {
// None if the lock is disabled
permit: Option<OwnedSemaphorePermit>,
permit: Token,
}
impl WakeComputePermit {
pub fn should_check_cache(&self) -> bool {
self.permit.is_some()
!self.permit.is_disabled()
}
pub fn release(self, outcome: Outcome) {
self.permit.release(outcome)
}
pub fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
match res {
Ok(_) => self.release(Outcome::Success),
Err(_) => self.release(Outcome::Overload),
}
res
}
}

View File

@@ -301,7 +301,7 @@ impl super::Api for Api {
}
}
let mut node = self.do_wake_compute(ctx, user_info).await?;
let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
ctx.set_project(node.aux.clone());
let cold_start_info = node.aux.cold_start_info;
info!("woken up a compute node");

View File

@@ -84,8 +84,8 @@ impl ConnectMechanism for TcpMechanism<'_> {
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host()?;
let _permit = self.locks.get_permit(&host).await?;
node_info.connect(ctx, timeout).await
let permit = self.locks.get_permit(&host).await?;
permit.release_result(node_info.connect(ctx, timeout).await)
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {

View File

@@ -1,2 +1,6 @@
mod limit_algorithm;
mod limiter;
pub use limit_algorithm::{
aimd::Aimd, DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
};
pub use limiter::{BucketRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};

View File

@@ -0,0 +1,275 @@
//! Algorithms for controlling concurrency limits.
use parking_lot::Mutex;
use std::{pin::pin, sync::Arc, time::Duration};
use tokio::{
sync::Notify,
time::{error::Elapsed, timeout_at, Instant},
};
use self::aimd::Aimd;
pub mod aimd;
/// Whether a job succeeded or failed as a result of congestion/overload.
///
/// Errors not considered to be caused by overload should be ignored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
/// The job succeeded, or failed in a way unrelated to overload.
Success,
/// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
/// was observed.
Overload,
}
/// An algorithm for controlling a concurrency limit.
pub trait LimitAlgorithm: Send + Sync + 'static {
/// Update the concurrency limit in response to a new job completion.
fn update(&self, old_limit: usize, sample: Sample) -> usize;
}
/// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub struct Sample {
pub(crate) latency: Duration,
/// Jobs in flight when the sample was taken.
pub(crate) in_flight: usize,
pub(crate) outcome: Outcome,
}
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum RateLimitAlgorithm {
#[default]
Fixed,
Aimd {
#[serde(flatten)]
conf: Aimd,
},
}
pub struct Fixed;
impl LimitAlgorithm for Fixed {
fn update(&self, old_limit: usize, _sample: Sample) -> usize {
old_limit
}
}
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
pub struct RateLimiterConfig {
#[serde(flatten)]
pub algorithm: RateLimitAlgorithm,
pub initial_limit: usize,
}
impl RateLimiterConfig {
pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
match self.algorithm {
RateLimitAlgorithm::Fixed => Box::new(Fixed),
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
}
}
}
pub struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
limit: usize,
in_flight: usize,
}
impl LimiterInner {
fn update(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
latency,
in_flight: self.in_flight,
outcome,
};
self.limit = self.alg.update(self.limit, sample);
}
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available > 1 {
self.available -= 1;
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available > 1 {
ready.notify_one();
}
Some(())
} else {
None
}
}
}
/// Limits the number of concurrent jobs.
///
/// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
/// token once the job is finished.
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub struct DynamicLimiter {
config: RateLimiterConfig,
inner: Mutex<LimiterInner>,
// to notify when a token is available
ready: Notify,
}
/// A concurrency token, required to run a job.
///
/// Release the token back to the [`DynamicLimiter`] after the job is complete.
pub struct Token {
start: Instant,
limiter: Option<Arc<DynamicLimiter>>,
}
/// A snapshot of the state of the [`DynamicLimiter`].
///
/// Not guaranteed to be consistent under high concurrency.
#[derive(Debug, Clone, Copy)]
pub struct LimiterState {
limit: usize,
in_flight: usize,
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub fn new(config: RateLimiterConfig) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
in_flight: 0,
}),
ready,
config,
})
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
///
/// Returns `None` if there are none available after `duration`.
pub async fn acquire_timeout(self: &Arc<Self>, duration: Duration) -> Result<Token, Elapsed> {
self.acquire_deadline(Instant::now() + duration).await
}
/// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available.
///
/// Returns `None` if there are none available after `deadline`.
pub async fn acquire_deadline(self: &Arc<Self>, deadline: Instant) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
loop {
let mut limit = None;
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
}
limit = Some(inner.limit);
}
match timeout_at(deadline, notified.as_mut()).await {
Ok(()) => ready = true,
Err(e) => {
let limit = limit.unwrap_or_else(|| self.inner.lock().limit);
tracing::info!(limit, "could not acquire token in time");
break Err(e);
}
}
}
}
}
/// Return the concurrency [Token], along with the outcome of the job.
///
/// The [Outcome] of the job, and the time taken to perform it, may be used
/// to update the concurrency limit.
///
/// Set the outcome to `None` to ignore the job.
fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
tracing::info!("outcome is {:?}", outcome);
if self.config.initial_limit == 0 {
return;
}
let mut inner = self.inner.lock();
inner.update(start.elapsed(), outcome);
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}
inner.in_flight -= 1;
}
/// The current state of the limiter.
pub fn state(&self) -> LimiterState {
let inner = self.inner.lock();
LimiterState {
limit: inner.limit,
in_flight: inner.in_flight,
}
}
}
impl Token {
fn new(limiter: Arc<DynamicLimiter>) -> Self {
Self {
start: Instant::now(),
limiter: Some(limiter),
}
}
pub fn disabled() -> Self {
Self {
start: Instant::now(),
limiter: None,
}
}
pub fn is_disabled(&self) -> bool {
self.limiter.is_none()
}
pub fn release(mut self, outcome: Outcome) {
self.release_mut(Some(outcome))
}
pub fn release_mut(&mut self, outcome: Option<Outcome>) {
if let Some(limiter) = self.limiter.take() {
limiter.release_inner(self.start, outcome);
}
}
}
impl Drop for Token {
fn drop(&mut self) {
self.release_mut(None)
}
}
impl LimiterState {
/// The current concurrency limit.
pub fn limit(&self) -> usize {
self.limit
}
/// The number of jobs in flight.
pub fn in_flight(&self) -> usize {
self.in_flight
}
}

View File

@@ -0,0 +1,184 @@
use std::usize;
use super::{LimitAlgorithm, Outcome, Sample};
/// Loss-based congestion avoidance.
///
/// Additive-increase, multiplicative decrease.
///
/// Adds available currency when:
/// 1. no load-based errors are observed, and
/// 2. the utilisation of the current limit is high.
///
/// Reduces available concurrency by a factor when load-based errors are detected.
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
pub struct Aimd {
/// Minimum limit for AIMD algorithm.
pub min: usize,
/// Maximum limit for AIMD algorithm.
pub max: usize,
/// Decrease AIMD decrease by value in case of error.
pub dec: f32,
/// Increase AIMD increase by value in case of success.
pub inc: usize,
/// A threshold below which the limit won't be increased.
pub utilisation: f32,
}
impl LimitAlgorithm for Aimd {
fn update(&self, old_limit: usize, sample: Sample) -> usize {
use Outcome::*;
match sample.outcome {
Success => {
let utilisation = sample.in_flight as f32 / old_limit as f32;
if utilisation > self.utilisation {
let limit = old_limit + self.inc;
let increased_limit = limit.clamp(self.min, self.max);
if increased_limit > old_limit {
tracing::info!(increased_limit, "limit increased");
}
increased_limit
} else {
old_limit
}
}
Overload => {
let limit = old_limit as f32 * self.dec;
// Floor instead of round, so the limit reduces even with small numbers.
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
let limit = limit.floor() as usize;
limit.clamp(self.min, self.max)
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::rate_limiter::limit_algorithm::{
DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
};
use super::*;
#[tokio::test(start_paused = true)]
async fn should_decrease_limit_on_overload() {
let config = RateLimiterConfig {
initial_limit: 10,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 10,
dec: 0.5,
utilisation: 0.8,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Overload);
assert_eq!(limiter.state().limit(), 5, "overload: decrease");
}
#[tokio::test(start_paused = true)]
async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
let config = RateLimiterConfig {
initial_limit: 4,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 1,
dec: 0.5,
utilisation: 0.5,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
let _token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
let _token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Success);
assert_eq!(limiter.state().limit(), 5, "success: increase");
}
#[tokio::test(start_paused = true)]
async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
let config = RateLimiterConfig {
initial_limit: 4,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 10,
dec: 0.5,
utilisation: 0.5,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Success);
assert_eq!(
limiter.state().limit(),
4,
"success: ignore when < half limit"
);
}
#[tokio::test(start_paused = true)]
async fn should_not_change_limit_when_no_outcome() {
let config = RateLimiterConfig {
initial_limit: 10,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 10,
dec: 0.5,
utilisation: 0.5,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
drop(token);
assert_eq!(limiter.state().limit(), 10, "ignore");
}
}

View File

@@ -232,9 +232,9 @@ impl ConnectMechanism for TokioMechanism {
.connect_timeout(timeout);
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);
drop(permit);
let (client, connection) = permit.release_result(res)?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
Ok(poll_client(