# proxy: reduce global conn pool contention (#4747)
## Problem

As documented, the global connection pool will be a point of high contention.

## Summary of changes

Use `DashMap` rather than `Mutex<HashMap>`. Of note, DashMap currently uses a `RwLock` internally, but it is partially sharded, which reduces contention by a factor of N. We could potentially use flurry, which is a port of Java's ConcurrentHashMap, but I have no good understanding of its performance characteristics. DashMap is at least equivalent to a plain HashMap, but with less contention; see the read-heavy benchmark to analyse our expected performance: <https://github.com/xacrimon/conc-map-bench#ready-heavy>

I also spoke with the developer of dashmap recently, and they are working on porting the implementation to a concurrent HAMT, FWIW.
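To illustrate the shape of the change, here is a minimal sketch (not the proxy's actual code) of the before/after locking structure. `EndpointPool`, `PoolBefore`, and `PoolAfter` are illustrative names, and it assumes the `dashmap` crate is on the dependency list, as the Cargo.toml hunk below adds:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use dashmap::DashMap;

// Hypothetical per-endpoint pool; stands in for the proxy's EndpointConnPool.
#[derive(Default)]
struct EndpointPool {
    conns: Vec<String>,
}

// Before: one global Mutex guards the whole map, so every lookup,
// even for unrelated endpoints, contends on the same lock.
struct PoolBefore {
    global: Mutex<HashMap<String, Arc<Mutex<EndpointPool>>>>,
}

impl PoolBefore {
    fn get(&self, endpoint: &str) -> Arc<Mutex<EndpointPool>> {
        // Global critical section: all callers serialize here.
        let mut map = self.global.lock().unwrap();
        map.entry(endpoint.to_owned()).or_default().clone()
    }
}

// After: DashMap shards the map internally; lookups for different
// endpoints usually hit different shards and therefore different locks.
struct PoolAfter {
    global: DashMap<String, Arc<Mutex<EndpointPool>>>,
}

impl PoolAfter {
    fn get(&self, endpoint: &str) -> Arc<Mutex<EndpointPool>> {
        // entry() only locks the shard that owns this key.
        self.global.entry(endpoint.to_owned()).or_default().clone()
    }
}

fn main() {
    let before = PoolBefore {
        global: Mutex::new(HashMap::new()),
    };
    let after = PoolAfter {
        global: DashMap::new(),
    };

    // Same API shape; only the locking granularity differs.
    before.get("ep-one").lock().unwrap().conns.push("conn-a".to_owned());
    after.get("ep-one").lock().unwrap().conns.push("conn-a".to_owned());
    println!(
        "after['ep-one'] has {} conn(s)",
        after.get("ep-one").lock().unwrap().conns.len()
    );
}
```

Since lookups of existing endpoints only take a shard-level lock, the read-heavy case linked above is the relevant benchmark for the proxy's access pattern.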
```diff
@@ -13,6 +13,7 @@ bytes = { workspace = true, features = ["serde"] }
 chrono.workspace = true
 clap.workspace = true
 consumption_metrics.workspace = true
+dashmap.workspace = true
 futures.workspace = true
 git-version.workspace = true
 hashbrown.workspace = true
@@ -29,7 +30,7 @@ metrics.workspace = true
 once_cell.workspace = true
 opentelemetry.workspace = true
 parking_lot.workspace = true
-pbkdf2.workspace = true
+pbkdf2 = { workspace = true, features = ["simple", "std"] }
 pin-project-lite.workspace = true
 postgres_backend.workspace = true
 pq_proto.workspace = true
```
```diff
@@ -1,8 +1,14 @@
 use anyhow::Context;
 use async_trait::async_trait;
-use parking_lot::Mutex;
+use dashmap::DashMap;
+use parking_lot::RwLock;
+use pbkdf2::{
+    password_hash::{PasswordHashString, PasswordHasher, PasswordVerifier, SaltString},
+    Params, Pbkdf2,
+};
 use pq_proto::StartupMessageParams;
 use std::fmt;
+use std::sync::atomic::{self, AtomicUsize};
 use std::{collections::HashMap, sync::Arc};
 use tokio::time;

@@ -46,19 +52,40 @@ struct ConnPoolEntry {
     _last_access: std::time::Instant,
 }

-// Per-endpoint connection pool, (dbname, username) -> Vec<ConnPoolEntry>
+// Per-endpoint connection pool, (dbname, username) -> DbUserConnPool
 // Number of open connections is limited by the `max_conns_per_endpoint`.
 pub struct EndpointConnPool {
-    pools: HashMap<(String, String), Vec<ConnPoolEntry>>,
+    pools: HashMap<(String, String), DbUserConnPool>,
     total_conns: usize,
 }

+/// This is cheap and not hugely secure.
+/// But probably good enough for in memory only hashes.
+///
+/// Still takes 3.5ms to hash on my hardware.
+/// We don't want to ruin the latency improvements of using the pool by making password verification take too long
+const PARAMS: Params = Params {
+    rounds: 10_000,
+    output_length: 32,
+};
+
+#[derive(Default)]
+pub struct DbUserConnPool {
+    conns: Vec<ConnPoolEntry>,
+    password_hash: Option<PasswordHashString>,
+}
+
 pub struct GlobalConnPool {
     // endpoint -> per-endpoint connection pool
     //
     // That should be a fairly conteded map, so return reference to the per-endpoint
     // pool as early as possible and release the lock.
-    global_pool: Mutex<HashMap<String, Arc<Mutex<EndpointConnPool>>>>,
+    global_pool: DashMap<String, Arc<RwLock<EndpointConnPool>>>,
+
+    /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
+    /// That seems like far too much effort, so we're using a relaxed increment counter instead.
+    /// It's only used for diagnostics.
+    global_pool_size: AtomicUsize,

     // Maximum number of connections per one endpoint.
     // Can mix different (dbname, username) connections.
@@ -72,7 +99,8 @@ pub struct GlobalConnPool {
 impl GlobalConnPool {
     pub fn new(config: &'static crate::config::ProxyConfig) -> Arc<Self> {
         Arc::new(Self {
-            global_pool: Mutex::new(HashMap::new()),
+            global_pool: DashMap::new(),
+            global_pool_size: AtomicUsize::new(0),
             max_conns_per_endpoint: MAX_CONNS_PER_ENDPOINT,
             proxy_config: config,
         })
@@ -85,33 +113,92 @@ impl GlobalConnPool {
     ) -> anyhow::Result<tokio_postgres::Client> {
         let mut client: Option<tokio_postgres::Client> = None;

+        let mut hash_valid = false;
         if !force_new {
-            let pool = self.get_endpoint_pool(&conn_info.hostname).await;
+            let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
+            let mut hash = None;

             // find a pool entry by (dbname, username) if exists
-            let mut pool = pool.lock();
-            let pool_entries = pool.pools.get_mut(&conn_info.db_and_user());
-            if let Some(pool_entries) = pool_entries {
-                if let Some(entry) = pool_entries.pop() {
-                    client = Some(entry.conn);
-                    pool.total_conns -= 1;
+            {
+                let pool = pool.read();
+                if let Some(pool_entries) = pool.pools.get(&conn_info.db_and_user()) {
+                    if !pool_entries.conns.is_empty() {
+                        hash = pool_entries.password_hash.clone();
+                    }
+                }
+            }
+
+            // a connection exists in the pool, verify the password hash
+            if let Some(hash) = hash {
+                let pw = conn_info.password.clone();
+                let validate = tokio::task::spawn_blocking(move || {
+                    Pbkdf2.verify_password(pw.as_bytes(), &hash.password_hash())
+                })
+                .await?;
+
+                // if the hash is invalid, don't error
+                // we will continue with the regular connection flow
+                if validate.is_ok() {
+                    hash_valid = true;
+                    let mut pool = pool.write();
+                    if let Some(pool_entries) = pool.pools.get_mut(&conn_info.db_and_user()) {
+                        if let Some(entry) = pool_entries.conns.pop() {
+                            client = Some(entry.conn);
+                            pool.total_conns -= 1;
+                        }
+                    }
                 }
             }
         }

         // ok return cached connection if found and establish a new one otherwise
-        if let Some(client) = client {
+        let new_client = if let Some(client) = client {
             if client.is_closed() {
                 info!("pool: cached connection '{conn_info}' is closed, opening a new one");
+                connect_to_compute(self.proxy_config, conn_info).await
             } else {
                 info!("pool: reusing connection '{conn_info}'");
-                return Ok(client);
+                Ok(client)
             }
         } else {
             info!("pool: opening a new connection '{conn_info}'");
             connect_to_compute(self.proxy_config, conn_info).await
-        }
+        };
+
+        match &new_client {
+            // clear the hash. it's no longer valid
+            // TODO: update tokio-postgres fork to allow access to this error kind directly
+            Err(err)
+                if hash_valid && err.to_string().contains("password authentication failed") =>
+            {
+                let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
+                let mut pool = pool.write();
+                if let Some(entry) = pool.pools.get_mut(&conn_info.db_and_user()) {
+                    entry.password_hash = None;
+                }
+            }
+            // new password is valid and we should insert/update it
+            Ok(_) if !force_new && !hash_valid => {
+                let pw = conn_info.password.clone();
+                let new_hash = tokio::task::spawn_blocking(move || {
+                    let salt = SaltString::generate(rand::rngs::OsRng);
+                    Pbkdf2
+                        .hash_password_customized(pw.as_bytes(), None, None, PARAMS, &salt)
+                        .map(|s| s.serialize())
+                })
+                .await??;
+
+                let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
+                let mut pool = pool.write();
+                pool.pools
+                    .entry(conn_info.db_and_user())
+                    .or_default()
+                    .password_hash = Some(new_hash);
+            }
+            _ => {}
+        }
+
+        new_client
     }

     pub async fn put(
@@ -119,33 +206,31 @@ impl GlobalConnPool {
         conn_info: &ConnInfo,
         client: tokio_postgres::Client,
     ) -> anyhow::Result<()> {
-        let pool = self.get_endpoint_pool(&conn_info.hostname).await;
+        let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);

         // return connection to the pool
-        let mut total_conns;
         let mut returned = false;
         let mut per_db_size = 0;
-        {
-            let mut pool = pool.lock();
-            total_conns = pool.total_conns;
+        let total_conns = {
+            let mut pool = pool.write();

-            let pool_entries: &mut Vec<ConnPoolEntry> = pool
-                .pools
-                .entry(conn_info.db_and_user())
-                .or_insert_with(|| Vec::with_capacity(1));
-            if total_conns < self.max_conns_per_endpoint {
-                pool_entries.push(ConnPoolEntry {
-                    conn: client,
-                    _last_access: std::time::Instant::now(),
-                });
+            if pool.total_conns < self.max_conns_per_endpoint {
+                // we create this db-user entry in get, so it should not be None
+                if let Some(pool_entries) = pool.pools.get_mut(&conn_info.db_and_user()) {
+                    pool_entries.conns.push(ConnPoolEntry {
+                        conn: client,
+                        _last_access: std::time::Instant::now(),
+                    });

-                total_conns += 1;
-                returned = true;
-                per_db_size = pool_entries.len();
+                    returned = true;
+                    per_db_size = pool_entries.conns.len();

-                pool.total_conns += 1;
+                    pool.total_conns += 1;
+                }
             }
-        }
+
+            pool.total_conns
+        };

         // do logging outside of the mutex
         if returned {
@@ -157,25 +242,35 @@ impl GlobalConnPool {
         Ok(())
     }

-    async fn get_endpoint_pool(&self, endpoint: &String) -> Arc<Mutex<EndpointConnPool>> {
+    fn get_or_create_endpoint_pool(&self, endpoint: &String) -> Arc<RwLock<EndpointConnPool>> {
+        // fast path
+        if let Some(pool) = self.global_pool.get(endpoint) {
+            return pool.clone();
+        }
+
+        // slow path
+        let new_pool = Arc::new(RwLock::new(EndpointConnPool {
+            pools: HashMap::new(),
+            total_conns: 0,
+        }));
+
         // find or create a pool for this endpoint
         let mut created = false;
-        let mut global_pool = self.global_pool.lock();
-        let pool = global_pool
+        let pool = self
+            .global_pool
             .entry(endpoint.clone())
             .or_insert_with(|| {
                 created = true;
-                Arc::new(Mutex::new(EndpointConnPool {
-                    pools: HashMap::new(),
-                    total_conns: 0,
-                }))
+                new_pool
             })
             .clone();
-        let global_pool_size = global_pool.len();
-        drop(global_pool);

         // log new global pool size
         if created {
+            let global_pool_size = self
+                .global_pool_size
+                .fetch_add(1, atomic::Ordering::Relaxed)
+                + 1;
             info!(
                 "pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
             );
```
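For context on the password-hash fields introduced above: the pool caches a PBKDF2 hash of the password that opened each pooled connection, and a later request may only reuse a pooled connection after its password verifies against that hash, with hashing and verification pushed onto `spawn_blocking` since at roughly 3.5 ms per hash they are too slow for an async executor thread. Below is a minimal, self-contained sketch of that hash-and-verify round trip, using the same `pbkdf2` password-hash API and cost parameters that the diff relies on; the helper names and example passwords are illustrative, and it assumes the `pbkdf2` (with `"simple"` and `"std"` features) and `rand` crates are available.

```rust
use pbkdf2::{
    password_hash::{PasswordHashString, PasswordHasher, PasswordVerifier, SaltString},
    Params, Pbkdf2,
};

// Same cost parameters as the `PARAMS` constant added in this commit:
// cheap enough to keep pooled-connection latency low, while still being
// reasonable for an in-memory-only cache.
const PARAMS: Params = Params {
    rounds: 10_000,
    output_length: 32,
};

// Hash a password for storage next to the pooled connections.
fn hash_password(pw: &str) -> pbkdf2::password_hash::Result<PasswordHashString> {
    let salt = SaltString::generate(rand::rngs::OsRng);
    Pbkdf2
        .hash_password_customized(pw.as_bytes(), None, None, PARAMS, &salt)
        .map(|h| h.serialize())
}

// Check a later request's password against the cached hash.
fn verify_password(pw: &str, stored: &PasswordHashString) -> bool {
    Pbkdf2
        .verify_password(pw.as_bytes(), &stored.password_hash())
        .is_ok()
}

fn main() -> pbkdf2::password_hash::Result<()> {
    let stored = hash_password("correct-horse-battery-staple")?;
    assert!(verify_password("correct-horse-battery-staple", &stored));
    assert!(!verify_password("wrong-password", &stored));
    println!("stored hash: {stored}");
    Ok(())
}
```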
```diff
@@ -44,6 +44,7 @@ const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB

 static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
 static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
+static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
 static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level");
 static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
 static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable");
@@ -193,7 +194,7 @@ pub async fn handle(
     let array_mode = headers.get(&ARRAY_MODE) == Some(&HEADER_VALUE_TRUE);

     // Allow connection pooling only if explicitly requested
-    let allow_pool = false;
+    let allow_pool = headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);

     // isolation level, read only and deferrable
```
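Finally, the last hunk makes pooling opt-in per request: a client asks for a pooled connection by sending the `neon-pool-opt-in` header. A rough sketch of that header check is below, assuming the header types come from the `http` crate and that the proxy's `HEADER_VALUE_TRUE` constant is the literal `"true"`; the surrounding request handling is omitted.

```rust
use http::header::{HeaderMap, HeaderName, HeaderValue};

fn main() {
    // Header name taken from the diff; the "true" value is an assumption
    // about what HEADER_VALUE_TRUE expands to.
    let allow_pool_header = HeaderName::from_static("neon-pool-opt-in");
    let header_value_true = HeaderValue::from_static("true");

    // Without the header, behaviour is unchanged: no pooling.
    let no_opt_in = HeaderMap::new();
    assert_ne!(no_opt_in.get(&allow_pool_header), Some(&header_value_true));

    // Sending `neon-pool-opt-in: true` opts the request into the connection
    // pool, mirroring the `allow_pool` check added in this commit.
    let mut opted_in = HeaderMap::new();
    opted_in.insert(allow_pool_header.clone(), header_value_true.clone());
    let allow_pool = opted_in.get(&allow_pool_header) == Some(&header_value_true);
    assert!(allow_pool);
    println!("allow_pool = {allow_pool}");
}
```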