Compare commits

...

6 Commits

Author SHA1 Message Date
Conrad Ludgate
ce93770120 fix missing registration 2025-04-22 12:25:00 +01:00
Conrad Ludgate
897cea978a make connection polling generic 2025-04-22 12:24:57 +01:00
Conrad Ludgate
86fb432ab2 fully abstract global conn pool 2025-04-22 12:17:13 +01:00
Conrad Ludgate
e9a12d626d more abstractions 2025-04-22 12:17:13 +01:00
Conrad Ludgate
42e36ba5e8 simplify generics more 2025-04-22 12:17:13 +01:00
Conrad Ludgate
cf05d4e4b2 simplify pool generics 2025-04-22 12:17:13 +01:00
5 changed files with 275 additions and 312 deletions

View File

@@ -16,9 +16,9 @@ use tracing::field::display;
use tracing::{debug, info};
use super::AsyncRW;
use super::conn_pool::poll_client;
use super::conn_pool::poll_client_generic;
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
use super::http_conn_pool::{self, HttpConnPool};
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
@@ -42,10 +42,9 @@ use crate::rate_limiter::EndpointRateLimiter;
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pub(crate) http_conn_pool: Arc<GlobalConnPool<HttpConnPool>>,
pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
pub(crate) pool:
Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
pub(crate) pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
@@ -212,7 +211,7 @@ impl PoolingBackend {
None
} else {
debug!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info)?
self.pool.get(ctx, &conn_info)
};
if let Some(client) = maybe_client {
@@ -246,9 +245,9 @@ impl PoolingBackend {
&self,
ctx: &RequestContext,
conn_info: ConnInfo,
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
) -> Result<http_conn_pool::Client, HttpConnError> {
debug!("pool: looking for an existing connection");
if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
return Ok(client);
}
@@ -532,7 +531,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
}
struct TokioMechanism {
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -578,7 +577,7 @@ impl ConnectMechanism for TokioMechanism {
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
}
Ok(poll_client(
Ok(poll_client_generic(
self.pool.clone(),
ctx,
self.conn_info.clone(),
@@ -593,7 +592,7 @@ impl ConnectMechanism for TokioMechanism {
}
struct HyperMechanism {
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pool: Arc<GlobalConnPool<HttpConnPool>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -603,7 +602,7 @@ struct HyperMechanism {
#[async_trait]
impl ConnectMechanism for HyperMechanism {
type Connection = http_conn_pool::Client<Send>;
type Connection = http_conn_pool::Client;
type ConnectError = HttpConnError;
type Error = HttpConnError;
@@ -639,15 +638,26 @@ impl ConnectMechanism for HyperMechanism {
info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
}
Ok(poll_http2_client(
let client = poll_client_generic(
self.pool.clone(),
ctx,
&self.conn_info,
self.conn_info.clone(),
client,
connection,
self.conn_id,
node_info.aux.clone(),
))
);
// auth-broker -> local-proxy clients don't return to the pool, since
// they are multiplexing and cloneable. So instead we insert it once here.
if let Some(endpoint) = self.conn_info.endpoint_cache_key() {
self.pool
.get_or_create_endpoint_pool(&endpoint)
.write()
.register(&client);
}
Ok(client)
}
fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}

View File

@@ -11,7 +11,7 @@ use smallvec::SmallVec;
use tokio::net::TcpStream;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, error, info, info_span, warn};
use tracing::{error, info, info_span, warn};
#[cfg(test)]
use {
super::conn_pool_lib::GlobalConnPoolOptions,
@@ -20,8 +20,7 @@ use {
};
use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
GlobalConnPool,
ClientDataEnum, ClientInnerCommon, ConnInfo, EndpointConnPoolExt, GlobalConnPool,
};
use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo;
@@ -29,6 +28,7 @@ use crate::metrics::Metrics;
use crate::tls::postgres_rustls::MakeRustlsConnect;
type TlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
pub(super) type Conn = postgres_client::Connection<TcpStream, TlsStream>;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
@@ -56,20 +56,20 @@ impl fmt::Display for ConnInfo {
}
}
pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
pub(crate) fn poll_client_generic<P: EndpointConnPoolExt>(
global_pool: Arc<GlobalConnPool<P>>,
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
mut connection: postgres_client::Connection<TcpStream, TlsStream>,
client: P::ClientInner,
connection: P::Connection,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<C> {
) -> P::Client {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let mut session_id = ctx.session_id();
let session_id = ctx.session_id();
let (tx, mut rx) = tokio::sync::watch::channel(session_id);
let span = info_span!(parent: None, "connection", %conn_id);
let span = info_span!(parent: None, "connection", %conn_id, %session_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -85,27 +85,30 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
async move {
tokio::spawn(async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
let mut cancelled = pin!(cancelled);
let mut connection = pin!(P::spawn_conn(connection));
poll_fn(move |cx| {
let _enter = span.enter();
if cancelled.as_mut().poll(cx).is_ready() {
info!("connection dropped");
return Poll::Ready(())
return Poll::Ready(());
}
match rx.has_changed() {
Ok(true) => {
session_id = *rx.borrow_and_update();
info!(%session_id, "changed session");
let session_id = *rx.borrow_and_update();
span.record("session_id", tracing::field::display(session_id));
info!("changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
Err(_) => {
info!("connection dropped");
return Poll::Ready(())
return Poll::Ready(());
}
_ => {}
}
@@ -117,48 +120,25 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
if let Some(pool) = pool.clone().upgrade() {
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if pool.write().remove_client(db_user.clone(), conn_id) {
if pool.write().remove_conn(db_user.clone(), conn_id) {
info!("idle connection removed");
}
}
}
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!(%session_id, "notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
}
Some(Ok(_)) => {
warn!(%session_id, "unknown message");
}
Some(Err(e)) => {
error!(%session_id, "connection error: {}", e);
break
}
None => {
info!("connection closed");
break
}
}
}
ready!(connection.as_mut().poll(cx));
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_client(db_user.clone(), conn_id) {
if pool.write().remove_conn(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;
}
.instrument(span));
})
.await;
});
let inner = ClientInnerCommon {
inner: client,
aux,
@@ -169,7 +149,42 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
}),
};
Client::new(inner, conn_info, pool_clone)
P::wrap_client(inner, conn_info, pool_clone)
}
pub async fn poll_tokio_postgres_conn_really(mut connection: Conn) {
poll_fn(move |cx| {
loop {
let message = ready!(connection.poll_message(cx));
match message {
Some(Ok(AsyncMessage::Notice(notice))) => {
info!("notice: {}", notice);
}
Some(Ok(AsyncMessage::Notification(notif))) => {
warn!(
pid = notif.process_id(),
channel = notif.channel(),
"notification received"
);
}
Some(Ok(_)) => {
warn!("unknown message");
}
Some(Err(e)) => {
error!("connection error: {}", e);
break;
}
None => {
info!("connection closed");
break;
}
}
}
Poll::Ready(())
})
.await;
}
#[derive(Clone)]
@@ -179,11 +194,11 @@ pub(crate) struct ClientDataRemote {
}
impl ClientDataRemote {
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
&self.session
}
pub fn cancel(&mut self) {
pub fn cancel(&self) {
self.cancel.cancel();
}
}
@@ -195,6 +210,7 @@ mod tests {
use super::*;
use crate::proxy::NeonOptions;
use crate::serverless::cancel_set::CancelSet;
use crate::serverless::conn_pool_lib::{Client, ClientInnerExt};
use crate::types::{BranchId, EndpointId, ProjectId};
struct MockClient(Arc<AtomicBool>);

View File

@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
@@ -12,11 +11,10 @@ use rand::Rng;
use smol_str::ToSmolStr;
use tracing::{Span, debug, info};
use super::backend::HttpConnError;
use super::conn_pool::ClientDataRemote;
use super::http_conn_pool::ClientDataHttp;
use super::conn_pool::{ClientDataRemote, poll_tokio_postgres_conn_really};
use super::local_conn_pool::ClientDataLocal;
use crate::auth::backend::ComputeUserInfo;
use crate::config::HttpConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
@@ -51,7 +49,6 @@ impl ConnInfo {
pub(crate) enum ClientDataEnum {
Remote(ClientDataRemote),
Local(ClientDataLocal),
Http(ClientDataHttp),
}
#[derive(Clone)]
@@ -64,14 +61,9 @@ pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
impl<C: ClientInnerExt> Drop for ClientInnerCommon<C> {
fn drop(&mut self) {
match &mut self.data {
ClientDataEnum::Remote(remote_data) => {
remote_data.cancel();
}
ClientDataEnum::Local(local_data) => {
local_data.cancel();
}
ClientDataEnum::Http(_http_data) => (),
match &self.data {
ClientDataEnum::Remote(remote_data) => remote_data.cancel(),
ClientDataEnum::Local(local_data) => local_data.cancel(),
}
}
}
@@ -81,8 +73,8 @@ impl<C: ClientInnerExt> ClientInnerCommon<C> {
self.conn_id
}
pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum {
&mut self.data
pub(crate) fn get_data(&self) -> &ClientDataEnum {
&self.data
}
}
@@ -326,12 +318,70 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
}
}
pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
pub(crate) trait EndpointConnPoolExt: Send + Sync + 'static {
type Client;
type ClientInner: ClientInnerExt;
type Connection: Send + 'static;
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self;
fn wrap_client(
inner: ClientInnerCommon<Self::ClientInner>,
conn_info: ConnInfo,
pool: Weak<RwLock<Self>>,
) -> Self::Client;
fn get_conn_entry(
&mut self,
db_user: (DbName, RoleName),
) -> Option<ClientInnerCommon<Self::ClientInner>>;
fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool;
fn spawn_conn(conn: Self::Connection) -> impl Future<Output = ()> + Send + 'static;
fn clear_closed(&mut self) -> usize;
fn total_conns(&self) -> usize;
}
impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
impl<C: ClientInnerExt> EndpointConnPoolExt for EndpointConnPool<C> {
type Client = Client<C>;
type ClientInner = C;
type Connection = super::conn_pool::Conn;
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count,
global_pool_size_max_conns: config.pool_options.max_total_conns,
pool_name: String::from("remote"),
}
}
fn wrap_client(
client: ClientInnerCommon<Self::ClientInner>,
conn_info: ConnInfo,
pool: Weak<RwLock<Self>>,
) -> Self::Client {
Client::new(client, conn_info.clone(), pool)
}
fn get_conn_entry(
&mut self,
db_user: (DbName, RoleName),
) -> Option<ClientInnerCommon<Self::ClientInner>> {
Some(self.get_conn_entry(db_user)?.conn)
}
fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
self.remove_client(db_user, conn_id)
}
async fn spawn_conn(conn: Self::Connection) {
poll_tokio_postgres_conn_really(conn).await;
}
fn clear_closed(&mut self) -> usize {
let mut clients_removed: usize = 0;
for db_pool in self.pools.values_mut() {
@@ -345,10 +395,9 @@ impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
}
}
pub(crate) struct GlobalConnPool<C, P>
pub(crate) struct GlobalConnPool<P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
P: EndpointConnPoolExt,
{
// endpoint -> per-endpoint connection pool
//
@@ -367,8 +416,6 @@ where
pub(crate) global_connections_count: Arc<AtomicUsize>,
pub(crate) config: &'static crate::config::HttpConfig,
_marker: PhantomData<C>,
}
#[derive(Debug, Clone, Copy)]
@@ -391,10 +438,9 @@ pub struct GlobalConnPoolOptions {
pub max_total_conns: usize,
}
impl<C, P> GlobalConnPool<C, P>
impl<P> GlobalConnPool<P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
P: EndpointConnPoolExt,
{
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
@@ -403,7 +449,6 @@ where
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
_marker: PhantomData,
})
}
@@ -492,80 +537,72 @@ where
}
}
impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
) -> Option<P::Client> {
let endpoint = conn_info.endpoint_cache_key()?;
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
let endpoint_pool = self.get_endpoint_pool(&endpoint)?;
let client = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
.get_conn_entry(conn_info.db_and_user())?;
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return None;
}
Ok(None)
tracing::Span::current().record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id()).ok()?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id()).ok()?;
}
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
Some(P::wrap_client(client, conn_info.clone(), endpoint_pool))
}
}
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
pub(crate) fn get_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Option<Arc<RwLock<P>>> {
Some(self.global_pool.get(endpoint)?.clone())
}
pub(crate) fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
) -> Arc<RwLock<P>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: self.config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
global_pool_size_max_conns: self.config.pool_options.max_total_conns,
pool_name: String::from("remote"),
}));
let new_pool = Arc::new(RwLock::new(P::create(
self.config,
self.global_connections_count.clone(),
)));
// find or create a pool for this endpoint
let mut created = false;
@@ -592,6 +629,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
pool
}
}
pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInnerCommon<C>>,

View File

@@ -4,32 +4,25 @@ use std::sync::{Arc, Weak};
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use smol_str::ToSmolStr;
use tracing::{Instrument, debug, error, info, info_span};
use tracing::{error, info};
use super::AsyncRW;
use super::backend::HttpConnError;
use super::conn_pool_lib::{
ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
EndpointConnPoolExt, GlobalConnPool,
ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, EndpointConnPoolExt,
};
use crate::config::HttpConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::protocol2::ConnectionInfoExtra;
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
pub(crate) type Connect = http2::Connection<TokioIo<AsyncRW>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
pub(crate) struct ClientDataHttp();
// Per-endpoint connection pool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
pub(crate) struct HttpConnPool {
// TODO(conrad):
// either we should open more connections depending on stream count
// (not exposed by hyper, need our own counter)
@@ -39,13 +32,13 @@ pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
// seems somewhat redundant though.
//
// Probably we should run a semaphore and just the single conn. TBD.
conns: VecDeque<ConnPoolEntry<C>>,
conns: VecDeque<ConnPoolEntry<Send>>,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
}
impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
impl HttpConnPool {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<Send>> {
let Self { conns, .. } = self;
loop {
@@ -83,9 +76,59 @@ impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
}
removed > 0
}
pub fn register(&mut self, client: &Client) {
self.conns.push_back(ConnPoolEntry {
conn: client.inner.clone(),
_last_access: std::time::Instant::now(),
});
}
}
impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
impl EndpointConnPoolExt for HttpConnPool {
type Client = Client;
type ClientInner = Send;
type Connection = Connect;
fn create(_config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count,
}
}
fn wrap_client(
inner: ClientInnerCommon<Self::ClientInner>,
_conn_info: ConnInfo,
_pool: Weak<parking_lot::RwLock<Self>>,
) -> Self::Client {
Client::new(inner)
}
fn get_conn_entry(
&mut self,
_db_user: (crate::types::DbName, crate::types::RoleName),
) -> Option<ClientInnerCommon<Self::ClientInner>> {
Some(self.get_conn_entry()?.conn)
}
fn remove_conn(
&mut self,
_db_user: (crate::types::DbName, crate::types::RoleName),
conn_id: uuid::Uuid,
) -> bool {
self.remove_conn(conn_id)
}
async fn spawn_conn(conn: Self::Connection) {
let res = conn.await;
match res {
Ok(()) => info!("connection closed"),
Err(e) => error!("connection error: {e:?}"),
}
}
fn clear_closed(&mut self) -> usize {
let Self { conns, .. } = self;
let old_len = conns.len();
@@ -100,7 +143,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
}
}
impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
impl Drop for HttpConnPool {
fn drop(&mut self) {
if !self.conns.is_empty() {
self.global_connections_count
@@ -114,154 +157,12 @@ impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
}
}
impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
#[expect(unused_results)]
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let result: Result<Option<Client<C>>, HttpConnError>;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
result = Ok(None);
return result;
};
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
let Some(client) = endpoint_pool.write().get_conn_entry() else {
result = Ok(None);
return result;
};
tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
Ok(Some(Client::new(client.conn.clone())))
}
fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<HttpConnPool<C>>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
}));
// find or create a pool for this endpoint
let mut created = false;
let pool = self
.global_pool
.entry(endpoint.clone())
.or_insert_with(|| {
created = true;
new_pool
})
.clone();
// log new global pool size
if created {
let global_pool_size = self
.global_pool_size
.fetch_add(1, atomic::Ordering::Relaxed)
+ 1;
info!(
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
);
}
pool
}
pub(crate) struct Client {
pub(crate) inner: ClientInnerCommon<Send>,
}
pub(crate) fn poll_http2_client(
global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
ctx: &RequestContext,
conn_info: &ConnInfo,
client: Send,
connection: Connect,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
) -> Client<Send> {
let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
let session_id = ctx.session_id();
let span = info_span!(parent: None, "connection", %conn_id);
let cold_start_info = ctx.cold_start_info();
span.in_scope(|| {
info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
});
let pool = match conn_info.endpoint_cache_key() {
Some(endpoint) => {
let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
let client = ClientInnerCommon {
inner: client.clone(),
aux: aux.clone(),
conn_id,
data: ClientDataEnum::Http(ClientDataHttp()),
};
pool.write().conns.push_back(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.inc();
Arc::downgrade(&pool)
}
None => Weak::new(),
};
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let res = connection.await;
match res {
Ok(()) => info!("connection closed"),
Err(e) => error!(%session_id, "connection error: {e:?}"),
}
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_conn(conn_id) {
info!("closed connection removed");
}
}
}
.instrument(span),
);
let client = ClientInnerCommon {
inner: client,
aux,
conn_id,
data: ClientDataEnum::Http(ClientDataHttp()),
};
Client::new(client)
}
pub(crate) struct Client<C: ClientInnerExt + Clone> {
pub(crate) inner: ClientInnerCommon<C>,
}
impl<C: ClientInnerExt + Clone> Client<C> {
pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
impl Client {
pub(self) fn new(inner: ClientInnerCommon<Send>) -> Self {
Self { inner }
}

View File

@@ -53,11 +53,11 @@ pub(crate) struct ClientDataLocal {
}
impl ClientDataLocal {
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
&self.session
}
pub fn cancel(&mut self) {
pub fn cancel(&self) {
self.cancel.cancel();
}
}
@@ -99,7 +99,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
.map(|entry| entry.conn);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if let Some(client) = client {
if client.inner.is_closed() {
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
@@ -120,11 +120,9 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);