From 42e36ba5e8413738a40fab795a8d7f392b2a51db Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 16 Apr 2025 15:25:59 +0100 Subject: [PATCH] simplify generics more --- proxy/src/serverless/backend.rs | 11 +++++------ proxy/src/serverless/conn_pool.rs | 2 +- proxy/src/serverless/conn_pool_lib.rs | 23 ++++++++++------------ proxy/src/serverless/http_conn_pool.rs | 27 ++++++++++---------------- 4 files changed, 26 insertions(+), 37 deletions(-) diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 81cf37a996..a72e319081 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -42,10 +42,9 @@ use crate::rate_limiter::EndpointRateLimiter; use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX}; pub(crate) struct PoolingBackend { - pub(crate) http_conn_pool: Arc>, + pub(crate) http_conn_pool: Arc>, pub(crate) local_pool: Arc>, - pub(crate) pool: - Arc>>, + pub(crate) pool: Arc>>, pub(crate) config: &'static ProxyConfig, pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>, @@ -248,7 +247,7 @@ impl PoolingBackend { conn_info: ConnInfo, ) -> Result, HttpConnError> { debug!("pool: looking for an existing connection"); - if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) { + if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) { return Ok(client); } @@ -532,7 +531,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError { } struct TokioMechanism { - pool: Arc>>, + pool: Arc>>, conn_info: ConnInfo, conn_id: uuid::Uuid, @@ -593,7 +592,7 @@ impl ConnectMechanism for TokioMechanism { } struct HyperMechanism { - pool: Arc>, + pool: Arc>, conn_info: ConnInfo, conn_id: uuid::Uuid, diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 87176ff7d6..6df8d0d1b5 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -57,7 +57,7 @@ impl fmt::Display for ConnInfo { } pub(crate) fn poll_client( - global_pool: Arc>>, + global_pool: Arc>>, ctx: &RequestContext, conn_info: ConnInfo, client: C, diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 42a3ea17a2..b191399f45 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::marker::PhantomData; use std::ops::Deref; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Weak}; @@ -326,12 +325,15 @@ impl DbUserConn for DbUserConnPool { } } -pub(crate) trait EndpointConnPoolExt { +pub(crate) trait EndpointConnPoolExt { + type Client; fn clear_closed(&mut self) -> usize; fn total_conns(&self) -> usize; } -impl EndpointConnPoolExt for EndpointConnPool { +impl EndpointConnPoolExt for EndpointConnPool { + type Client = Client; + fn clear_closed(&mut self) -> usize { let mut clients_removed: usize = 0; for db_pool in self.pools.values_mut() { @@ -345,10 +347,9 @@ impl EndpointConnPoolExt for EndpointConnPool { } } -pub(crate) struct GlobalConnPool +pub(crate) struct GlobalConnPool

where - C: ClientInnerExt, - P: EndpointConnPoolExt, + P: EndpointConnPoolExt, { // endpoint -> per-endpoint connection pool // @@ -367,8 +368,6 @@ where pub(crate) global_connections_count: Arc, pub(crate) config: &'static crate::config::HttpConfig, - - _marker: PhantomData, } #[derive(Debug, Clone, Copy)] @@ -391,10 +390,9 @@ pub struct GlobalConnPoolOptions { pub max_total_conns: usize, } -impl GlobalConnPool +impl

GlobalConnPool

where - C: ClientInnerExt, - P: EndpointConnPoolExt, + P: EndpointConnPoolExt, { pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc { let shards = config.pool_options.pool_shards; @@ -403,7 +401,6 @@ where global_pool_size: AtomicUsize::new(0), config, global_connections_count: Arc::new(AtomicUsize::new(0)), - _marker: PhantomData, }) } @@ -492,7 +489,7 @@ where } } -impl GlobalConnPool> { +impl GlobalConnPool> { pub(crate) fn get( self: &Arc, ctx: &RequestContext, diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index f233294009..79072cd85d 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -9,7 +9,6 @@ use smol_str::ToSmolStr; use tracing::{Instrument, debug, error, info, info_span}; use super::AsyncRW; -use super::backend::HttpConnError; use super::conn_pool_lib::{ ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, EndpointConnPoolExt, GlobalConnPool, @@ -85,7 +84,9 @@ impl HttpConnPool { } } -impl EndpointConnPoolExt for HttpConnPool { +impl EndpointConnPoolExt for HttpConnPool { + type Client = Client; + fn clear_closed(&mut self) -> usize { let Self { conns, .. } = self; let old_len = conns.len(); @@ -114,23 +115,15 @@ impl Drop for HttpConnPool { } } -impl GlobalConnPool { - #[expect(unused_results)] +impl GlobalConnPool { pub(crate) fn get( self: &Arc, ctx: &RequestContext, conn_info: &ConnInfo, - ) -> Result>, HttpConnError> { - let result: Result>, HttpConnError>; - let Some(endpoint) = conn_info.endpoint_cache_key() else { - result = Ok(None); - return result; - }; - let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint); - let Some(client) = endpoint_pool.write().get_conn_entry() else { - result = Ok(None); - return result; - }; + ) -> Option> { + let endpoint = conn_info.endpoint_cache_key()?; + let endpoint_pool = self.global_pool.get(&endpoint)?.clone(); + let client = endpoint_pool.write().get_conn_entry()?; tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id)); debug!( @@ -140,7 +133,7 @@ impl GlobalConnPool { ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.success(); - Ok(Some(Client::new(client.conn.clone()))) + Some(Client::new(client.conn.clone())) } fn get_or_create_endpoint_pool( @@ -186,7 +179,7 @@ impl GlobalConnPool { } pub(crate) fn poll_http2_client( - global_pool: Arc>, + global_pool: Arc>, ctx: &RequestContext, conn_info: &ConnInfo, client: Send,