proxy: count cache-miss for compute latency (#5539)

## Problem

It would be good to be able to view connection latency separately for the hot path (pooled/cached) vs the cold path (fresh connect).

## Summary of changes

Add `protocol`, `cache_miss`, and `pool_miss` labels to the compute-connection latency histogram, tracked via a new `LatencyTimer` that records on drop.
This commit is contained in:
Conrad Ludgate
2023-10-16 16:31:04 +01:00
committed by GitHub
parent 44b1c4c456
commit 8c522ea034
3 changed files with 74 additions and 21 deletions

View File

@@ -20,7 +20,7 @@ use tokio_postgres::AsyncMessage;
use crate::{
auth, console,
metrics::{Ids, MetricCounter, USAGE_METRICS},
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
proxy::{LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
};
use crate::{compute, config};
@@ -139,6 +139,7 @@ impl GlobalConnPool {
session_id: uuid::Uuid,
) -> anyhow::Result<Client> {
let mut client: Option<Client> = None;
let mut latency_timer = LatencyTimer::new("http");
let mut hash_valid = false;
if !force_new {
@@ -182,15 +183,16 @@ impl GlobalConnPool {
let new_client = if let Some(client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
connect_to_compute(self.proxy_config, conn_info, session_id).await
connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await
} else {
latency_timer.pool_hit();
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
return Ok(client);
}
} else {
info!("pool: opening a new connection '{conn_info}'");
connect_to_compute(self.proxy_config, conn_info, session_id).await
connect_to_compute(self.proxy_config, conn_info, session_id, latency_timer).await
};
match &new_client {
@@ -347,6 +349,7 @@ async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
session_id: uuid::Uuid,
latency_timer: LatencyTimer,
) -> anyhow::Result<Client> {
let tls = config.tls_config.as_ref();
let common_names = tls.and_then(|tls| tls.common_names.clone());
@@ -386,6 +389,7 @@ async fn connect_to_compute(
node_info,
&extra,
&creds,
latency_timer,
)
.await
}

View File

@@ -15,12 +15,11 @@ use crate::{
use anyhow::{bail, Context};
use async_trait::async_trait;
use futures::TryFutureExt;
use metrics::{
exponential_buckets, register_histogram, register_int_counter_vec, Histogram, IntCounterVec,
};
use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
use std::{error::Error, io, ops::ControlFlow, sync::Arc};
use prometheus::{register_histogram_vec, HistogramVec};
use std::{error::Error, io, ops::ControlFlow, sync::Arc, time::Instant};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
time,
@@ -93,16 +92,57 @@ pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
static COMPUTE_CONNECTION_LATENCY: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_latency_seconds",
"Time it took for proxy to establish a connection to the compute endpoint",
&["protocol", "cache_miss", "pool_miss"],
// largest bucket = 2^16 * 0.5ms = 32s
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
.unwrap()
});
pub struct LatencyTimer {
start: Instant,
pool_miss: bool,
cache_miss: bool,
protocol: &'static str,
}
impl LatencyTimer {
pub fn new(protocol: &'static str) -> Self {
Self {
start: Instant::now(),
cache_miss: false,
// by default we don't do pooling
pool_miss: true,
protocol,
}
}
pub fn cache_miss(&mut self) {
self.cache_miss = true;
}
pub fn pool_hit(&mut self) {
self.pool_miss = false;
}
}
impl Drop for LatencyTimer {
fn drop(&mut self) {
let duration = self.start.elapsed().as_secs_f64();
COMPUTE_CONNECTION_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
])
.observe(duration)
}
}
static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_connection_failures_total",
@@ -495,13 +535,12 @@ pub async fn connect_to_compute<M: ConnectMechanism>(
mut node_info: console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
mut latency_timer: LatencyTimer,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let _timer = COMPUTE_CONNECTION_LATENCY.start_timer();
mechanism.update_connect_config(&mut node_info.config);
// try once
@@ -513,6 +552,8 @@ where
}
};
latency_timer.cache_miss();
let mut num_retries = 1;
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
@@ -789,6 +830,8 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
application_name: params.get("application_name"),
};
let latency_timer = LatencyTimer::new(mode.protocol_label());
let auth_result = match creds
.authenticate(&extra, &mut stream, mode.allow_cleartext())
.await
@@ -812,9 +855,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
node_info.allow_self_signed_compute = allow_self_signed_compute;
let aux = node_info.aux.clone();
let mut node = connect_to_compute(&TcpMechanism { params }, node_info, &extra, &creds)
.or_else(|e| stream.throw_error(e))
.await?;
let mut node = connect_to_compute(
&TcpMechanism { params },
node_info,
&extra,
&creds,
latency_timer,
)
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER

View File

@@ -450,7 +450,7 @@ async fn connect_to_compute_success() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap();
mechanism.verify();
@@ -461,7 +461,7 @@ async fn connect_to_compute_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap();
mechanism.verify();
@@ -473,7 +473,7 @@ async fn connect_to_compute_non_retry_1() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Fail]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap_err();
mechanism.verify();
@@ -485,7 +485,7 @@ async fn connect_to_compute_non_retry_2() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Fail, Wake, Retry, Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap();
mechanism.verify();
@@ -501,7 +501,7 @@ async fn connect_to_compute_non_retry_3() {
Retry, Retry, Retry, Retry, /* the 17th time */ Retry,
]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap_err();
mechanism.verify();
@@ -513,7 +513,7 @@ async fn wake_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, WakeRetry, Wake, Connect]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap();
mechanism.verify();
@@ -525,7 +525,7 @@ async fn wake_non_retry() {
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![Retry, WakeFail]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)
connect_to_compute(&mechanism, cache, &extra, &creds, LatencyTimer::new("test"))
.await
.unwrap_err();
mechanism.verify();