diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index 7258f1a2cf..b268c4073e 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -20,7 +20,7 @@ use tokio_postgres::AsyncMessage; use crate::{ auth, console, metrics::{Ids, MetricCounter, USAGE_METRICS}, - proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER}, + proxy::{LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER}, }; use crate::{compute, config}; @@ -139,6 +139,7 @@ impl GlobalConnPool { session_id: uuid::Uuid, ) -> anyhow::Result { let mut client: Option = None; + let mut latency_timer = LatencyTimer::new("http"); let mut hash_valid = false; if !force_new { @@ -182,15 +183,16 @@ impl GlobalConnPool { let new_client = if let Some(client) = client { if client.inner.is_closed() { info!("pool: cached connection '{conn_info}' is closed, opening a new one"); - connect_to_compute(self.proxy_config, conn_info, session_id).await + connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await } else { + latency_timer.pool_hit(); info!("pool: reusing connection '{conn_info}'"); client.session.send(session_id)?; return Ok(client); } } else { info!("pool: opening a new connection '{conn_info}'"); - connect_to_compute(self.proxy_config, conn_info, session_id).await + connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await }; match &new_client { @@ -347,6 +349,7 @@ async fn connect_to_compute( config: &config::ProxyConfig, conn_info: &ConnInfo, session_id: uuid::Uuid, + latency_timer: LatencyTimer, ) -> anyhow::Result { let tls = config.tls_config.as_ref(); let common_names = tls.and_then(|tls| tls.common_names.clone()); @@ -386,6 +389,7 @@ async fn connect_to_compute( node_info, &extra, &creds, + latency_timer, ) .await } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 9cfe98347b..6a928547a9 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -15,12 +15,11 
@@ use crate::{
 use anyhow::{bail, Context};
 use async_trait::async_trait;
 use futures::TryFutureExt;
-use metrics::{
-    exponential_buckets, register_histogram, register_int_counter_vec, Histogram, IntCounterVec,
-};
+use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec};
 use once_cell::sync::Lazy;
 use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
-use std::{error::Error, io, ops::ControlFlow, sync::Arc};
+use prometheus::{register_histogram_vec, HistogramVec};
+use std::{error::Error, io, ops::ControlFlow, sync::Arc, time::Instant};
 use tokio::{
     io::{AsyncRead, AsyncWrite, AsyncWriteExt},
     time,
@@ -93,16 +92,74 @@ pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| {
     .unwrap()
 });
 
-static COMPUTE_CONNECTION_LATENCY: Lazy<Histogram> = Lazy::new(|| {
-    register_histogram!(
+static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
         "proxy_compute_connection_latency_seconds",
         "Time it took for proxy to establish a connection to the compute endpoint",
+        &["protocol", "cache_miss", "pool_miss"],
-        // largest bucket = 2^16 * 0.5ms = 32s
+        // 16 buckets: 0.5ms * 2^0 ..= 0.5ms * 2^15, so the largest bucket is ~16.4s
         exponential_buckets(0.0005, 2.0, 16).unwrap(),
     )
     .unwrap()
 });
 
+/// Times how long the proxy takes to establish a connection to compute.
+///
+/// The clock starts in [`LatencyTimer::new`]. The elapsed time is observed into
+/// `COMPUTE_CONNECTION_LATENCY` when the timer is dropped, so a sample is
+/// recorded on every exit path, whether or not the connection succeeded.
+#[must_use = "the latency sample is recorded when the timer is dropped"]
+pub struct LatencyTimer {
+    /// Moment the timer was created.
+    start: Instant,
+    /// Value of the `pool_miss` label; cleared by [`LatencyTimer::pool_hit`].
+    pool_miss: bool,
+    /// Value of the `cache_miss` label; set by [`LatencyTimer::cache_miss`].
+    cache_miss: bool,
+    /// Value of the `protocol` label.
+    protocol: &'static str,
+}
+
+impl LatencyTimer {
+    /// Starts the clock; `protocol` becomes the sample's `protocol` label.
+    ///
+    /// A fresh timer reports `cache_miss = false` and `pool_miss = true`.
+    pub fn new(protocol: &'static str) -> Self {
+        Self {
+            start: Instant::now(),
+            cache_miss: false,
+            // by default we don't do pooling
+            pool_miss: true,
+            protocol,
+        }
+    }
+
+    /// Flags the sample as a cache miss (`cache_miss = true`).
+    pub fn cache_miss(&mut self) {
+        self.cache_miss = true;
+    }
+
+    /// Flags the sample as a pool hit (`pool_miss = false`).
+    pub fn pool_hit(&mut self) {
+        self.pool_miss = false;
+    }
+}
+
+impl Drop for LatencyTimer {
+    /// Observes the seconds elapsed since creation, labelled with the protocol
+    /// and the cache/pool flags as they stand at drop time.
+    fn drop(&mut self) {
+        let duration = self.start.elapsed().as_secs_f64();
+        COMPUTE_CONNECTION_LATENCY
+            .with_label_values(&[
+                self.protocol,
+                bool_to_str(self.cache_miss),
+                bool_to_str(self.pool_miss),
+            ])
+            .observe(duration)
+    }
+}
+
 static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
     register_int_counter_vec!(
"proxy_connection_failures_total", @@ -495,13 +535,12 @@ pub async fn connect_to_compute( mut node_info: console::CachedNodeInfo, extra: &console::ConsoleReqExtra<'_>, creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>, + mut latency_timer: LatencyTimer, ) -> Result where M::ConnectError: ShouldRetry + std::fmt::Debug, M::Error: From, { - let _timer = COMPUTE_CONNECTION_LATENCY.start_timer(); - mechanism.update_connect_config(&mut node_info.config); // try once @@ -513,6 +552,8 @@ where } }; + latency_timer.cache_miss(); + let mut num_retries = 1; // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node @@ -789,6 +830,8 @@ impl Client<'_, S> { application_name: params.get("application_name"), }; + let latency_timer = LatencyTimer::new(mode.protocol_label()); + let auth_result = match creds .authenticate(&extra, &mut stream, mode.allow_cleartext()) .await @@ -812,9 +855,15 @@ impl Client<'_, S> { node_info.allow_self_signed_compute = allow_self_signed_compute; let aux = node_info.aux.clone(); - let mut node = connect_to_compute(&TcpMechanism { params }, node_info, &extra, &creds) - .or_else(|e| stream.throw_error(e)) - .await?; + let mut node = connect_to_compute( + &TcpMechanism { params }, + node_info, + &extra, + &creds, + latency_timer, + ) + .or_else(|e| stream.throw_error(e)) + .await?; let proto = mode.protocol_label(); NUM_DB_CONNECTIONS_OPENED_COUNTER diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 9615017883..8f7f461bef 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -450,7 +450,7 @@ async fn connect_to_compute_success() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Connect]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap(); mechanism.verify(); @@ -461,7 +461,7 
@@ async fn connect_to_compute_retry() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Connect]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap(); mechanism.verify(); @@ -473,7 +473,7 @@ async fn connect_to_compute_non_retry_1() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Fail]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap_err(); mechanism.verify(); @@ -485,7 +485,7 @@ async fn connect_to_compute_non_retry_2() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Fail, Wake, Retry, Connect]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap(); mechanism.verify(); @@ -501,7 +501,7 @@ async fn connect_to_compute_non_retry_3() { Retry, Retry, Retry, Retry, /* the 17th time */ Retry, ]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap_err(); mechanism.verify(); @@ -513,7 +513,7 @@ async fn wake_retry() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Retry, WakeRetry, Wake, Connect]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap(); mechanism.verify(); @@ -525,7 +525,7 @@ async fn 
wake_non_retry() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Retry, WakeFail]); let (cache, extra, creds) = helper_create_connect_info(&mechanism); - connect_to_compute(&mechanism, cache, &extra, &creds) + connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test")) .await .unwrap_err(); mechanism.verify();