mirror of https://github.com/neondatabase/neon.git
synced 2026-01-30 16:50:37 +00:00

Compare commits: cloneable/... → conrad/poo
6 Commits
| Author | SHA1 | Date |
|---|---|---|
| | ce93770120 | |
| | 897cea978a | |
| | 86fb432ab2 | |
| | e9a12d626d | |
| | 42e36ba5e8 | |
| | cf05d4e4b2 | |
proxy/src/serverless/backend.rs

@@ -16,9 +16,9 @@ use tracing::field::display;
 use tracing::{debug, info};

 use super::AsyncRW;
-use super::conn_pool::poll_client;
+use super::conn_pool::poll_client_generic;
 use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
-use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
+use super::http_conn_pool::{self, HttpConnPool};
 use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
 use crate::auth::backend::local::StaticAuthRules;
 use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};

@@ -42,10 +42,9 @@ use crate::rate_limiter::EndpointRateLimiter;
 use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};

 pub(crate) struct PoolingBackend {
-    pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
+    pub(crate) http_conn_pool: Arc<GlobalConnPool<HttpConnPool>>,
     pub(crate) local_pool: Arc<LocalConnPool<postgres_client::Client>>,
-    pub(crate) pool:
-        Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
+    pub(crate) pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,

     pub(crate) config: &'static ProxyConfig,
     pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,

@@ -212,7 +211,7 @@ impl PoolingBackend {
             None
         } else {
             debug!("pool: looking for an existing connection");
-            self.pool.get(ctx, &conn_info)?
+            self.pool.get(ctx, &conn_info)
         };

         if let Some(client) = maybe_client {

@@ -246,9 +245,9 @@ impl PoolingBackend {
         &self,
         ctx: &RequestContext,
         conn_info: ConnInfo,
-    ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
+    ) -> Result<http_conn_pool::Client, HttpConnError> {
         debug!("pool: looking for an existing connection");
-        if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
+        if let Some(client) = self.http_conn_pool.get(ctx, &conn_info) {
             return Ok(client);
         }

@@ -532,7 +531,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
 }

 struct TokioMechanism {
-    pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
+    pool: Arc<GlobalConnPool<EndpointConnPool<postgres_client::Client>>>,
     conn_info: ConnInfo,
     conn_id: uuid::Uuid,

@@ -578,7 +577,7 @@ impl ConnectMechanism for TokioMechanism {
             info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
         }

-        Ok(poll_client(
+        Ok(poll_client_generic(
             self.pool.clone(),
             ctx,
             self.conn_info.clone(),

@@ -593,7 +592,7 @@ impl ConnectMechanism for TokioMechanism {
 }

 struct HyperMechanism {
-    pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
+    pool: Arc<GlobalConnPool<HttpConnPool>>,
     conn_info: ConnInfo,
     conn_id: uuid::Uuid,

@@ -603,7 +602,7 @@ struct HyperMechanism {

 #[async_trait]
 impl ConnectMechanism for HyperMechanism {
-    type Connection = http_conn_pool::Client<Send>;
+    type Connection = http_conn_pool::Client;
     type ConnectError = HttpConnError;
     type Error = HttpConnError;

@@ -639,15 +638,26 @@ impl ConnectMechanism for HyperMechanism {
             info!("latency={}, query_id={}", ctx.get_proxy_latency(), query_id);
         }

-        Ok(poll_http2_client(
+        let client = poll_client_generic(
             self.pool.clone(),
             ctx,
-            &self.conn_info,
+            self.conn_info.clone(),
             client,
             connection,
             self.conn_id,
             node_info.aux.clone(),
-        ))
+        );
+
+        // auth-broker -> local-proxy clients don't return to the pool, since
+        // they are multiplexing and cloneable. So instead we insert it once here.
+        if let Some(endpoint) = self.conn_info.endpoint_cache_key() {
+            self.pool
+                .get_or_create_endpoint_pool(&endpoint)
+                .write()
+                .register(&client);
+        }
+
+        Ok(client)
     }

     fn update_connect_config(&self, _config: &mut compute::ConnCfg) {}
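The `register` call introduced above is the heart of the change: an HTTP/2 client is multiplexing and cloneable, so it is inserted into the endpoint pool once at connect time instead of being returned on every check-in. A standalone sketch of that idea (toy types, not Neon's; the real pool stores `ConnPoolEntry<Send>` values):

```rust
use std::collections::VecDeque;

// Stand-in for http2::SendRequest: cheap to clone, shares one socket.
#[derive(Clone)]
struct Http2Handle(u32);

struct MultiplexPool {
    conns: VecDeque<Http2Handle>,
}

impl MultiplexPool {
    // called once, right after the connection is established
    fn register(&mut self, handle: &Http2Handle) {
        self.conns.push_back(handle.clone());
    }

    // checkout clones instead of removing, so the handle stays pooled
    fn checkout(&mut self) -> Option<Http2Handle> {
        let handle = self.conns.pop_front()?;
        self.conns.push_back(handle.clone());
        Some(handle)
    }
}

fn main() {
    let mut pool = MultiplexPool { conns: VecDeque::new() };
    pool.register(&Http2Handle(1));
    assert!(pool.checkout().is_some());
    assert_eq!(pool.conns.len(), 1); // still available after checkout
}
```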
proxy/src/serverless/conn_pool.rs

@@ -11,7 +11,7 @@ use smallvec::SmallVec;
 use tokio::net::TcpStream;
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, error, info, info_span, warn};
+use tracing::{error, info, info_span, warn};
 #[cfg(test)]
 use {
     super::conn_pool_lib::GlobalConnPoolOptions,

@@ -20,8 +20,7 @@ use {
 };

 use super::conn_pool_lib::{
-    Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
-    GlobalConnPool,
+    ClientDataEnum, ClientInnerCommon, ConnInfo, EndpointConnPoolExt, GlobalConnPool,
 };
 use crate::context::RequestContext;
 use crate::control_plane::messages::MetricsAuxInfo;

@@ -29,6 +28,7 @@ use crate::metrics::Metrics;
 use crate::tls::postgres_rustls::MakeRustlsConnect;

 type TlsStream = <MakeRustlsConnect as MakeTlsConnect<TcpStream>>::Stream;
+pub(super) type Conn = postgres_client::Connection<TcpStream, TlsStream>;

 #[derive(Debug, Clone)]
 pub(crate) struct ConnInfoWithAuth {

@@ -56,20 +56,20 @@ impl fmt::Display for ConnInfo {
     }
 }

-pub(crate) fn poll_client<C: ClientInnerExt>(
-    global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
+pub(crate) fn poll_client_generic<P: EndpointConnPoolExt>(
+    global_pool: Arc<GlobalConnPool<P>>,
     ctx: &RequestContext,
     conn_info: ConnInfo,
-    client: C,
-    mut connection: postgres_client::Connection<TcpStream, TlsStream>,
+    client: P::ClientInner,
+    connection: P::Connection,
     conn_id: uuid::Uuid,
     aux: MetricsAuxInfo,
-) -> Client<C> {
+) -> P::Client {
     let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
-    let mut session_id = ctx.session_id();
+    let session_id = ctx.session_id();
     let (tx, mut rx) = tokio::sync::watch::channel(session_id);

-    let span = info_span!(parent: None, "connection", %conn_id);
+    let span = info_span!(parent: None, "connection", %conn_id, %session_id);
     let cold_start_info = ctx.cold_start_info();
     span.in_scope(|| {
         info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
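The signature change above also drops the `mut session_id` local: the connection task now re-records the session id on its own span whenever the watch channel changes, instead of mutating a captured variable. A standalone sketch of that pattern (assumes the `tokio` and `tracing` crates; names are illustrative):

```rust
use tokio::sync::watch;
use tracing::{field, info, info_span};

#[tokio::main]
async fn main() {
    // each pool checkout pushes the new session id through the channel
    let (tx, mut rx) = watch::channel(0u64);

    // declare the field Empty so the task can (re-)record it later
    let span = info_span!("connection", session_id = field::Empty);

    let task = tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            let session_id = *rx.borrow_and_update();
            // re-record on the span instead of mutating a captured local
            span.record("session_id", field::display(session_id));
            info!(parent: &span, "changed session");
        }
    });

    tx.send(42).unwrap();
    drop(tx); // close the channel so the task exits
    task.await.unwrap();
}
```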
@@ -85,27 +85,30 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
     let cancel = CancellationToken::new();
     let cancelled = cancel.clone().cancelled_owned();

-    tokio::spawn(
-        async move {
+    tokio::spawn(async move {
         let _conn_gauge = conn_gauge;
         let mut idle_timeout = pin!(tokio::time::sleep(idle));
         let mut cancelled = pin!(cancelled);
+        let mut connection = pin!(P::spawn_conn(connection));

         poll_fn(move |cx| {
             let _enter = span.enter();

             if cancelled.as_mut().poll(cx).is_ready() {
                 info!("connection dropped");
-                return Poll::Ready(())
+                return Poll::Ready(());
             }

             match rx.has_changed() {
                 Ok(true) => {
-                    session_id = *rx.borrow_and_update();
-                    info!(%session_id, "changed session");
+                    let session_id = *rx.borrow_and_update();
+                    span.record("session_id", tracing::field::display(session_id));
+                    info!("changed session");
                     idle_timeout.as_mut().reset(Instant::now() + idle);
                 }
                 Err(_) => {
                     info!("connection dropped");
-                    return Poll::Ready(())
+                    return Poll::Ready(());
                 }
                 _ => {}
             }

@@ -117,48 +120,25 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
                 if let Some(pool) = pool.clone().upgrade() {
                     // remove client from pool - should close the connection if it's idle.
                     // does nothing if the client is currently checked-out and in-use
-                    if pool.write().remove_client(db_user.clone(), conn_id) {
+                    if pool.write().remove_conn(db_user.clone(), conn_id) {
                         info!("idle connection removed");
                     }
                 }
             }

-            loop {
-                let message = ready!(connection.poll_message(cx));
-
-                match message {
-                    Some(Ok(AsyncMessage::Notice(notice))) => {
-                        info!(%session_id, "notice: {}", notice);
-                    }
-                    Some(Ok(AsyncMessage::Notification(notif))) => {
-                        warn!(%session_id, pid = notif.process_id(), channel = notif.channel(), "notification received");
-                    }
-                    Some(Ok(_)) => {
-                        warn!(%session_id, "unknown message");
-                    }
-                    Some(Err(e)) => {
-                        error!(%session_id, "connection error: {}", e);
-                        break
-                    }
-                    None => {
-                        info!("connection closed");
-                        break
-                    }
-                }
-            }
+            ready!(connection.as_mut().poll(cx));

             // remove from connection pool
             if let Some(pool) = pool.clone().upgrade() {
-                if pool.write().remove_client(db_user.clone(), conn_id) {
+                if pool.write().remove_conn(db_user.clone(), conn_id) {
                     info!("closed connection removed");
                 }
             }

             Poll::Ready(())
-        }).await;
-        }
-        .instrument(span));
+        })
+        .await;
+    });

     let inner = ClientInnerCommon {
         inner: client,
         aux,
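The driver is a single hand-rolled `poll_fn` that races cancellation, session changes, the idle timer, and the connection future itself; the inline Postgres message loop is replaced by polling whatever future `P::spawn_conn` returns. A stripped-down sketch of that control flow with toy futures (`std` only; idle timer and session channel omitted):

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::task::{ready, Poll};

// Race a cancellation future against the connection future, as the
// rewritten poll loop does.
async fn drive(cancelled: impl Future<Output = ()>, conn: impl Future<Output = ()>) {
    let mut cancelled = pin!(cancelled);
    let mut conn = pin!(conn);

    poll_fn(move |cx| {
        // cancellation wins immediately
        if cancelled.as_mut().poll(cx).is_ready() {
            return Poll::Ready(());
        }
        // the pool-specific connection future is polled in place of the
        // old inline message loop; park here until it completes
        ready!(conn.as_mut().poll(cx));
        Poll::Ready(())
    })
    .await;
}
```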
@@ -169,7 +149,42 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
         }),
     };

-    Client::new(inner, conn_info, pool_clone)
+    P::wrap_client(inner, conn_info, pool_clone)
 }

+pub async fn poll_tokio_postgres_conn_really(mut connection: Conn) {
+    poll_fn(move |cx| {
+        loop {
+            let message = ready!(connection.poll_message(cx));
+
+            match message {
+                Some(Ok(AsyncMessage::Notice(notice))) => {
+                    info!("notice: {}", notice);
+                }
+                Some(Ok(AsyncMessage::Notification(notif))) => {
+                    warn!(
+                        pid = notif.process_id(),
+                        channel = notif.channel(),
+                        "notification received"
+                    );
+                }
+                Some(Ok(_)) => {
+                    warn!("unknown message");
+                }
+                Some(Err(e)) => {
+                    error!("connection error: {}", e);
+                    break;
+                }
+                None => {
+                    info!("connection closed");
+                    break;
+                }
+            }
+        }
+
+        Poll::Ready(())
+    })
+    .await;
+}
+
 #[derive(Clone)]
@@ -179,11 +194,11 @@ pub(crate) struct ClientDataRemote {
 }

 impl ClientDataRemote {
-    pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
-        &mut self.session
+    pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
+        &self.session
     }

-    pub fn cancel(&mut self) {
+    pub fn cancel(&self) {
         self.cancel.cancel();
     }
 }
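Both accessors can drop `&mut` because the underlying handles are internally synchronized: `watch::Sender::send` and `CancellationToken::cancel` both take `&self`. A one-function sketch (assumes the `tokio` and `tokio-util` crates):

```rust
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

// No &mut anywhere: both handles are safe to use through shared references.
fn shutdown(session: &watch::Sender<u64>, cancel: &CancellationToken) {
    let _ = session.send(0); // watch::Sender::send takes &self
    cancel.cancel(); // CancellationToken::cancel takes &self
}
```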
@@ -195,6 +210,7 @@ mod tests {
     use super::*;
     use crate::proxy::NeonOptions;
     use crate::serverless::cancel_set::CancelSet;
+    use crate::serverless::conn_pool_lib::{Client, ClientInnerExt};
     use crate::types::{BranchId, EndpointId, ProjectId};

     struct MockClient(Arc<AtomicBool>);
proxy/src/serverless/conn_pool_lib.rs

@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::marker::PhantomData;
 use std::ops::Deref;
 use std::sync::atomic::{self, AtomicUsize};
 use std::sync::{Arc, Weak};

@@ -12,11 +11,10 @@ use rand::Rng;
 use smol_str::ToSmolStr;
 use tracing::{Span, debug, info};

 use super::backend::HttpConnError;
-use super::conn_pool::ClientDataRemote;
-use super::http_conn_pool::ClientDataHttp;
+use super::conn_pool::{ClientDataRemote, poll_tokio_postgres_conn_really};
 use super::local_conn_pool::ClientDataLocal;
 use crate::auth::backend::ComputeUserInfo;
 use crate::config::HttpConfig;
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
 use crate::metrics::{HttpEndpointPoolsGuard, Metrics};

@@ -51,7 +49,6 @@ impl ConnInfo {
 pub(crate) enum ClientDataEnum {
     Remote(ClientDataRemote),
     Local(ClientDataLocal),
-    Http(ClientDataHttp),
 }

 #[derive(Clone)]

@@ -64,14 +61,9 @@ pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {

 impl<C: ClientInnerExt> Drop for ClientInnerCommon<C> {
     fn drop(&mut self) {
-        match &mut self.data {
-            ClientDataEnum::Remote(remote_data) => {
-                remote_data.cancel();
-            }
-            ClientDataEnum::Local(local_data) => {
-                local_data.cancel();
-            }
-            ClientDataEnum::Http(_http_data) => (),
+        match &self.data {
+            ClientDataEnum::Remote(remote_data) => remote_data.cancel(),
+            ClientDataEnum::Local(local_data) => local_data.cancel(),
         }
     }
 }

@@ -81,8 +73,8 @@ impl<C: ClientInnerExt> ClientInnerCommon<C> {
         self.conn_id
     }

-    pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum {
-        &mut self.data
+    pub(crate) fn get_data(&self) -> &ClientDataEnum {
+        &self.data
     }
 }
@@ -326,12 +318,70 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
     }
 }

-pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
+pub(crate) trait EndpointConnPoolExt: Send + Sync + 'static {
+    type Client;
+    type ClientInner: ClientInnerExt;
+    type Connection: Send + 'static;
+
+    fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self;
+    fn wrap_client(
+        inner: ClientInnerCommon<Self::ClientInner>,
+        conn_info: ConnInfo,
+        pool: Weak<RwLock<Self>>,
+    ) -> Self::Client;
+
+    fn get_conn_entry(
+        &mut self,
+        db_user: (DbName, RoleName),
+    ) -> Option<ClientInnerCommon<Self::ClientInner>>;
+    fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool;
+
+    fn spawn_conn(conn: Self::Connection) -> impl Future<Output = ()> + Send + 'static;
+
     fn clear_closed(&mut self) -> usize;
     fn total_conns(&self) -> usize;
 }

-impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
+impl<C: ClientInnerExt> EndpointConnPoolExt for EndpointConnPool<C> {
+    type Client = Client<C>;
+    type ClientInner = C;
+    type Connection = super::conn_pool::Conn;
+
+    fn create(config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
+        EndpointConnPool {
+            pools: HashMap::new(),
+            total_conns: 0,
+            max_conns: config.pool_options.max_conns_per_endpoint,
+            _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
+            global_connections_count,
+            global_pool_size_max_conns: config.pool_options.max_total_conns,
+            pool_name: String::from("remote"),
+        }
+    }
+
+    fn wrap_client(
+        client: ClientInnerCommon<Self::ClientInner>,
+        conn_info: ConnInfo,
+        pool: Weak<RwLock<Self>>,
+    ) -> Self::Client {
+        Client::new(client, conn_info.clone(), pool)
+    }
+
+    fn get_conn_entry(
+        &mut self,
+        db_user: (DbName, RoleName),
+    ) -> Option<ClientInnerCommon<Self::ClientInner>> {
+        Some(self.get_conn_entry(db_user)?.conn)
+    }
+
+    fn remove_conn(&mut self, db_user: (DbName, RoleName), conn_id: uuid::Uuid) -> bool {
+        self.remove_client(db_user, conn_id)
+    }
+
+    async fn spawn_conn(conn: Self::Connection) {
+        poll_tokio_postgres_conn_really(conn).await;
+    }
+
     fn clear_closed(&mut self) -> usize {
         let mut clients_removed: usize = 0;
         for db_pool in self.pools.values_mut() {
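The shape of this abstraction, reduced to a compilable toy (hypothetical `EndpointPool`/`TcpPool` names, not the real trait): associated types pin down the client, inner handle, and connection per pool flavor, so one generic driver can replace the two near-identical `poll_client`/`poll_http2_client` functions. Note that declaring `spawn_conn` as returning `impl Future` in the trait and implementing it with `async fn` requires Rust 1.75+ (assumes a `tokio` runtime):

```rust
use std::future::Future;

trait EndpointPool: Send + Sync + 'static {
    type Client;
    type ClientInner;
    type Connection: Send + 'static;

    fn wrap_client(inner: Self::ClientInner) -> Self::Client;
    // each pool knows how to drive its own connection type to completion
    fn spawn_conn(conn: Self::Connection) -> impl Future<Output = ()> + Send + 'static;
}

struct TcpPool;

impl EndpointPool for TcpPool {
    type Client = String;
    type ClientInner = &'static str;
    type Connection = ();

    fn wrap_client(inner: Self::ClientInner) -> Self::Client {
        inner.to_owned()
    }

    async fn spawn_conn(_conn: Self::Connection) {
        // pump messages / drive the h2 connection here
    }
}

// one generic driver instead of a copy per pool flavor
fn poll_client_generic<P: EndpointPool>(inner: P::ClientInner, conn: P::Connection) -> P::Client {
    tokio::spawn(P::spawn_conn(conn)); // needs an active tokio runtime
    P::wrap_client(inner)
}

#[tokio::main]
async fn main() {
    let client = poll_client_generic::<TcpPool>("hello", ());
    assert_eq!(client, "hello");
}
```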
@@ -345,10 +395,9 @@ impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
     }
 }

-pub(crate) struct GlobalConnPool<C, P>
+pub(crate) struct GlobalConnPool<P>
 where
-    C: ClientInnerExt,
-    P: EndpointConnPoolExt<C>,
+    P: EndpointConnPoolExt,
 {
     // endpoint -> per-endpoint connection pool
     //

@@ -367,8 +416,6 @@ where
     pub(crate) global_connections_count: Arc<AtomicUsize>,

     pub(crate) config: &'static crate::config::HttpConfig,
-
-    _marker: PhantomData<C>,
 }

 #[derive(Debug, Clone, Copy)]

@@ -391,10 +438,9 @@ pub struct GlobalConnPoolOptions {
     pub max_total_conns: usize,
 }

-impl<C, P> GlobalConnPool<C, P>
+impl<P> GlobalConnPool<P>
 where
-    C: ClientInnerExt,
-    P: EndpointConnPoolExt<C>,
+    P: EndpointConnPoolExt,
 {
     pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
         let shards = config.pool_options.pool_shards;

@@ -403,7 +449,6 @@ where
             global_pool_size: AtomicUsize::new(0),
             config,
             global_connections_count: Arc::new(AtomicUsize::new(0)),
-            _marker: PhantomData,
         })
     }
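Dropping the `C` parameter is what removes the `PhantomData`: previously `C` appeared only in the `EndpointConnPoolExt<C>` bound, so the struct had to mention it in a field; with the client type folded into the trait's `ClientInner` associated type, nothing is left to mark. A compilable before/after miniature (toy `PoolLike` trait):

```rust
use std::marker::PhantomData;

trait PoolLike {
    type Inner;
}

// before: `C` is referenced only through a bound, and bounds do not count
// as "use", so the struct needs a marker field to satisfy E0392
struct OldGlobalPool<C, P: PoolLike<Inner = C>> {
    pool: P,
    _marker: PhantomData<C>,
}

// after: the inner type is reachable as `P::Inner`, no extra parameter
struct NewGlobalPool<P: PoolLike> {
    pool: P,
}
```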
@@ -492,80 +537,72 @@ where
     }
 }

-impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
+impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
     pub(crate) fn get(
         self: &Arc<Self>,
         ctx: &RequestContext,
         conn_info: &ConnInfo,
-    ) -> Result<Option<Client<C>>, HttpConnError> {
-        let mut client: Option<ClientInnerCommon<C>> = None;
-        let Some(endpoint) = conn_info.endpoint_cache_key() else {
-            return Ok(None);
-        };
+    ) -> Option<P::Client> {
+        let endpoint = conn_info.endpoint_cache_key()?;

-        let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
-        if let Some(entry) = endpoint_pool
+        let endpoint_pool = self.get_endpoint_pool(&endpoint)?;
+        let client = endpoint_pool
             .write()
-            .get_conn_entry(conn_info.db_and_user())
-        {
-            client = Some(entry.conn);
-        }
+            .get_conn_entry(conn_info.db_and_user())?;

         let endpoint_pool = Arc::downgrade(&endpoint_pool);

-        // ok return cached connection if found and establish a new one otherwise
-        if let Some(mut client) = client {
-            if client.inner.is_closed() {
-                info!("pool: cached connection '{conn_info}' is closed, opening a new one");
-                return Ok(None);
-            }
-            tracing::Span::current()
-                .record("conn_id", tracing::field::display(client.get_conn_id()));
-            tracing::Span::current().record(
-                "pid",
-                tracing::field::display(client.inner.get_process_id()),
-            );
-            debug!(
-                cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
-                "pool: reusing connection '{conn_info}'"
-            );
-
-            match client.get_data() {
-                ClientDataEnum::Local(data) => {
-                    data.session().send(ctx.session_id())?;
-                }
-
-                ClientDataEnum::Remote(data) => {
-                    data.session().send(ctx.session_id())?;
-                }
-                ClientDataEnum::Http(_) => (),
-            }
-
-            ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
-            ctx.success();
-            return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
+        if client.inner.is_closed() {
+            info!("pool: cached connection '{conn_info}' is closed, opening a new one");
+            return None;
         }
-        Ok(None)
+
+        tracing::Span::current().record("conn_id", tracing::field::display(client.get_conn_id()));
+        tracing::Span::current().record(
+            "pid",
+            tracing::field::display(client.inner.get_process_id()),
+        );
+        debug!(
+            cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
+            "pool: reusing connection '{conn_info}'"
+        );
+
+        match client.get_data() {
+            ClientDataEnum::Local(data) => {
+                data.session().send(ctx.session_id()).ok()?;
+            }
+            ClientDataEnum::Remote(data) => {
+                data.session().send(ctx.session_id()).ok()?;
+            }
+        }
+
+        ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
+        Some(P::wrap_client(client, conn_info.clone(), endpoint_pool))
     }
 }

+impl<P: EndpointConnPoolExt> GlobalConnPool<P> {
+    pub(crate) fn get_endpoint_pool(
+        self: &Arc<Self>,
+        endpoint: &EndpointCacheKey,
+    ) -> Option<Arc<RwLock<P>>> {
+        Some(self.global_pool.get(endpoint)?.clone())
+    }
+
     pub(crate) fn get_or_create_endpoint_pool(
         self: &Arc<Self>,
         endpoint: &EndpointCacheKey,
-    ) -> Arc<RwLock<EndpointConnPool<C>>> {
+    ) -> Arc<RwLock<P>> {
         // fast path
         if let Some(pool) = self.global_pool.get(endpoint) {
            return pool.clone();
         }

         // slow path
-        let new_pool = Arc::new(RwLock::new(EndpointConnPool {
-            pools: HashMap::new(),
-            total_conns: 0,
-            max_conns: self.config.pool_options.max_conns_per_endpoint,
-            _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
-            global_connections_count: self.global_connections_count.clone(),
-            global_pool_size_max_conns: self.config.pool_options.max_total_conns,
-            pool_name: String::from("remote"),
-        }));
+        let new_pool = Arc::new(RwLock::new(P::create(
+            self.config,
+            self.global_connections_count.clone(),
+        )));

         // find or create a pool for this endpoint
         let mut created = false;

@@ -592,6 +629,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
         pool
     }
 }
+
 pub(crate) struct Client<C: ClientInnerExt> {
     span: Span,
     inner: Option<ClientInnerCommon<C>>,
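With the endpoint lookup made infallible, `get` collapses from `Result<Option<Client<C>>, HttpConnError>` to `Option<P::Client>`, and every `let Some(...) = ... else { return Ok(None) }` early-out becomes a plain `?`. The shape of that simplification as a toy (hypothetical `lookup` example):

```rust
use std::collections::HashMap;

fn endpoint_cache_key(name: &str) -> Option<String> {
    (!name.is_empty()).then(|| format!("ep-{name}"))
}

// before: `let Some(x) = ... else { return Ok(None); };` at every step
// after: plain Option chaining with `?`
fn lookup(pool: &HashMap<String, u32>, name: &str) -> Option<u32> {
    let key = endpoint_cache_key(name)?;
    let conn = pool.get(&key)?;
    Some(*conn)
}
```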
proxy/src/serverless/http_conn_pool.rs

@@ -4,32 +4,25 @@ use std::sync::{Arc, Weak};

 use hyper::client::conn::http2;
 use hyper_util::rt::{TokioExecutor, TokioIo};
 use parking_lot::RwLock;
 use smol_str::ToSmolStr;
-use tracing::{Instrument, debug, error, info, info_span};
+use tracing::{error, info};

 use super::AsyncRW;
 use super::backend::HttpConnError;
 use super::conn_pool_lib::{
-    ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
-    EndpointConnPoolExt, GlobalConnPool,
+    ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, EndpointConnPoolExt,
 };
 use crate::config::HttpConfig;
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
 use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
 use crate::protocol2::ConnectionInfoExtra;
 use crate::types::EndpointCacheKey;
 use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};

 pub(crate) type Send = http2::SendRequest<hyper::body::Incoming>;
 pub(crate) type Connect = http2::Connection<TokioIo<AsyncRW>, hyper::body::Incoming, TokioExecutor>;

-#[derive(Clone)]
-pub(crate) struct ClientDataHttp();
-
 // Per-endpoint connection pool
 // Number of open connections is limited by the `max_conns_per_endpoint`.
-pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
+pub(crate) struct HttpConnPool {
     // TODO(conrad):
     // either we should open more connections depending on stream count
     // (not exposed by hyper, need our own counter)

@@ -39,13 +32,13 @@ pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
     // seems somewhat redundant though.
     //
     // Probably we should run a semaphore and just the single conn. TBD.
-    conns: VecDeque<ConnPoolEntry<C>>,
+    conns: VecDeque<ConnPoolEntry<Send>>,
     _guard: HttpEndpointPoolsGuard<'static>,
     global_connections_count: Arc<AtomicUsize>,
 }

-impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
-    fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
+impl HttpConnPool {
+    fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<Send>> {
         let Self { conns, .. } = self;

         loop {
@@ -83,9 +76,59 @@
         }
         removed > 0
     }
+
+    pub fn register(&mut self, client: &Client) {
+        self.conns.push_back(ConnPoolEntry {
+            conn: client.inner.clone(),
+            _last_access: std::time::Instant::now(),
+        });
+    }
 }

-impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
+impl EndpointConnPoolExt for HttpConnPool {
+    type Client = Client;
+    type ClientInner = Send;
+    type Connection = Connect;
+
+    fn create(_config: &HttpConfig, global_connections_count: Arc<AtomicUsize>) -> Self {
+        HttpConnPool {
+            conns: VecDeque::new(),
+            _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
+            global_connections_count,
+        }
+    }
+
+    fn wrap_client(
+        inner: ClientInnerCommon<Self::ClientInner>,
+        _conn_info: ConnInfo,
+        _pool: Weak<parking_lot::RwLock<Self>>,
+    ) -> Self::Client {
+        Client::new(inner)
+    }
+
+    fn get_conn_entry(
+        &mut self,
+        _db_user: (crate::types::DbName, crate::types::RoleName),
+    ) -> Option<ClientInnerCommon<Self::ClientInner>> {
+        Some(self.get_conn_entry()?.conn)
+    }
+
+    fn remove_conn(
+        &mut self,
+        _db_user: (crate::types::DbName, crate::types::RoleName),
+        conn_id: uuid::Uuid,
+    ) -> bool {
+        self.remove_conn(conn_id)
+    }
+
+    async fn spawn_conn(conn: Self::Connection) {
+        let res = conn.await;
+        match res {
+            Ok(()) => info!("connection closed"),
+            Err(e) => error!("connection error: {e:?}"),
+        }
+    }
+
     fn clear_closed(&mut self) -> usize {
         let Self { conns, .. } = self;
         let old_len = conns.len();
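`spawn_conn` here simply awaits the h2 `Connection` future, which is hyper's standard client pattern: `handshake` hands back a cloneable `SendRequest` plus a `Connection` driver that must be polled for any stream to make progress. A minimal sketch of that pattern outside the proxy (assumes the `hyper`, `hyper-util`, `http-body-util`, `bytes`, and `tokio` crates):

```rust
use bytes::Bytes;
use http_body_util::Full;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use tokio::net::TcpStream;

async fn connect(
    addr: &str,
) -> Result<http2::SendRequest<Full<Bytes>>, Box<dyn std::error::Error>> {
    let io = TokioIo::new(TcpStream::connect(addr).await?);
    let (send_request, connection) = http2::handshake(TokioExecutor::new(), io).await?;

    // drive all multiplexed streams; without this task no request completes
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e:?}");
        }
    });

    Ok(send_request) // Clone it freely: each clone shares the one socket
}
```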
@@ -100,7 +143,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
     }
 }

-impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
+impl Drop for HttpConnPool {
     fn drop(&mut self) {
         if !self.conns.is_empty() {
             self.global_connections_count
@@ -114,154 +157,12 @@ impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
     }
 }

-impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
-    #[expect(unused_results)]
-    pub(crate) fn get(
-        self: &Arc<Self>,
-        ctx: &RequestContext,
-        conn_info: &ConnInfo,
-    ) -> Result<Option<Client<C>>, HttpConnError> {
-        let result: Result<Option<Client<C>>, HttpConnError>;
-        let Some(endpoint) = conn_info.endpoint_cache_key() else {
-            result = Ok(None);
-            return result;
-        };
-        let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
-        let Some(client) = endpoint_pool.write().get_conn_entry() else {
-            result = Ok(None);
-            return result;
-        };
-
-        tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
-        debug!(
-            cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
-            "pool: reusing connection '{conn_info}'"
-        );
-        ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
-        ctx.success();
-
-        Ok(Some(Client::new(client.conn.clone())))
-    }
-
-    fn get_or_create_endpoint_pool(
-        self: &Arc<Self>,
-        endpoint: &EndpointCacheKey,
-    ) -> Arc<RwLock<HttpConnPool<C>>> {
-        // fast path
-        if let Some(pool) = self.global_pool.get(endpoint) {
-            return pool.clone();
-        }
-
-        // slow path
-        let new_pool = Arc::new(RwLock::new(HttpConnPool {
-            conns: VecDeque::new(),
-            _guard: Metrics::get().proxy.http_endpoint_pools.guard(),
-            global_connections_count: self.global_connections_count.clone(),
-        }));
-
-        // find or create a pool for this endpoint
-        let mut created = false;
-        let pool = self
-            .global_pool
-            .entry(endpoint.clone())
-            .or_insert_with(|| {
-                created = true;
-                new_pool
-            })
-            .clone();
-
-        // log new global pool size
-        if created {
-            let global_pool_size = self
-                .global_pool_size
-                .fetch_add(1, atomic::Ordering::Relaxed)
-                + 1;
-            info!(
-                "pool: created new pool for '{endpoint}', global pool size now {global_pool_size}"
-            );
-        }
-
-        pool
-    }
+pub(crate) struct Client {
+    pub(crate) inner: ClientInnerCommon<Send>,
 }

-pub(crate) fn poll_http2_client(
-    global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
-    ctx: &RequestContext,
-    conn_info: &ConnInfo,
-    client: Send,
-    connection: Connect,
-    conn_id: uuid::Uuid,
-    aux: MetricsAuxInfo,
-) -> Client<Send> {
-    let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
-    let session_id = ctx.session_id();
-
-    let span = info_span!(parent: None, "connection", %conn_id);
-    let cold_start_info = ctx.cold_start_info();
-    span.in_scope(|| {
-        info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
-    });
-
-    let pool = match conn_info.endpoint_cache_key() {
-        Some(endpoint) => {
-            let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
-            let client = ClientInnerCommon {
-                inner: client.clone(),
-                aux: aux.clone(),
-                conn_id,
-                data: ClientDataEnum::Http(ClientDataHttp()),
-            };
-            pool.write().conns.push_back(ConnPoolEntry {
-                conn: client,
-                _last_access: std::time::Instant::now(),
-            });
-            Metrics::get()
-                .proxy
-                .http_pool_opened_connections
-                .get_metric()
-                .inc();
-
-            Arc::downgrade(&pool)
-        }
-        None => Weak::new(),
-    };
-
-    tokio::spawn(
-        async move {
-            let _conn_gauge = conn_gauge;
-            let res = connection.await;
-            match res {
-                Ok(()) => info!("connection closed"),
-                Err(e) => error!(%session_id, "connection error: {e:?}"),
-            }
-
-            // remove from connection pool
-            if let Some(pool) = pool.clone().upgrade() {
-                if pool.write().remove_conn(conn_id) {
-                    info!("closed connection removed");
-                }
-            }
-        }
-        .instrument(span),
-    );
-
-    let client = ClientInnerCommon {
-        inner: client,
-        aux,
-        conn_id,
-        data: ClientDataEnum::Http(ClientDataHttp()),
-    };
-
-    Client::new(client)
-}
-
-pub(crate) struct Client<C: ClientInnerExt + Clone> {
-    pub(crate) inner: ClientInnerCommon<C>,
-}
-
-impl<C: ClientInnerExt + Clone> Client<C> {
-    pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
+impl Client {
+    pub(self) fn new(inner: ClientInnerCommon<Send>) -> Self {
         Self { inner }
     }
proxy/src/serverless/local_conn_pool.rs

@@ -53,11 +53,11 @@ pub(crate) struct ClientDataLocal {
 }

 impl ClientDataLocal {
-    pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
-        &mut self.session
+    pub fn session(&self) -> &tokio::sync::watch::Sender<uuid::Uuid> {
+        &self.session
    }

-    pub fn cancel(&mut self) {
+    pub fn cancel(&self) {
         self.cancel.cancel();
     }
 }

@@ -99,7 +99,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
             .map(|entry| entry.conn);

         // ok return cached connection if found and establish a new one otherwise
-        if let Some(mut client) = client {
+        if let Some(client) = client {
             if client.inner.is_closed() {
                 info!("local_pool: cached connection '{conn_info}' is closed, opening a new one");
                 return Ok(None);

@@ -120,11 +120,9 @@
                 ClientDataEnum::Local(data) => {
                     data.session().send(ctx.session_id())?;
                 }
-
                 ClientDataEnum::Remote(data) => {
                     data.session().send(ctx.session_id())?;
                 }
-                ClientDataEnum::Http(_) => (),
             }

             ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);