From 86f7396eb794b3ebc8264cd09417d0b4540fb647 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 13 Dec 2023 17:38:26 +0000 Subject: [PATCH] permit for unauthenticated connection attempts --- proxy/src/console/provider.rs | 9 +++++---- proxy/src/console/provider/neon.rs | 18 ++++++++---------- proxy/src/serverless/conn_pool.rs | 28 ++++++++++++++++++++++++---- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index ebdc20ce41..a605ba8724 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -10,6 +10,7 @@ use crate::{ }; use async_trait::async_trait; use dashmap::DashMap; +use smol_str::SmolStr; use std::{sync::Arc, time::Duration}; use tokio::{ sync::{OwnedSemaphorePermit, Semaphore}, @@ -285,10 +286,10 @@ pub struct ApiCaches { pub allowed_ips: TimedLru, Arc>>, } -/// Various caches for [`console`](super). +/// Per-endpoint semaphore pub struct ApiLocks { name: &'static str, - node_locks: DashMap, Arc>, + node_locks: DashMap>, permits: usize, timeout: Duration, registered: prometheus::IntCounter, @@ -354,9 +355,9 @@ impl ApiLocks { }) } - pub async fn get_wake_compute_permit( + pub async fn get_permit( &self, - key: &Arc, + key: &SmolStr, ) -> Result { if self.permits == 0 { return Ok(WakeComputePermit { permit: None }); diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index d5b4e55a04..b35112d769 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -192,33 +192,31 @@ impl super::Api for Api { extra: &ConsoleReqExtra, creds: &ComputeUserInfo, ) -> Result { - let key: &str = &creds.inner.cache_key; + let key = &creds.inner.cache_key; // Every time we do a wakeup http request, the compute node will stay up // for some time (highly depends on the console's scale-to-zero policy); // The connection info remains the same during that period of time, // which means that we might cache it to reduce the load and latency. - if let Some(cached) = self.caches.node_info.get(key) { - info!(key = key, "found cached compute node info"); + if let Some(cached) = self.caches.node_info.get(&**key) { + info!(key = &**key, "found cached compute node info"); return Ok(cached); } - let key: Arc = key.into(); - - let permit = self.locks.get_wake_compute_permit(&key).await?; + let permit = self.locks.get_permit(key).await?; // after getting back a permit - it's possible the cache was filled // double check if permit.should_check_cache() { - if let Some(cached) = self.caches.node_info.get(&key) { - info!(key = &*key, "found cached compute node info"); + if let Some(cached) = self.caches.node_info.get(&**key) { + info!(key = &**key, "found cached compute node info"); return Ok(cached); } } let node = self.do_wake_compute(extra, creds).await?; - let (_, cached) = self.caches.node_info.insert(key.clone(), node); - info!(key = &*key, "created a cache entry for compute node info"); + let (_, cached) = self.caches.node_info.insert(key.as_str().into(), node); + info!(key = &**key, "created a cache entry for compute node info"); Ok(cached) } diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 4f3b31b9be..7dd417a3bf 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -9,7 +9,7 @@ use pbkdf2::{ }; use pq_proto::StartupMessageParams; use smol_str::SmolStr; -use std::{collections::HashMap, net::IpAddr, sync::Arc}; +use std::{collections::HashMap, net::IpAddr, sync::Arc, time::Duration}; use std::{ fmt, task::{ready, Poll}, @@ -23,7 +23,7 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus}; use crate::{ auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list}, - console, + console::{self, locks::ApiLocks}, proxy::{ neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER, @@ -114,17 +114,32 @@ pub struct GlobalConnPool { // Using a lock to remove any race conditions. // Eg cleaning up connections while a new connection is returned closed: RwLock, + + // semaphore guarding unauthenticated postgres connections + connection_lock: ApiLocks, } impl GlobalConnPool { pub fn new(config: &'static crate::config::ProxyConfig) -> Arc { - Arc::new(Self { + let connection_lock = + ApiLocks::new("http_connect_lock", 2, 32, Duration::from_secs(10)).unwrap(); + let this = Arc::new(Self { global_pool: DashMap::new(), global_pool_size: AtomicUsize::new(0), max_conns_per_endpoint: MAX_CONNS_PER_ENDPOINT, proxy_config: config, closed: RwLock::new(false), - }) + connection_lock, + }); + + let this2 = this.clone(); + tokio::spawn(async move { + this2 + .connection_lock + .garbage_collect_worker(Duration::from_secs(600)) + .await + }); + this } pub fn shutdown(&self) { @@ -221,6 +236,11 @@ impl GlobalConnPool { return Ok(Client::new(client, pool).await); } } else { + // acquire a permit for un-authenticated access to the compute. + // to be clear, postgres will authenticate, but we want to limit the connections + // that have potential to be unauthenticated. + let _permit = self.connection_lock.get_permit(&conn_info.hostname).await?; + let conn_id = uuid::Uuid::new_v4(); info!(%conn_id, "pool: opening a new connection '{conn_info}'"); connect_to_compute(