proxy: track control-plane durations per connection request

Conrad Ludgate
2023-12-08 12:29:50 +00:00
parent 699049b8f3
commit ff34746d21
8 changed files with 96 additions and 21 deletions
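In short: the `LatencyTimer` that already drives the total compute-connection latency histogram grows a second stopwatch that only runs while the proxy talks to the control plane/console, plus a new `proxy_compute_connection_control_plane_latency_seconds` histogram that is observed (with the same protocol/cache_miss/pool_miss/outcome labels) when the timer is dropped. To make that possible, the console `Api` methods now take `&mut LatencyTimer`, and the old `pause()` guard is renamed to `wait_for_user()`. Below is a minimal, self-contained sketch of that two-stopwatch/RAII-guard idea; it mirrors the shape of the code in this diff but simplifies names and replaces the Prometheus observations with a print.

```rust
use std::time::{Duration, Instant};

// Simplified stand-in for the proxy's LatencyTimer (labels omitted):
// one stopwatch for the whole connection attempt, a second one that
// only runs while we talk to the control plane.
pub struct LatencyTimer {
    start: Option<Instant>,    // total stopwatch, None while waiting on the client
    accumulated: Duration,     // total time banked so far
    start_cp: Option<Instant>, // control-plane stopwatch, None when idle
    accumulated_cp: Duration,  // control-plane time banked so far
}

pub struct LatencyTimerUserIO<'a> {
    timer: &'a mut LatencyTimer,
}

pub struct LatencyTimerControlPlane<'a> {
    timer: &'a mut LatencyTimer,
}

impl LatencyTimer {
    pub fn new() -> Self {
        Self {
            start: Some(Instant::now()),
            accumulated: Duration::ZERO,
            start_cp: None,
            accumulated_cp: Duration::ZERO,
        }
    }

    /// Pause the total stopwatch while we wait on the client;
    /// the guard restarts it when dropped.
    pub fn wait_for_user(&mut self) -> LatencyTimerUserIO<'_> {
        let start = self.start.take().expect("latency timer should be started");
        self.accumulated += start.elapsed();
        LatencyTimerUserIO { timer: self }
    }

    /// Start the control-plane stopwatch; the guard stops it and
    /// banks the elapsed time when dropped.
    pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> {
        self.start_cp = Some(Instant::now());
        LatencyTimerControlPlane { timer: self }
    }
}

impl Drop for LatencyTimerUserIO<'_> {
    fn drop(&mut self) {
        // client interaction is over: resume the total stopwatch
        self.timer.start = Some(Instant::now());
    }
}

impl Drop for LatencyTimerControlPlane<'_> {
    fn drop(&mut self) {
        // control-plane call finished: bank the elapsed time
        if let Some(start) = self.timer.start_cp.take() {
            self.timer.accumulated_cp += start.elapsed();
        }
    }
}

impl Drop for LatencyTimer {
    fn drop(&mut self) {
        let total = self.start.map(|s| s.elapsed()).unwrap_or_default() + self.accumulated;
        let control_plane =
            self.start_cp.map(|s| s.elapsed()).unwrap_or_default() + self.accumulated_cp;
        // the real code observes COMPUTE_CONNECTION_LATENCY and
        // CONTROL_PLANE_LATENCY here
        println!("total = {total:?}, control-plane = {control_plane:?}");
    }
}
```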

View File

@@ -189,7 +189,7 @@ async fn auth_quirks(
let AuthInfo {
secret,
allowed_ips,
} = api.get_auth_info(extra, &info).await?;
} = api.get_auth_info(extra, &info, latency_timer).await?;
// check allowed list
if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) {
@@ -255,7 +255,7 @@ async fn auth_and_wake_compute(
let mut num_retries = 0;
let mut node = loop {
let wake_res = api.wake_compute(extra, &compute_credentials.info).await;
let wake_res = api.wake_compute(extra, &compute_credentials.info, latency_timer).await;
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
@@ -388,12 +388,13 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
use BackendType::*;
match self {
Console(api, creds) => api.get_allowed_ips(extra, creds).await,
Console(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
#[cfg(feature = "testing")]
Postgres(api, creds) => api.get_allowed_ips(extra, creds).await,
Postgres(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
Link(_) => Ok(Arc::new(vec![])),
#[cfg(test)]
Test(x) => x.get_allowed_ips(),
@@ -405,13 +406,14 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*;
match self {
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Console(api, creds) => api.wake_compute(extra, creds, latency_timer).map_ok(Some).await,
#[cfg(feature = "testing")]
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Postgres(api, creds) => api.wake_compute(extra, creds, latency_timer).map_ok(Some).await,
Link(_) => Ok(None),
#[cfg(test)]
Test(x) => x.wake_compute().map(Some),

View File

@@ -33,7 +33,7 @@ pub(super) async fn authenticate(
config.scram_protocol_timeout,
async {
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let _paused = latency_timer.wait_for_user();
flow.begin(scram).await.map_err(|error| {
warn!(?error, "error sending scram acknowledgement");

View File

@@ -24,7 +24,7 @@ pub async fn authenticate_cleartext(
warn!("cleartext auth flow override is enabled, proceeding");
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let _paused = latency_timer.wait_for_user();
let auth_outcome = AuthFlow::new(client)
.begin(auth::CleartextPassword(secret))
@@ -54,7 +54,7 @@ pub async fn password_hack_no_authentication(
warn!("project not specified, resorting to the password hack auth flow");
// pause the timer while we communicate with the client
let _paused = latency_timer.pause();
let _paused = latency_timer.wait_for_user();
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)

View File

@@ -6,7 +6,7 @@ use super::messages::MetricsAuxInfo;
use crate::{
auth::backend::ComputeUserInfo,
cache::{timed_lru, TimedLru},
compute, scram,
compute, scram, proxy::LatencyTimer,
};
use async_trait::async_trait;
use dashmap::DashMap;
@@ -250,12 +250,14 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, errors::GetAuthInfoError>;
async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
@@ -263,6 +265,7 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}
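The trait methods take `&mut LatencyTimer` because the `control_plane()` guard borrows the timer mutably; backends that never hit the control plane simply ignore the argument (the `_latency_timer` bindings in the mock implementation in the next file). An illustrative sketch of that split, reusing the `LatencyTimer` sketch from the top of this page; `ConsoleApi`, `MockApi`, `RemoteApi`, and `fetch_secret` are stand-in names, not the real trait:

```rust
trait ConsoleApi {
    fn get_auth_info(&self, latency_timer: &mut LatencyTimer) -> String;
}

struct MockApi;
struct RemoteApi;

impl ConsoleApi for MockApi {
    fn get_auth_info(&self, _latency_timer: &mut LatencyTimer) -> String {
        // local/testing backend: nothing to attribute to the control plane
        "mock-secret".to_owned()
    }
}

impl ConsoleApi for RemoteApi {
    fn get_auth_info(&self, latency_timer: &mut LatencyTimer) -> String {
        // only the remote round-trip runs under the control-plane stopwatch
        let _guard = latency_timer.control_plane();
        fetch_secret() // stand-in for the HTTP request to the console
    }
}

fn fetch_secret() -> String {
    "remote-secret".to_owned()
}
```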

View File

@@ -7,6 +7,7 @@ use super::{
AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use crate::proxy::LatencyTimer;
use async_trait::async_trait;
use futures::TryFutureExt;
use thiserror::Error;
@@ -146,6 +147,7 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> {
self.do_get_auth_info(creds).await
}
@@ -154,6 +156,7 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips))
}
@@ -163,6 +166,7 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
_creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute()
.map_ok(CachedNodeInfo::new_uncached)

View File

@@ -5,7 +5,7 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::proxy::{LatencyTimer, ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::{auth::backend::ComputeUserInfo, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
@@ -158,7 +158,9 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> {
let _timer = latency_timer.control_plane();
self.do_get_auth_info(extra, creds).await
}
@@ -166,6 +168,7 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
let key: &str = &creds.endpoint;
if let Some(allowed_ips) = self.caches.allowed_ips.get(key) {
@@ -177,7 +180,11 @@ impl super::Api for Api {
ALLOWED_IPS_BY_CACHE_OUTCOME
.with_label_values(&["miss"])
.inc();
let timer = latency_timer.control_plane();
let allowed_ips = Arc::new(self.do_get_auth_info(extra, creds).await?.allowed_ips);
drop(timer);
self.caches
.allowed_ips
.insert(key.into(), allowed_ips.clone());
@@ -189,6 +196,7 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key: &str = &creds.inner.cache_key;
@@ -214,7 +222,10 @@ impl super::Api for Api {
}
}
let timer = latency_timer.control_plane();
let node = self.do_wake_compute(extra, creds).await?;
drop(timer);
let (_, cached) = self.caches.node_info.insert(key.clone(), node);
info!(key = &*key, "created a cache entry for compute node info");

View File

@@ -110,6 +110,19 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});
static CONTROL_PLANE_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_control_plane_latency_seconds",
"Time proxy spent talking to control-plane/console while trying to establish a connection to the compute endpoint",
// http/ws/tcp, true/false, true/false, success/failure
// 3 * 2 * 2 * 2 = 24 counters
&["protocol", "cache_miss", "pool_miss", "outcome"],
// largest bucket = 0.5ms * 2^15 ≈ 16s
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
.unwrap()
});
pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_console_request_latency",
@@ -174,6 +187,10 @@ pub struct LatencyTimer {
start: Option<Instant>,
// accumulated time on the stopwatch
accumulated: std::time::Duration,
// time since the stopwatch was started while talking to control-plane
start_cp: Option<Instant>,
// accumulated time on the stopwatch while talking to control-plane
accumulated_cp: std::time::Duration,
// label data
protocol: &'static str,
cache_miss: bool,
@@ -181,7 +198,11 @@ pub struct LatencyTimer {
outcome: &'static str,
}
pub struct LatencyTimerPause<'a> {
pub struct LatencyTimerUserIO<'a> {
timer: &'a mut LatencyTimer,
}
pub struct LatencyTimerControlPlane<'a> {
timer: &'a mut LatencyTimer,
}
@@ -190,6 +211,8 @@ impl LatencyTimer {
Self {
start: Some(Instant::now()),
accumulated: std::time::Duration::ZERO,
start_cp: None,
accumulated_cp: std::time::Duration::ZERO,
protocol,
cache_miss: false,
// by default we don't do pooling
@@ -199,11 +222,17 @@ impl LatencyTimer {
}
}
pub fn pause(&mut self) -> LatencyTimerPause<'_> {
pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> {
// start the control-plane stopwatch
self.start_cp = Some(Instant::now());
LatencyTimerControlPlane { timer: self }
}
pub fn wait_for_user(&mut self) -> LatencyTimerUserIO<'_> {
// stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed();
LatencyTimerPause { timer: self }
LatencyTimerUserIO { timer: self }
}
pub fn cache_miss(&mut self) {
@@ -219,13 +248,25 @@ impl LatencyTimer {
}
}
impl Drop for LatencyTimerPause<'_> {
impl Drop for LatencyTimerUserIO<'_> {
fn drop(&mut self) {
// start the stopwatch again
self.timer.start = Some(Instant::now());
}
}
impl Drop for LatencyTimerControlPlane<'_> {
fn drop(&mut self) {
// stop the control-plane stopwatch and record the time that we have accumulated
let start = self
.timer
.start_cp
.take()
.expect("latency timer should be started");
self.timer.accumulated_cp += start.elapsed();
}
}
impl Drop for LatencyTimer {
fn drop(&mut self) {
let duration =
@@ -237,7 +278,21 @@ impl Drop for LatencyTimer {
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration.as_secs_f64())
.observe(duration.as_secs_f64());
let duration_cp = self
.start_cp
.map(|start| start.elapsed())
.unwrap_or_default()
+ self.accumulated_cp;
CONTROL_PLANE_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration_cp.as_secs_f64());
}
}
@@ -695,9 +750,9 @@ where
info!("compute node's state has likely changed; requesting a wake-up");
let node_info = loop {
let wake_res = match creds {
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds, &mut latency_timer).await,
#[cfg(feature = "testing")]
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds, &mut latency_timer).await,
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
// test backend

View File

@@ -405,7 +405,7 @@ async fn connect_to_compute(
conn_info: &ConnInfo,
conn_id: uuid::Uuid,
session_id: uuid::Uuid,
latency_timer: LatencyTimer,
mut latency_timer: LatencyTimer,
peer_addr: IpAddr,
) -> anyhow::Result<ClientInner> {
let tls = config.tls_config.as_ref();
@@ -437,13 +437,13 @@ async fn connect_to_compute(
};
// TODO(anna): this is a bit hacky way, consider using console notification listener.
if !config.disable_ip_check_for_http {
let allowed_ips = backend.get_allowed_ips(&extra).await?;
let allowed_ips = backend.get_allowed_ips(&extra, &mut latency_timer).await?;
if !check_peer_addr_is_in_list(&peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed().into());
}
}
let node_info = backend
.wake_compute(&extra)
.wake_compute(&extra, &mut latency_timer)
.await?
.context("missing cache entry from wake_compute")?;
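Putting it together: one mutable timer is threaded through client authentication and the console calls, and when it is finally dropped both the total and the control-plane histograms get one observation with the same label set. A schematic single attempt, using the illustrative types from the first sketch; the phases in the comments are stand-ins for the real call sites:

```rust
fn main() {
    let mut latency_timer = LatencyTimer::new();

    {
        // client authentication: total stopwatch paused, nothing is counted
        let _user = latency_timer.wait_for_user();
        // ... SCRAM / cleartext password exchange with the client ...
    }

    {
        // console calls (get_auth_info / wake_compute): counted towards both
        // the total and the control-plane histogram
        let _cp = latency_timer.control_plane();
        // ... HTTP round-trip to the control plane ...
    }

    // ... connecting to the compute node: counted only towards the total ...

    // `latency_timer` is dropped here and reports both durations
}
```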