diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index a72e319081..cb13283791 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -211,7 +211,7 @@ impl PoolingBackend { None } else { debug!("pool: looking for an existing connection"); - self.pool.get(ctx, &conn_info)? + self.pool.get(ctx, &conn_info) }; if let Some(client) = maybe_client { diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 6df8d0d1b5..b5f32986c2 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -179,8 +179,8 @@ pub(crate) struct ClientDataRemote { } impl ClientDataRemote { - pub fn session(&mut self) -> &mut tokio::sync::watch::Sender { - &mut self.session + pub fn session(&self) -> &tokio::sync::watch::Sender { + &self.session } pub fn cancel(&mut self) { diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index b191399f45..e0bc92fe77 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -11,11 +11,11 @@ use rand::Rng; use smol_str::ToSmolStr; use tracing::{Span, debug, info}; -use super::backend::HttpConnError; use super::conn_pool::ClientDataRemote; use super::http_conn_pool::ClientDataHttp; use super::local_conn_pool::ClientDataLocal; use crate::auth::backend::ComputeUserInfo; +use crate::config::HttpConfig; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; @@ -80,8 +80,8 @@ impl ClientInnerCommon { self.conn_id } - pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum { - &mut self.data + pub(crate) fn get_data(&self) -> &ClientDataEnum { + &self.data } } @@ -327,12 +327,29 @@ impl DbUserConn for DbUserConnPool { pub(crate) trait EndpointConnPoolExt { type Client; + type ClientInner: ClientInnerExt; + + fn create(config: &HttpConfig, global_connections_count: Arc) -> Self; + fn clear_closed(&mut self) -> usize; fn total_conns(&self) -> usize; } impl EndpointConnPoolExt for EndpointConnPool { type Client = Client; + type ClientInner = C; + + fn create(config: &HttpConfig, global_connections_count: Arc) -> Self { + EndpointConnPool { + pools: HashMap::new(), + total_conns: 0, + max_conns: config.pool_options.max_conns_per_endpoint, + _guard: Metrics::get().proxy.http_endpoint_pools.guard(), + global_connections_count, + global_pool_size_max_conns: config.pool_options.max_total_conns, + pool_name: String::from("remote"), + } + } fn clear_closed(&mut self) -> usize { let mut clients_removed: usize = 0; @@ -494,75 +511,69 @@ impl GlobalConnPool> { self: &Arc, ctx: &RequestContext, conn_info: &ConnInfo, - ) -> Result>, HttpConnError> { - let mut client: Option> = None; - let Some(endpoint) = conn_info.endpoint_cache_key() else { - return Ok(None); - }; + ) -> Option> { + let endpoint = conn_info.endpoint_cache_key()?; - let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint); - if let Some(entry) = endpoint_pool + let endpoint_pool = self.get_endpoint_pool(&endpoint)?; + let client = endpoint_pool .write() - .get_conn_entry(conn_info.db_and_user()) - { - client = Some(entry.conn); - } + .get_conn_entry(conn_info.db_and_user())? + .conn; + let endpoint_pool = Arc::downgrade(&endpoint_pool); - // ok return cached connection if found and establish a new one otherwise - if let Some(mut client) = client { - if client.inner.is_closed() { - info!("pool: cached connection '{conn_info}' is closed, opening a new one"); - return Ok(None); - } - tracing::Span::current() - .record("conn_id", tracing::field::display(client.get_conn_id())); - tracing::Span::current().record( - "pid", - tracing::field::display(client.inner.get_process_id()), - ); - debug!( - cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" - ); - - match client.get_data() { - ClientDataEnum::Local(data) => { - data.session().send(ctx.session_id())?; - } - - ClientDataEnum::Remote(data) => { - data.session().send(ctx.session_id())?; - } - ClientDataEnum::Http(_) => (), - } - - ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); - ctx.success(); - return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); + if client.inner.is_closed() { + info!("pool: cached connection '{conn_info}' is closed, opening a new one"); + return None; } - Ok(None) + + tracing::Span::current().record("conn_id", tracing::field::display(client.get_conn_id())); + tracing::Span::current().record( + "pid", + tracing::field::display(client.inner.get_process_id()), + ); + debug!( + cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), + "pool: reusing connection '{conn_info}'" + ); + + match client.get_data() { + ClientDataEnum::Local(data) => { + data.session().send(ctx.session_id()).ok()?; + } + ClientDataEnum::Remote(data) => { + data.session().send(ctx.session_id()).ok()?; + } + ClientDataEnum::Http(_) => (), + } + + ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); + Some(Client::new(client, conn_info.clone(), endpoint_pool)) + } +} + +impl GlobalConnPool

{ + pub(crate) fn get_endpoint_pool( + self: &Arc, + endpoint: &EndpointCacheKey, + ) -> Option>> { + Some(self.global_pool.get(endpoint)?.clone()) } pub(crate) fn get_or_create_endpoint_pool( self: &Arc, endpoint: &EndpointCacheKey, - ) -> Arc>> { + ) -> Arc> { // fast path if let Some(pool) = self.global_pool.get(endpoint) { return pool.clone(); } // slow path - let new_pool = Arc::new(RwLock::new(EndpointConnPool { - pools: HashMap::new(), - total_conns: 0, - max_conns: self.config.pool_options.max_conns_per_endpoint, - _guard: Metrics::get().proxy.http_endpoint_pools.guard(), - global_connections_count: self.global_connections_count.clone(), - global_pool_size_max_conns: self.config.pool_options.max_total_conns, - pool_name: String::from("remote"), - })); + let new_pool = Arc::new(RwLock::new(P::create( + self.config, + self.global_connections_count.clone(), + ))); // find or create a pool for this endpoint let mut created = false; @@ -589,6 +600,7 @@ impl GlobalConnPool> { pool } } + pub(crate) struct Client { span: Span, inner: Option>, diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index 79072cd85d..2e92610736 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -4,7 +4,6 @@ use std::sync::{Arc, Weak}; use hyper::client::conn::http2; use hyper_util::rt::{TokioExecutor, TokioIo}; -use parking_lot::RwLock; use smol_str::ToSmolStr; use tracing::{Instrument, debug, error, info, info_span}; @@ -13,11 +12,11 @@ use super::conn_pool_lib::{ ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, EndpointConnPoolExt, GlobalConnPool, }; +use crate::config::HttpConfig; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::protocol2::ConnectionInfoExtra; -use crate::types::EndpointCacheKey; use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS}; pub(crate) type Send = http2::SendRequest; @@ -86,6 +85,15 @@ impl HttpConnPool { impl EndpointConnPoolExt for HttpConnPool { type Client = Client; + type ClientInner = Send; + + fn create(_config: &HttpConfig, global_connections_count: Arc) -> Self { + HttpConnPool { + conns: VecDeque::new(), + _guard: Metrics::get().proxy.http_endpoint_pools.guard(), + global_connections_count, + } + } fn clear_closed(&mut self) -> usize { let Self { conns, .. } = self; @@ -135,47 +143,6 @@ impl GlobalConnPool { Some(Client::new(client.conn.clone())) } - - fn get_or_create_endpoint_pool( - self: &Arc, - endpoint: &EndpointCacheKey, - ) -> Arc> { - // fast path - if let Some(pool) = self.global_pool.get(endpoint) { - return pool.clone(); - } - - // slow path - let new_pool = Arc::new(RwLock::new(HttpConnPool { - conns: VecDeque::new(), - _guard: Metrics::get().proxy.http_endpoint_pools.guard(), - global_connections_count: self.global_connections_count.clone(), - })); - - // find or create a pool for this endpoint - let mut created = false; - let pool = self - .global_pool - .entry(endpoint.clone()) - .or_insert_with(|| { - created = true; - new_pool - }) - .clone(); - - // log new global pool size - if created { - let global_pool_size = self - .global_pool_size - .fetch_add(1, atomic::Ordering::Relaxed) - + 1; - info!( - "pool: created new pool for '{endpoint}', global pool size now {global_pool_size}" - ); - } - - pool - } } pub(crate) fn poll_http2_client( diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 1d9b35f41d..ed55bc2063 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -53,8 +53,8 @@ pub(crate) struct ClientDataLocal { } impl ClientDataLocal { - pub fn session(&mut self) -> &mut tokio::sync::watch::Sender { - &mut self.session + pub fn session(&self) -> &tokio::sync::watch::Sender { + &self.session } pub fn cancel(&mut self) { @@ -99,7 +99,7 @@ impl LocalConnPool { .map(|entry| entry.conn); // ok return cached connection if found and establish a new one otherwise - if let Some(mut client) = client { + if let Some(client) = client { if client.inner.is_closed() { info!("local_pool: cached connection '{conn_info}' is closed, opening a new one"); return Ok(None);