more abstractions

This commit is contained in:
Conrad Ludgate
2025-04-16 15:41:04 +01:00
parent 42e36ba5e8
commit e9a12d626d
5 changed files with 84 additions and 105 deletions

View File

@@ -211,7 +211,7 @@ impl PoolingBackend {
None
} else {
debug!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info)?
self.pool.get(ctx, &conn_info)
};
if let Some(client) = maybe_client {

View File

@@ -179,8 +179,8 @@ pub(crate) struct ClientDataRemote {
}
impl ClientDataRemote {
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
&self.session
}
pub fn cancel(&mut self) {

View File

@@ -11,11 +11,11 @@ use rand::Rng;
use smol_str::ToSmolStr;
use tracing::{Span, debug, info};
use super::backend::HttpConnError;
use super::conn_pool::ClientDataRemote;
use super::http_conn_pool::ClientDataHttp;
use super::local_conn_pool::ClientDataLocal;
use crate::auth::backend::ComputeUserInfo;
use crate::config::HttpConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
@@ -80,8 +80,8 @@ impl<C: ClientInnerExt> ClientInnerCommon<C> {
self.conn_id
}
pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum {
&mut self.data
pub(crate) fn get_data(&self) -> &ClientDataEnum {
&self.data
}
}
@@ -327,12 +327,29 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
pub(crate) trait EndpointConnPoolExt {
type Client;
type ClientInner: ClientInnerExt;
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self;
fn clear_closed(&mut self) -> usize;
fn total_conns(&self) -> usize;
}
impl<C: ClientInnerExt> EndpointConnPoolExt for EndpointConnPool<C> {
type Client = Client<C>;
type ClientInner = C;
fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count,
global_pool_size_max_conns: config.pool_options.max_total_conns,
pool_name: String::from("remote"),
}
}
fn clear_closed(&mut self) -> usize {
let mut clients_removed: usize = 0;
@@ -494,75 +511,69 @@ impl<C: ClientInnerExt> GlobalConnPool<EndpointConnPool<C>> {
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
) -> Option<Client<C>> {
let endpoint = conn_info.endpoint_cache_key()?;
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
let endpoint_pool = self.get_endpoint_pool(&endpoint)?;
let client = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
.get_conn_entry(conn_info.db_and_user())?
.conn;
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return None;
}
Ok(None)
tracing::Span::current().record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id()).ok()?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id()).ok()?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
Some(Client::new(client, conn_info.clone(), endpoint_pool))
}
}
impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
pub(crate) fn get_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Option<Arc<RwLock<P>>> {
Some(self.global_pool.get(endpoint)?.clone())
}
pub(crate) fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
) -> Arc<RwLock<P>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: self.config.pool_options.max_conns_per_endpoint,
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
global_pool_size_max_conns: self.config.pool_options.max_total_conns,
pool_name: String::from("remote"),
}));
let new_pool = Arc::new(RwLock::new(P::create(
self.config,
self.global_connections_count.clone(),
)));
// find or create a pool for this endpoint
let mut created = false;
@@ -589,6 +600,7 @@ impl<C: ClientInnerExt> GlobalConnPool<EndpointConnPool<C>> {
pool
}
}
pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInnerCommon<C>>,

View File

@@ -4,7 +4,6 @@ use std::sync::{Arc, Weak};
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use smol_str::ToSmolStr;
use tracing::{Instrument, debug, error, info, info_span};
@@ -13,11 +12,11 @@ use super::conn_pool_lib::{
ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
EndpointConnPoolExt, GlobalConnPool,
};
use crate::config::HttpConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
use crate::protocol2::ConnectionInfoExtra;
use crate::types::EndpointCacheKey;
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
@@ -86,6 +85,15 @@ impl HttpConnPool {
impl EndpointConnPoolExt for HttpConnPool {
type Client = Client<Send>;
type ClientInner = Send;
fn create(_config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count,
}
}
fn clear_closed(&mut self) -> usize {
let Self { conns, .. } = self;
@@ -135,47 +143,6 @@ impl GlobalConnPool<HttpConnPool> {
Some(Client::new(client.conn.clone()))
}
fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<HttpConnPool>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
}));
// find or create a pool for this endpoint
let mut created = false;
let pool = self
.global_pool
.entry(endpoint.clone())
.or_insert_with(|| {
created = true;
new_pool
})
.clone();
// log new global pool size
if created {
let global_pool_size = self
.global_pool_size
.fetch_add(1, atomic::Ordering::Relaxed)
+ 1;
info!(
"pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
);
}
pool
}
}
pub(crate) fn poll_http2_client(

View File

@@ -53,8 +53,8 @@ pub(crate) struct ClientDataLocal {
}
impl ClientDataLocal {
pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
&mut self.session
pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
&self.session
}
pub fn cancel(&mut self) {
@@ -99,7 +99,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
.map(|entry| entry.conn);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if let Some(client) = client {
if client.inner.is_closed() {
info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);