diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index e0bc92fe77..c3e1000f6b 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -330,6 +330,16 @@ pub(crate) trait EndpointConnPoolExt { type ClientInner: ClientInnerExt; fn create(config: &HttpConfig, global_connections_count: Arc) -> Self; + fn wrap_client( + inner: ClientInnerCommon, + conn_info: ConnInfo, + pool: Weak>, + ) -> Self::Client; + + fn get_conn_entry( + &mut self, + db_user: (DbName, RoleName), + ) -> Option>; fn clear_closed(&mut self) -> usize; fn total_conns(&self) -> usize; @@ -351,6 +361,21 @@ impl EndpointConnPoolExt for EndpointConnPool { } } + fn wrap_client( + client: ClientInnerCommon, + conn_info: ConnInfo, + pool: Weak>, + ) -> Self::Client { + Client::new(client, conn_info.clone(), pool) + } + + fn get_conn_entry( + &mut self, + db_user: (DbName, RoleName), + ) -> Option> { + Some(self.get_conn_entry(db_user)?.conn) + } + fn clear_closed(&mut self) -> usize { let mut clients_removed: usize = 0; for db_pool in self.pools.values_mut() { @@ -506,19 +531,18 @@ where } } -impl GlobalConnPool> { +impl GlobalConnPool

{ pub(crate) fn get( self: &Arc, ctx: &RequestContext, conn_info: &ConnInfo, - ) -> Option> { + ) -> Option { let endpoint = conn_info.endpoint_cache_key()?; let endpoint_pool = self.get_endpoint_pool(&endpoint)?; let client = endpoint_pool .write() - .get_conn_entry(conn_info.db_and_user())? - .conn; + .get_conn_entry(conn_info.db_and_user())?; let endpoint_pool = Arc::downgrade(&endpoint_pool); @@ -548,7 +572,7 @@ impl GlobalConnPool> { } ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); - Some(Client::new(client, conn_info.clone(), endpoint_pool)) + Some(P::wrap_client(client, conn_info.clone(), endpoint_pool)) } } diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index 2e92610736..7e77da8408 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Weak}; use hyper::client::conn::http2; use hyper_util::rt::{TokioExecutor, TokioIo}; use smol_str::ToSmolStr; -use tracing::{Instrument, debug, error, info, info_span}; +use tracing::{Instrument, error, info, info_span}; use super::AsyncRW; use super::conn_pool_lib::{ @@ -14,7 +14,7 @@ use super::conn_pool_lib::{ }; use crate::config::HttpConfig; use crate::context::RequestContext; -use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; +use crate::control_plane::messages::MetricsAuxInfo; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::protocol2::ConnectionInfoExtra; use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS}; @@ -95,6 +95,21 @@ impl EndpointConnPoolExt for HttpConnPool { } } + fn wrap_client( + inner: ClientInnerCommon, + _conn_info: ConnInfo, + _pool: Weak>, + ) -> Self::Client { + Client::new(inner) + } + + fn get_conn_entry( + &mut self, + _db_user: (crate::types::DbName, crate::types::RoleName), + ) -> Option> { + Some(self.get_conn_entry()?.conn) + } + fn clear_closed(&mut self) -> usize { let Self { conns, .. } = self; let old_len = conns.len(); @@ -123,28 +138,6 @@ impl Drop for HttpConnPool { } } -impl GlobalConnPool { - pub(crate) fn get( - self: &Arc, - ctx: &RequestContext, - conn_info: &ConnInfo, - ) -> Option> { - let endpoint = conn_info.endpoint_cache_key()?; - let endpoint_pool = self.global_pool.get(&endpoint)?.clone(); - let client = endpoint_pool.write().get_conn_entry()?; - - tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id)); - debug!( - cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" - ); - ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); - ctx.success(); - - Some(Client::new(client.conn.clone())) - } -} - pub(crate) fn poll_http2_client( global_pool: Arc>, ctx: &RequestContext,