proxy: add connect compute concurrency lock (#7607)
## Problem

Too many `connect_compute` attempts can overwhelm postgres, getting the connections stuck.

## Summary of changes

Limit the number of connection attempts that can happen at a given time.
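The fix reuses the semaphore-per-key pattern that already guards `wake_compute`: each compute host gets its own small semaphore, and a dial may proceed only while holding a permit. Below is a minimal sketch of that idea (hypothetical `PerKeyLock` name and simplified error handling; the PR's real type is the generic `ApiLocks<K>` shown in the diff):

```rust
use std::{hash::Hash, sync::Arc, time::Duration};

use dashmap::DashMap;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

struct PerKeyLock<K: Hash + Eq + Clone> {
    permits: usize,     // max concurrent attempts per key; 0 disables the lock
    timeout: Duration,  // how long to wait for a permit before giving up
    locks: DashMap<K, Arc<Semaphore>>,
}

impl<K: Hash + Eq + Clone> PerKeyLock<K> {
    async fn get_permit(&self, key: &K) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
        if self.permits == 0 {
            return Ok(None); // lock disabled, e.g. `permits=0` on the CLI
        }
        // One semaphore per compute host, created lazily on first use.
        let semaphore = self
            .locks
            .entry(key.clone())
            .or_insert_with(|| Arc::new(Semaphore::new(self.permits)))
            .clone();
        // If the host stays saturated for longer than `timeout`, fail the attempt
        // instead of piling more connections onto a struggling postgres.
        let permit = tokio::time::timeout(self.timeout, semaphore.acquire_owned()).await??;
        Ok(Some(permit))
    }
}
```

The caller holds the returned permit across the dial and drops it once the connection is established, so only in-flight attempts are limited.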
@@ -118,8 +118,11 @@ struct ProxyCliArgs {
     #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
     wake_compute_cache: String,
     /// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
-    #[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
+    #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
     wake_compute_lock: String,
+    /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
+    #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
+    connect_compute_lock: String,
     /// Allow self-signed certificates for compute nodes (for testing)
     #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
     allow_self_signed_compute: bool,
@@ -529,24 +532,21 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
         endpoint_cache_config,
     )));

-    let config::WakeComputeLockOptions {
+    let config::ConcurrencyLockOptions {
         shards,
         permits,
         epoch,
         timeout,
     } = args.wake_compute_lock.parse()?;
     info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
-    let locks = Box::leak(Box::new(
-        console::locks::ApiLocks::new(
-            "wake_compute_lock",
-            permits,
-            shards,
-            timeout,
-            epoch,
-            &Metrics::get().wake_compute_lock,
-        )
-        .unwrap(),
-    ));
+    let locks = Box::leak(Box::new(console::locks::ApiLocks::new(
+        "wake_compute_lock",
+        permits,
+        shards,
+        timeout,
+        epoch,
+        &Metrics::get().wake_compute_lock,
+    )?));
     tokio::spawn(locks.garbage_collect_worker());

     let url = args.auth_endpoint.parse()?;
@@ -572,6 +572,23 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
             auth::BackendType::Link(MaybeOwned::Owned(url), ())
         }
     };

+    let config::ConcurrencyLockOptions {
+        shards,
+        permits,
+        epoch,
+        timeout,
+    } = args.connect_compute_lock.parse()?;
+    info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)");
+    let connect_compute_locks = console::locks::ApiLocks::new(
+        "connect_compute_lock",
+        permits,
+        shards,
+        timeout,
+        epoch,
+        &Metrics::get().proxy.connect_compute_lock,
+    )?;
+
     let http_config = HttpConfig {
         request_timeout: args.sql_over_http.sql_over_http_timeout,
         pool_options: GlobalConnPoolOptions {
@@ -607,11 +624,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
         region: args.region.clone(),
         aws_region: args.aws_region.clone(),
         wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
+        connect_compute_locks,
         connect_to_compute_retry_config: config::RetryConfig::parse(
             &args.connect_to_compute_retry,
         )?,
     }));

+    tokio::spawn(config.connect_compute_locks.garbage_collect_worker());
+
     Ok(config)
 }
@@ -6,6 +6,7 @@ use crate::{
     error::{ReportableError, UserFacingError},
     metrics::{Metrics, NumDbConnectionsGuard},
     proxy::neon_option,
+    Host,
 };
 use futures::{FutureExt, TryFutureExt};
 use itertools::Itertools;
@@ -101,6 +102,16 @@ impl ConnCfg {
         }
     }

+    pub fn get_host(&self) -> Result<Host, WakeComputeError> {
+        match self.0.get_hosts() {
+            [tokio_postgres::config::Host::Tcp(s)] => Ok(s.into()),
+            // we should not have multiple addresses or unix addresses.
+            _ => Err(WakeComputeError::BadComputeAddress(
+                "invalid compute address".into(),
+            )),
+        }
+    }
+
     /// Apply startup message params to the connection config.
     pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
         // Only set `user` if it's not present in the config.
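The new `ConnCfg::get_host` accessor is what gives the connect lock its key: it returns the single TCP hostname from the connection config (rejecting multi-host and unix-socket configs), so each compute node ends up with its own semaphore.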
@@ -1,7 +1,9 @@
 use crate::{
     auth::{self, backend::AuthRateLimiter},
+    console::locks::ApiLocks,
     rate_limiter::RateBucketInfo,
     serverless::GlobalConnPoolOptions,
+    Host,
 };
 use anyhow::{bail, ensure, Context, Ok};
 use itertools::Itertools;
@@ -34,6 +36,7 @@ pub struct ProxyConfig {
     pub handshake_timeout: Duration,
     pub aws_region: String,
     pub wake_compute_retry_config: RetryConfig,
+    pub connect_compute_locks: ApiLocks<Host>,
     pub connect_to_compute_retry_config: RetryConfig,
 }

@@ -573,7 +576,7 @@ impl RetryConfig {
 }

 /// Helper for cmdline cache options parsing.
-pub struct WakeComputeLockOptions {
+pub struct ConcurrencyLockOptions {
     /// The number of shards the lock map should have
     pub shards: usize,
     /// The number of allowed concurrent requests for each endpoint
@@ -584,9 +587,12 @@ pub struct WakeComputeLockOptions {
     pub timeout: Duration,
 }

-impl WakeComputeLockOptions {
+impl ConcurrencyLockOptions {
     /// Default options for [`crate::console::provider::ApiLocks`].
     pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
+    /// Default options for [`crate::console::provider::ApiLocks`].
+    pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
+        "shards=64,permits=50,epoch=10m,timeout=500ms";

     // pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";

@@ -636,7 +642,7 @@ impl WakeComputeLockOptions {
     }
 }

-impl FromStr for WakeComputeLockOptions {
+impl FromStr for ConcurrencyLockOptions {
     type Err = anyhow::Error;

     fn from_str(options: &str) -> Result<Self, Self::Err> {
@@ -672,7 +678,7 @@ mod tests {

     #[test]
     fn test_parse_lock_options() -> anyhow::Result<()> {
-        let WakeComputeLockOptions {
+        let ConcurrencyLockOptions {
             epoch,
             permits,
             shards,
@@ -683,7 +689,7 @@ mod tests {
         assert_eq!(shards, 32);
         assert_eq!(permits, 4);

-        let WakeComputeLockOptions {
+        let ConcurrencyLockOptions {
             epoch,
             permits,
             shards,
@@ -694,7 +700,7 @@ mod tests {
         assert_eq!(shards, 16);
         assert_eq!(permits, 8);

-        let WakeComputeLockOptions {
+        let ConcurrencyLockOptions {
             epoch,
             permits,
             shards,
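For reference, the option string round-trips like this (illustrative snippet mirroring `test_parse_lock_options`; assumes `ConcurrencyLockOptions` and its `FromStr` impl above are in scope):

```rust
fn parse_example() -> anyhow::Result<()> {
    // The connect_compute default: 64 shards, 50 concurrent dials per host,
    // GC every 10 minutes, and a 500ms wait before giving up on a permit.
    let opts: ConcurrencyLockOptions = "shards=64,permits=50,epoch=10m,timeout=500ms".parse()?;
    assert_eq!(opts.shards, 64);
    assert_eq!(opts.permits, 50);

    // `permits=0` (the wake_compute default) disables the lock entirely.
    let disabled: ConcurrencyLockOptions = "permits=0".parse()?;
    assert_eq!(disabled.permits, 0);
    Ok(())
}
```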
@@ -17,7 +17,7 @@ use crate::{
     scram, EndpointCacheKey,
 };
 use dashmap::DashMap;
-use std::{sync::Arc, time::Duration};
+use std::{hash::Hash, sync::Arc, time::Duration};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio::time::Instant;
 use tracing::info;
@@ -447,16 +447,16 @@ impl ApiCaches {
 }

 /// Various caches for [`console`](super).
-pub struct ApiLocks {
+pub struct ApiLocks<K> {
     name: &'static str,
-    node_locks: DashMap<EndpointCacheKey, Arc<Semaphore>>,
+    node_locks: DashMap<K, Arc<Semaphore>>,
     permits: usize,
     timeout: Duration,
     epoch: std::time::Duration,
     metrics: &'static ApiLockMetrics,
 }

-impl ApiLocks {
+impl<K: Hash + Eq + Clone> ApiLocks<K> {
     pub fn new(
         name: &'static str,
         permits: usize,
@@ -475,10 +475,7 @@ impl ApiLocks {
         })
     }

-    pub async fn get_wake_compute_permit(
-        &self,
-        key: &EndpointCacheKey,
-    ) -> Result<WakeComputePermit, errors::WakeComputeError> {
+    pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, errors::WakeComputeError> {
         if self.permits == 0 {
             return Ok(WakeComputePermit { permit: None });
         }
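The diff spawns `locks.garbage_collect_worker()` for both lock maps, but its body falls outside the shown hunks. Given the `epoch` interval and the fixed permit count, a plausible shape is a loop that periodically drops semaphores with no outstanding permits (hedged sketch only, not the PR's actual implementation):

```rust
use std::{hash::Hash, sync::Arc, time::Duration};

use dashmap::DashMap;
use tokio::sync::Semaphore;

// Every `epoch`, retain only entries where some task still holds a permit.
// Idle semaphores are recreated on demand by `get_permit`, so dropping them is safe.
async fn gc_worker_sketch<K: Hash + Eq>(
    locks: &DashMap<K, Arc<Semaphore>>,
    permits: usize,
    epoch: Duration,
) {
    let mut ticker = tokio::time::interval(epoch);
    loop {
        ticker.tick().await;
        locks.retain(|_, sem| sem.available_permits() < permits);
    }
}
```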
@@ -13,7 +13,7 @@ use crate::{
     http,
     metrics::{CacheOutcome, Metrics},
     rate_limiter::EndpointRateLimiter,
-    scram, Normalize,
+    scram, EndpointCacheKey, Normalize,
 };
 use crate::{cache::Cached, context::RequestMonitoring};
 use futures::TryFutureExt;
@@ -25,7 +25,7 @@ use tracing::{error, info, info_span, warn, Instrument};
 pub struct Api {
     endpoint: http::Endpoint,
     pub caches: &'static ApiCaches,
-    pub locks: &'static ApiLocks,
+    pub locks: &'static ApiLocks<EndpointCacheKey>,
     pub endpoint_rate_limiter: Arc<EndpointRateLimiter>,
     jwt: String,
 }
@@ -35,7 +35,7 @@ impl Api {
     pub fn new(
         endpoint: http::Endpoint,
         caches: &'static ApiCaches,
-        locks: &'static ApiLocks,
+        locks: &'static ApiLocks<EndpointCacheKey>,
         endpoint_rate_limiter: Arc<EndpointRateLimiter>,
     ) -> Self {
         let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
@@ -289,7 +289,7 @@ impl super::Api for Api {
             return Err(WakeComputeError::TooManyConnections);
         }

-        let permit = self.locks.get_wake_compute_permit(&key).await?;
+        let permit = self.locks.get_permit(&key).await?;

         // after getting back a permit - it's possible the cache was filled
         // double check
@@ -159,6 +159,9 @@ smol_str_wrapper!(EndpointCacheKey);

 smol_str_wrapper!(DbName);

+// postgres hostname, will likely be a port:ip addr
+smol_str_wrapper!(Host);
+
 // Endpoints are a bit tricky. Rarely, they might be branches or projects.
 impl EndpointId {
     pub fn is_endpoint(&self) -> bool {
@@ -126,6 +126,9 @@ pub struct ProxyMetrics {

     /// Number of events consumed from redis (per event type).
     pub redis_events_count: CounterVec<StaticLabelSet<RedisEventsCount>>,
+
+    #[metric(namespace = "connect_compute_lock")]
+    pub connect_compute_lock: ApiLockMetrics,
 }

 #[derive(MetricGroup)]
@@ -149,6 +152,12 @@ impl Default for ProxyMetrics {
     }
 }

+impl Default for ApiLockMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 #[derive(FixedCardinalityLabel, Copy, Clone)]
 #[label(singleton = "direction")]
 pub enum HttpDirection {
@@ -301,7 +301,10 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(

     let mut node = connect_to_compute(
         ctx,
-        &TcpMechanism { params: &params },
+        &TcpMechanism {
+            params: &params,
+            locks: &config.connect_compute_locks,
+        },
         &user_info,
         mode.allow_self_signed_compute(config),
         config.wake_compute_retry_config,
@@ -2,7 +2,7 @@ use crate::{
     auth::backend::ComputeCredentialKeys,
     compute::{self, PostgresConnection},
     config::RetryConfig,
-    console::{self, errors::WakeComputeError, CachedNodeInfo, NodeInfo},
+    console::{self, errors::WakeComputeError, locks::ApiLocks, CachedNodeInfo, NodeInfo},
     context::RequestMonitoring,
     error::ReportableError,
     metrics::{ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType},
@@ -10,6 +10,7 @@ use crate::{
         retry::{retry_after, ShouldRetry},
         wake_compute::wake_compute,
     },
+    Host,
 };
 use async_trait::async_trait;
 use pq_proto::StartupMessageParams;
@@ -64,6 +65,9 @@ pub trait ComputeConnectBackend {
 pub struct TcpMechanism<'a> {
     /// KV-dictionary with PostgreSQL connection params.
     pub params: &'a StartupMessageParams,
+
+    /// connect_to_compute concurrency lock
+    pub locks: &'static ApiLocks<Host>,
 }

 #[async_trait]
@@ -79,6 +83,8 @@ impl ConnectMechanism for TcpMechanism<'_> {
         node_info: &console::CachedNodeInfo,
         timeout: time::Duration,
     ) -> Result<PostgresConnection, Self::Error> {
+        let host = node_info.config.get_host()?;
+        let _permit = self.locks.get_permit(&host).await?;
         node_info.connect(ctx, timeout).await
     }
@@ -9,11 +9,13 @@ use crate::{
     config::{AuthenticationConfig, ProxyConfig},
     console::{
         errors::{GetAuthInfoError, WakeComputeError},
+        locks::ApiLocks,
         CachedNodeInfo,
     },
     context::RequestMonitoring,
     error::{ErrorKind, ReportableError, UserFacingError},
-    proxy::connect_compute::ConnectMechanism,
+    proxy::{connect_compute::ConnectMechanism, retry::ShouldRetry},
+    Host,
 };

 use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
@@ -105,6 +107,7 @@ impl PoolingBackend {
                 conn_id,
                 conn_info,
                 pool: self.pool.clone(),
+                locks: &self.config.connect_compute_locks,
             },
             &backend,
             false, // do not allow self signed compute for http flow
@@ -154,16 +157,31 @@ impl UserFacingError for HttpConnError {
     }
 }

+impl ShouldRetry for HttpConnError {
+    fn could_retry(&self) -> bool {
+        match self {
+            HttpConnError::ConnectionError(e) => e.could_retry(),
+            HttpConnError::ConnectionClosedAbruptly(_) => false,
+            HttpConnError::GetAuthInfo(_) => false,
+            HttpConnError::AuthError(_) => false,
+            HttpConnError::WakeCompute(_) => false,
+        }
+    }
+}
+
 struct TokioMechanism {
     pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
     conn_info: ConnInfo,
     conn_id: uuid::Uuid,
+
+    /// connect_to_compute concurrency lock
+    locks: &'static ApiLocks<Host>,
 }

 #[async_trait]
 impl ConnectMechanism for TokioMechanism {
     type Connection = Client<tokio_postgres::Client>;
-    type ConnectError = tokio_postgres::Error;
+    type ConnectError = HttpConnError;
     type Error = HttpConnError;

     async fn connect_once(
@@ -172,6 +190,9 @@ impl ConnectMechanism for TokioMechanism {
         node_info: &CachedNodeInfo,
         timeout: Duration,
     ) -> Result<Self::Connection, Self::ConnectError> {
+        let host = node_info.config.get_host()?;
+        let permit = self.locks.get_permit(&host).await?;
+
         let mut config = (*node_info.config).clone();
         let config = config
             .user(&self.conn_info.user_info.user)
@@ -182,6 +203,7 @@ impl ConnectMechanism for TokioMechanism {
         let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
         let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
         drop(pause);
+        drop(permit);

         tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
         Ok(poll_client(
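A detail worth noting in both mechanisms: the permit is acquired just before dialing the compute node and released as soon as the session is established (`drop(permit)` in `TokioMechanism`, and the `_permit` binding going out of scope in `TcpMechanism`). The lock therefore bounds concurrent connection attempts per host, not the total number of open connections, which is exactly the overload scenario described in the problem statement.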