Add GC to http connection pool (#6196)

## Problem

HTTP connection pool will grow without being pruned

## Summary of changes

Remove connection clients from pools once idle, or once they exit.
Periodically clear pool shards.

GC Logic:

Each shard contains a hashmap of `Arc<EndpointPool>`s.
Each connection stores a `Weak<EndpointPool>`.

During a GC sweep, we take a random shard write lock, and check that if
any of the `Arc<EndpointPool>`s are unique (using `Arc::get_mut`).
- If they are unique, then we check that the endpoint-pool is empty, and
sweep if it is.
- If they are not unique, then the endpoint-pool is in active use and we
don't sweep.
- Idle connections will self-clear from the endpoint-pool after 5
minutes.

Technically, the uniqueness of the endpoint-pool should be enough to
consider it empty, but the connection count check is done for
completeness sake.
This commit is contained in:
Conrad Ludgate
2023-12-21 12:00:10 +00:00
committed by GitHub
parent 48890d206e
commit 2df3602a4b
5 changed files with 321 additions and 118 deletions

View File

@@ -11,6 +11,7 @@ use proxy::http;
use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::RateLimiterConfig;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::usage_metrics;
use anyhow::bail;
@@ -95,12 +96,8 @@ struct ProxyCliArgs {
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
/// timeout for http connections
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
/// Whether the SQL over http pool is opt-in
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
sql_over_http_pool_opt_in: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
scram_protocol_timeout: tokio::time::Duration,
@@ -138,6 +135,36 @@ struct ProxyCliArgs {
disable_ip_check_for_http: bool,
}
#[derive(clap::Args, Clone, Copy, Debug)]
struct SqlOverHttpArgs {
/// timeout for http connection requests
#[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
sql_over_http_timeout: tokio::time::Duration,
/// Whether the SQL over http pool is opt-in
#[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
sql_over_http_pool_opt_in: bool,
/// How many connections to pool for each endpoint. Excess connections are discarded
#[clap(long, default_value_t = 20)]
sql_over_http_pool_max_conns_per_endpoint: usize,
/// How long pooled connections should remain idle for before closing
#[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
sql_over_http_idle_timeout: tokio::time::Duration,
/// Duration each shard will wait on average before a GC sweep.
/// A longer time will causes sweeps to take longer but will interfere less frequently.
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
sql_over_http_pool_gc_epoch: tokio::time::Duration,
/// How many shards should the global pool have. Must be a power of two.
/// More shards will introduce less contention for pool operations, but can
/// increase memory used by the pool
#[clap(long, default_value_t = 128)]
sql_over_http_pool_shards: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
@@ -327,8 +354,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
}
};
let http_config = HttpConfig {
timeout: args.sql_over_http_timeout,
pool_opt_in: args.sql_over_http_pool_opt_in,
request_timeout: args.sql_over_http.sql_over_http_timeout,
pool_options: GlobalConnPoolOptions {
max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
pool_shards: args.sql_over_http.sql_over_http_pool_shards,
idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
},
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,

View File

@@ -1,4 +1,4 @@
use crate::{auth, rate_limiter::RateBucketInfo};
use crate::{auth, rate_limiter::RateBucketInfo, serverless::GlobalConnPoolOptions};
use anyhow::{bail, ensure, Context, Ok};
use rustls::{sign, Certificate, PrivateKey};
use sha2::{Digest, Sha256};
@@ -36,8 +36,8 @@ pub struct TlsConfig {
}
pub struct HttpConfig {
pub timeout: tokio::time::Duration,
pub pool_opt_in: bool,
pub request_timeout: tokio::time::Duration,
pub pool_options: GlobalConnPoolOptions,
}
pub struct AuthenticationConfig {

View File

@@ -6,9 +6,13 @@ mod conn_pool;
mod sql_over_http;
mod websocket;
pub use conn_pool::GlobalConnPoolOptions;
use anyhow::bail;
use hyper::StatusCode;
use metrics::IntCounterPairGuard;
use rand::rngs::StdRng;
use rand::SeedableRng;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;
@@ -47,6 +51,11 @@ pub async fn task_main(
let conn_pool = conn_pool::GlobalConnPool::new(config);
let conn_pool2 = Arc::clone(&conn_pool);
tokio::spawn(async move {
conn_pool2.gc_worker(StdRng::from_entropy()).await;
});
// shutdown the connection pool
tokio::spawn({
let cancellation_token = cancellation_token.clone();

View File

@@ -1,15 +1,19 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::future::poll_fn;
use futures::{future::poll_fn, Future};
use metrics::{register_int_counter_pair, IntCounterPair, IntCounterPairGuard};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use pbkdf2::{
password_hash::{PasswordHashString, PasswordHasher, PasswordVerifier, SaltString},
Params, Pbkdf2,
};
use pq_proto::StartupMessageParams;
use prometheus::{exponential_buckets, register_histogram, Histogram};
use rand::Rng;
use smol_str::SmolStr;
use std::{collections::HashMap, net::IpAddr, sync::Arc};
use std::{collections::HashMap, net::IpAddr, pin::pin, sync::Arc, sync::Weak, time::Duration};
use std::{
fmt,
task::{ready, Poll},
@@ -18,7 +22,7 @@ use std::{
ops::Deref,
sync::atomic::{self, AtomicUsize},
};
use tokio::time;
use tokio::time::{self, Instant};
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
use crate::{
@@ -30,11 +34,10 @@ use crate::{
};
use crate::{compute, config};
use tracing::{error, warn, Span};
use tracing::{debug, error, warn, Span};
use tracing::{info, info_span, Instrument};
pub const APP_NAME: &str = "/sql_over_http";
const MAX_CONNS_PER_ENDPOINT: usize = 20;
#[derive(Debug, Clone)]
pub struct ConnInfo {
@@ -69,6 +72,77 @@ struct ConnPoolEntry {
pub struct EndpointConnPool {
pools: HashMap<(SmolStr, SmolStr), DbUserConnPool>,
total_conns: usize,
max_conns: usize,
_guard: IntCounterPairGuard,
}
impl EndpointConnPool {
fn get_conn_entry(&mut self, db_user: (SmolStr, SmolStr)) -> Option<ConnPoolEntry> {
let Self {
pools, total_conns, ..
} = self;
pools
.get_mut(&db_user)
.and_then(|pool_entries| pool_entries.get_conn_entry(total_conns))
}
fn remove_client(&mut self, db_user: (SmolStr, SmolStr), conn_id: uuid::Uuid) -> bool {
let Self {
pools, total_conns, ..
} = self;
if let Some(pool) = pools.get_mut(&db_user) {
let old_len = pool.conns.len();
pool.conns.retain(|conn| conn.conn.conn_id != conn_id);
let new_len = pool.conns.len();
let removed = old_len - new_len;
*total_conns -= removed;
removed > 0
} else {
false
}
}
fn put(pool: &RwLock<Self>, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
let conn_id = client.conn_id;
if client.inner.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
return Ok(());
}
// return connection to the pool
let mut returned = false;
let mut per_db_size = 0;
let total_conns = {
let mut pool = pool.write();
if pool.total_conns < pool.max_conns {
// we create this db-user entry in get, so it should not be None
if let Some(pool_entries) = pool.pools.get_mut(&conn_info.db_and_user()) {
pool_entries.conns.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
returned = true;
per_db_size = pool_entries.conns.len();
pool.total_conns += 1;
}
}
pool.total_conns
};
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
}
}
/// 4096 is the number of rounds that SCRAM-SHA-256 recommends.
@@ -87,6 +161,27 @@ pub struct DbUserConnPool {
password_hash: Option<PasswordHashString>,
}
impl DbUserConnPool {
fn clear_closed_clients(&mut self, conns: &mut usize) {
let old_len = self.conns.len();
self.conns.retain(|conn| !conn.conn.inner.is_closed());
let new_len = self.conns.len();
let removed = old_len - new_len;
*conns -= removed;
}
fn get_conn_entry(&mut self, conns: &mut usize) -> Option<ConnPoolEntry> {
self.clear_closed_clients(conns);
let conn = self.conns.pop();
if conn.is_some() {
*conns -= 1;
}
conn
}
}
pub struct GlobalConnPool {
// endpoint -> per-endpoint connection pool
//
@@ -94,52 +189,127 @@ pub struct GlobalConnPool {
// pool as early as possible and release the lock.
global_pool: DashMap<SmolStr, Arc<RwLock<EndpointConnPool>>>,
/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
global_pool_size: AtomicUsize,
proxy_config: &'static crate::config::ProxyConfig,
}
#[derive(Debug, Clone, Copy)]
pub struct GlobalConnPoolOptions {
// Maximum number of connections per one endpoint.
// Can mix different (dbname, username) connections.
// When running out of free slots for a particular endpoint,
// falls back to opening a new connection for each request.
max_conns_per_endpoint: usize,
pub max_conns_per_endpoint: usize,
proxy_config: &'static crate::config::ProxyConfig,
pub gc_epoch: Duration,
// Using a lock to remove any race conditions.
// Eg cleaning up connections while a new connection is returned
closed: RwLock<bool>,
pub pool_shards: usize,
pub idle_timeout: Duration,
pub opt_in: bool,
}
pub static GC_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"proxy_http_pool_reclaimation_lag_seconds",
"Time it takes to reclaim unused connection pools",
// 1us -> 65ms
exponential_buckets(1e-6, 2.0, 16).unwrap(),
)
.unwrap()
});
pub static ENDPOINT_POOLS: Lazy<IntCounterPair> = Lazy::new(|| {
register_int_counter_pair!(
"proxy_http_pool_endpoints_registered_total",
"Number of endpoints we have registered pools for",
"proxy_http_pool_endpoints_unregistered_total",
"Number of endpoints we have unregistered pools for",
)
.unwrap()
});
impl GlobalConnPool {
pub fn new(config: &'static crate::config::ProxyConfig) -> Arc<Self> {
let shards = config.http_config.pool_options.pool_shards;
Arc::new(Self {
global_pool: DashMap::new(),
global_pool: DashMap::with_shard_amount(shards),
global_pool_size: AtomicUsize::new(0),
max_conns_per_endpoint: MAX_CONNS_PER_ENDPOINT,
proxy_config: config,
closed: RwLock::new(false),
})
}
pub fn shutdown(&self) {
*self.closed.write() = true;
// drops all strong references to endpoint-pools
self.global_pool.clear();
}
self.global_pool.retain(|_, endpoint_pool| {
let mut pool = endpoint_pool.write();
// by clearing this hashmap, we remove the slots that a connection can be returned to.
// when returning, it drops the connection if the slot doesn't exist
pool.pools.clear();
pool.total_conns = 0;
pub async fn gc_worker(&self, mut rng: impl Rng) {
let epoch = self.proxy_config.http_config.pool_options.gc_epoch;
let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
loop {
interval.tick().await;
false
let shard = rng.gen_range(0..self.global_pool.shards().len());
self.gc(shard);
}
}
fn gc(&self, shard: usize) {
debug!(shard, "pool: performing epoch reclamation");
// acquire a random shard lock
let mut shard = self.global_pool.shards()[shard].write();
let timer = GC_LATENCY.start_timer();
let current_len = shard.len();
shard.retain(|endpoint, x| {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
let EndpointConnPool {
pools, total_conns, ..
} = pool.get_mut();
// ensure that closed clients are removed
pools
.iter_mut()
.for_each(|(_, db_pool)| db_pool.clear_closed_clients(total_conns));
// we only remove this pool if it has no active connections
if *total_conns == 0 {
info!("pool: discarding pool for endpoint {endpoint}");
return false;
}
}
true
});
let new_len = shard.len();
drop(shard);
timer.observe_duration();
let removed = current_len - new_len;
if removed > 0 {
let global_pool_size = self
.global_pool_size
.fetch_sub(removed, atomic::Ordering::Relaxed)
- removed;
info!("pool: performed global pool gc. size now {global_pool_size}");
}
}
pub async fn get(
self: &Arc<Self>,
conn_info: &ConnInfo,
conn_info: ConnInfo,
force_new: bool,
session_id: uuid::Uuid,
peer_addr: IpAddr,
@@ -147,15 +317,11 @@ impl GlobalConnPool {
let mut client: Option<ClientInner> = None;
let mut latency_timer = LatencyTimer::new("http");
let pool = if force_new {
None
} else {
Some((conn_info.clone(), self.clone()))
};
let mut hash_valid = false;
let mut endpoint_pool = Weak::new();
if !force_new {
let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
endpoint_pool = Arc::downgrade(&pool);
let mut hash = None;
// find a pool entry by (dbname, username) if exists
@@ -180,12 +346,8 @@ impl GlobalConnPool {
// we will continue with the regular connection flow
if validate.is_ok() {
hash_valid = true;
let mut pool = pool.write();
if let Some(pool_entries) = pool.pools.get_mut(&conn_info.db_and_user()) {
if let Some(entry) = pool_entries.conns.pop() {
client = Some(entry.conn);
pool.total_conns -= 1;
}
if let Some(entry) = pool.write().get_conn_entry(conn_info.db_and_user()) {
client = Some(entry.conn)
}
}
}
@@ -198,11 +360,12 @@ impl GlobalConnPool {
info!(%conn_id, "pool: cached connection '{conn_info}' is closed, opening a new one");
connect_to_compute(
self.proxy_config,
conn_info,
&conn_info,
conn_id,
session_id,
latency_timer,
peer_addr,
endpoint_pool.clone(),
)
.await
} else {
@@ -214,18 +377,19 @@ impl GlobalConnPool {
);
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client::new(client, pool).await);
return Ok(Client::new(client, conn_info, endpoint_pool).await);
}
} else {
let conn_id = uuid::Uuid::new_v4();
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
connect_to_compute(
self.proxy_config,
conn_info,
&conn_info,
conn_id,
session_id,
latency_timer,
peer_addr,
endpoint_pool.clone(),
)
.await
};
@@ -269,59 +433,7 @@ impl GlobalConnPool {
_ => {}
}
let new_client = new_client?;
Ok(Client::new(new_client, pool).await)
}
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
let conn_id = client.conn_id;
// We want to hold this open while we return. This ensures that the pool can't close
// while we are in the middle of returning the connection.
let closed = self.closed.read();
if *closed {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is closed");
return Ok(());
}
if client.inner.is_closed() {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because connection is closed");
return Ok(());
}
let pool = self.get_or_create_endpoint_pool(&conn_info.hostname);
// return connection to the pool
let mut returned = false;
let mut per_db_size = 0;
let total_conns = {
let mut pool = pool.write();
if pool.total_conns < self.max_conns_per_endpoint {
// we create this db-user entry in get, so it should not be None
if let Some(pool_entries) = pool.pools.get_mut(&conn_info.db_and_user()) {
pool_entries.conns.push(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
returned = true;
per_db_size = pool_entries.conns.len();
pool.total_conns += 1;
}
}
pool.total_conns
};
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
Ok(Client::new(new_client, conn_info, endpoint_pool).await)
}
fn get_or_create_endpoint_pool(&self, endpoint: &SmolStr) -> Arc<RwLock<EndpointConnPool>> {
@@ -334,6 +446,12 @@ impl GlobalConnPool {
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
pools: HashMap::new(),
total_conns: 0,
max_conns: self
.proxy_config
.http_config
.pool_options
.max_conns_per_endpoint,
_guard: ENDPOINT_POOLS.guard(),
}));
// find or create a pool for this endpoint
@@ -363,9 +481,11 @@ impl GlobalConnPool {
}
struct TokioMechanism<'a> {
pool: Weak<RwLock<EndpointConnPool>>,
conn_info: &'a ConnInfo,
session_id: uuid::Uuid,
conn_id: uuid::Uuid,
idle: Duration,
}
#[async_trait]
@@ -385,6 +505,8 @@ impl ConnectMechanism for TokioMechanism<'_> {
timeout,
self.conn_id,
self.session_id,
self.pool.clone(),
self.idle,
)
.await
}
@@ -403,6 +525,7 @@ async fn connect_to_compute(
session_id: uuid::Uuid,
latency_timer: LatencyTimer,
peer_addr: IpAddr,
pool: Weak<RwLock<EndpointConnPool>>,
) -> anyhow::Result<ClientInner> {
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
@@ -447,6 +570,8 @@ async fn connect_to_compute(
conn_id,
conn_info,
session_id,
pool,
idle: config.http_config.pool_options.idle_timeout,
},
node_info,
&extra,
@@ -462,6 +587,8 @@ async fn connect_to_compute_once(
timeout: time::Duration,
conn_id: uuid::Uuid,
mut session: uuid::Uuid,
pool: Weak<RwLock<EndpointConnPool>>,
idle: Duration,
) -> Result<ClientInner, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();
@@ -490,13 +617,29 @@ async fn connect_to_compute_once(
branch_id: node_info.aux.branch_id.clone(),
};
let db_user = conn_info.db_and_user();
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
poll_fn(move |cx| {
if matches!(rx.has_changed(), Ok(true)) {
session = *rx.borrow_and_update();
info!(%session, "changed session");
idle_timeout.as_mut().reset(Instant::now() + idle);
}
// 5 minute idle connection timeout
if idle_timeout.as_mut().poll(cx).is_ready() {
idle_timeout.as_mut().reset(Instant::now() + idle);
info!("connection idle");
if let Some(pool) = pool.clone().upgrade() {
// remove client from pool - should close the connection if it's idle.
// does nothing if the client is currently checked-out and in-use
if pool.write().remove_client(db_user.clone(), conn_id) {
info!("idle connection removed");
}
}
}
loop {
@@ -514,15 +657,25 @@ async fn connect_to_compute_once(
}
Some(Err(e)) => {
error!(%session, "connection error: {}", e);
return Poll::Ready(())
break
}
None => {
info!("connection closed");
return Poll::Ready(())
break
}
}
}
}).await
// remove from connection pool
if let Some(pool) = pool.clone().upgrade() {
if pool.write().remove_client(db_user.clone(), conn_id) {
info!("closed connection removed");
}
}
Poll::Ready(())
}).await;
}
.instrument(span)
);
@@ -552,23 +705,27 @@ pub struct Client {
conn_id: uuid::Uuid,
span: Span,
inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
conn_info: ConnInfo,
pool: Weak<RwLock<EndpointConnPool>>,
}
pub struct Discard<'a> {
conn_id: uuid::Uuid,
pool: &'a mut Option<(ConnInfo, Arc<GlobalConnPool>)>,
conn_info: &'a ConnInfo,
pool: &'a mut Weak<RwLock<EndpointConnPool>>,
}
impl Client {
pub(self) async fn new(
inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
conn_info: ConnInfo,
pool: Weak<RwLock<EndpointConnPool>>,
) -> Self {
Self {
conn_id: inner.conn_id,
inner: Some(inner),
span: Span::current(),
conn_info,
pool,
}
}
@@ -577,6 +734,7 @@ impl Client {
inner,
pool,
conn_id,
conn_info,
span: _,
} = self;
(
@@ -586,6 +744,7 @@ impl Client {
.inner,
Discard {
pool,
conn_info,
conn_id: *conn_id,
},
)
@@ -601,14 +760,14 @@ impl Client {
impl Discard<'_> {
pub fn check_idle(&mut self, status: ReadyForQueryStatus) {
if status != ReadyForQueryStatus::Idle {
if let Some((conn_info, _)) = self.pool.take() {
info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is not idle")
}
let conn_info = &self.conn_info;
if status != ReadyForQueryStatus::Idle && std::mem::take(self.pool).strong_count() > 0 {
info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is not idle")
}
}
pub fn discard(&mut self) {
if let Some((conn_info, _)) = self.pool.take() {
let conn_info = &self.conn_info;
if std::mem::take(self.pool).strong_count() > 0 {
info!(conn_id = %self.conn_id, "pool: throwing away connection '{conn_info}' because connection is potentially in a broken state")
}
}
@@ -628,16 +787,17 @@ impl Deref for Client {
impl Drop for Client {
fn drop(&mut self) {
let conn_info = self.conn_info.clone();
let client = self
.inner
.take()
.expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() {
if let Some(conn_pool) = std::mem::take(&mut self.pool).upgrade() {
let current_span = self.span.clone();
// return connection to the pool
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client);
let _ = EndpointConnPool::put(&conn_pool, &conn_info, client);
});
}
}

View File

@@ -206,7 +206,7 @@ pub async fn handle(
config: &'static HttpConfig,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.timeout,
config.request_timeout,
handle_inner(
config,
request,
@@ -278,7 +278,7 @@ pub async fn handle(
Err(_) => {
let message = format!(
"HTTP-Connection timed out, execution time exeeded {} seconds",
config.timeout.as_secs()
config.request_timeout.as_secs()
);
error!(message);
json_response(
@@ -320,7 +320,8 @@ async fn handle_inner(
// Allow connection pooling only if explicitly requested
// or if we have decided that http pool is no longer opt-in
let allow_pool = !config.pool_opt_in || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
let allow_pool =
!config.pool_options.opt_in || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
// isolation level, read only and deferrable
@@ -359,7 +360,7 @@ async fn handle_inner(
let payload: Payload = serde_json::from_slice(&body)?;
let mut client = conn_pool
.get(&conn_info, !allow_pool, session_id, peer_addr)
.get(conn_info, !allow_pool, session_id, peer_addr)
.await?;
let mut response = Response::builder()