Compare commits

...

2 Commits

Author SHA1 Message Date
Conrad Ludgate
cd277a1420 fmt 2023-12-08 15:58:17 +00:00
Conrad Ludgate
ff34746d21 proxy: track control-plane durations per connection request 2023-12-08 12:29:50 +00:00
8 changed files with 112 additions and 21 deletions

View File

@@ -189,7 +189,7 @@ async fn auth_quirks(
let AuthInfo { let AuthInfo {
secret, secret,
allowed_ips, allowed_ips,
} = api.get_auth_info(extra, &info).await?; } = api.get_auth_info(extra, &info, latency_timer).await?;
// check allowed list // check allowed list
if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) { if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) {
@@ -255,7 +255,9 @@ async fn auth_and_wake_compute(
let mut num_retries = 0; let mut num_retries = 0;
let mut node = loop { let mut node = loop {
let wake_res = api.wake_compute(extra, &compute_credentials.info).await; let wake_res = api
.wake_compute(extra, &compute_credentials.info, latency_timer)
.await;
match handle_try_wake(wake_res, num_retries) { match handle_try_wake(wake_res, num_retries) {
Err(e) => { Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
@@ -388,12 +390,13 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn get_allowed_ips( pub async fn get_allowed_ips(
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> { ) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
use BackendType::*; use BackendType::*;
match self { match self {
Console(api, creds) => api.get_allowed_ips(extra, creds).await, Console(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
#[cfg(feature = "testing")] #[cfg(feature = "testing")]
Postgres(api, creds) => api.get_allowed_ips(extra, creds).await, Postgres(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
Link(_) => Ok(Arc::new(vec![])), Link(_) => Ok(Arc::new(vec![])),
#[cfg(test)] #[cfg(test)]
Test(x) => x.get_allowed_ips(), Test(x) => x.get_allowed_ips(),
@@ -405,13 +408,22 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn wake_compute( pub async fn wake_compute(
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> { ) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*; use BackendType::*;
match self { match self {
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await, Console(api, creds) => {
api.wake_compute(extra, creds, latency_timer)
.map_ok(Some)
.await
}
#[cfg(feature = "testing")] #[cfg(feature = "testing")]
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await, Postgres(api, creds) => {
api.wake_compute(extra, creds, latency_timer)
.map_ok(Some)
.await
}
Link(_) => Ok(None), Link(_) => Ok(None),
#[cfg(test)] #[cfg(test)]
Test(x) => x.wake_compute().map(Some), Test(x) => x.wake_compute().map(Some),

View File

@@ -33,7 +33,7 @@ pub(super) async fn authenticate(
config.scram_protocol_timeout, config.scram_protocol_timeout,
async { async {
// pause the timer while we communicate with the client // pause the timer while we communicate with the client
let _paused = latency_timer.pause(); let _paused = latency_timer.wait_for_user();
flow.begin(scram).await.map_err(|error| { flow.begin(scram).await.map_err(|error| {
warn!(?error, "error sending scram acknowledgement"); warn!(?error, "error sending scram acknowledgement");

View File

@@ -24,7 +24,7 @@ pub async fn authenticate_cleartext(
warn!("cleartext auth flow override is enabled, proceeding"); warn!("cleartext auth flow override is enabled, proceeding");
// pause the timer while we communicate with the client // pause the timer while we communicate with the client
let _paused = latency_timer.pause(); let _paused = latency_timer.wait_for_user();
let auth_outcome = AuthFlow::new(client) let auth_outcome = AuthFlow::new(client)
.begin(auth::CleartextPassword(secret)) .begin(auth::CleartextPassword(secret))
@@ -54,7 +54,7 @@ pub async fn password_hack_no_authentication(
warn!("project not specified, resorting to the password hack auth flow"); warn!("project not specified, resorting to the password hack auth flow");
// pause the timer while we communicate with the client // pause the timer while we communicate with the client
let _paused = latency_timer.pause(); let _paused = latency_timer.wait_for_user();
let payload = AuthFlow::new(client) let payload = AuthFlow::new(client)
.begin(auth::PasswordHack) .begin(auth::PasswordHack)

View File

@@ -6,7 +6,9 @@ use super::messages::MetricsAuxInfo;
use crate::{ use crate::{
auth::backend::ComputeUserInfo, auth::backend::ComputeUserInfo,
cache::{timed_lru, TimedLru}, cache::{timed_lru, TimedLru},
compute, scram, compute,
proxy::LatencyTimer,
scram,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap; use dashmap::DashMap;
@@ -250,12 +252,14 @@ pub trait Api {
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, errors::GetAuthInfoError>; ) -> Result<AuthInfo, errors::GetAuthInfoError>;
async fn get_allowed_ips( async fn get_allowed_ips(
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>; ) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info. /// Wake up the compute node and return the corresponding connection info.
@@ -263,6 +267,7 @@ pub trait Api {
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, errors::WakeComputeError>; ) -> Result<CachedNodeInfo, errors::WakeComputeError>;
} }

View File

@@ -6,6 +6,7 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError}, errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
}; };
use crate::proxy::LatencyTimer;
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl}; use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use async_trait::async_trait; use async_trait::async_trait;
use futures::TryFutureExt; use futures::TryFutureExt;
@@ -146,6 +147,7 @@ impl super::Api for Api {
&self, &self,
_extra: &ConsoleReqExtra<'_>, _extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> { ) -> Result<AuthInfo, GetAuthInfoError> {
self.do_get_auth_info(creds).await self.do_get_auth_info(creds).await
} }
@@ -154,6 +156,7 @@ impl super::Api for Api {
&self, &self,
_extra: &ConsoleReqExtra<'_>, _extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> { ) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips)) Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips))
} }
@@ -163,6 +166,7 @@ impl super::Api for Api {
&self, &self,
_extra: &ConsoleReqExtra<'_>, _extra: &ConsoleReqExtra<'_>,
_creds: &ComputeUserInfo, _creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> { ) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute() self.do_wake_compute()
.map_ok(CachedNodeInfo::new_uncached) .map_ok(CachedNodeInfo::new_uncached)

View File

@@ -5,7 +5,7 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError}, errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo, ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
}; };
use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; use crate::proxy::{LatencyTimer, ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::{auth::backend::ComputeUserInfo, compute, http, scram}; use crate::{auth::backend::ComputeUserInfo, compute, http, scram};
use async_trait::async_trait; use async_trait::async_trait;
use futures::TryFutureExt; use futures::TryFutureExt;
@@ -158,7 +158,9 @@ impl super::Api for Api {
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> { ) -> Result<AuthInfo, GetAuthInfoError> {
let _timer = latency_timer.control_plane();
self.do_get_auth_info(extra, creds).await self.do_get_auth_info(extra, creds).await
} }
@@ -166,6 +168,7 @@ impl super::Api for Api {
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> { ) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
let key: &str = &creds.endpoint; let key: &str = &creds.endpoint;
if let Some(allowed_ips) = self.caches.allowed_ips.get(key) { if let Some(allowed_ips) = self.caches.allowed_ips.get(key) {
@@ -177,7 +180,11 @@ impl super::Api for Api {
ALLOWED_IPS_BY_CACHE_OUTCOME ALLOWED_IPS_BY_CACHE_OUTCOME
.with_label_values(&["miss"]) .with_label_values(&["miss"])
.inc(); .inc();
let timer = latency_timer.control_plane();
let allowed_ips = Arc::new(self.do_get_auth_info(extra, creds).await?.allowed_ips); let allowed_ips = Arc::new(self.do_get_auth_info(extra, creds).await?.allowed_ips);
drop(timer);
self.caches self.caches
.allowed_ips .allowed_ips
.insert(key.into(), allowed_ips.clone()); .insert(key.into(), allowed_ips.clone());
@@ -189,6 +196,7 @@ impl super::Api for Api {
&self, &self,
extra: &ConsoleReqExtra<'_>, extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo, creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> { ) -> Result<CachedNodeInfo, WakeComputeError> {
let key: &str = &creds.inner.cache_key; let key: &str = &creds.inner.cache_key;
@@ -214,7 +222,10 @@ impl super::Api for Api {
} }
} }
let timer = latency_timer.control_plane();
let node = self.do_wake_compute(extra, creds).await?; let node = self.do_wake_compute(extra, creds).await?;
drop(timer);
let (_, cached) = self.caches.node_info.insert(key.clone(), node); let (_, cached) = self.caches.node_info.insert(key.clone(), node);
info!(key = &*key, "created a cache entry for compute node info"); info!(key = &*key, "created a cache entry for compute node info");

View File

@@ -110,6 +110,19 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap() .unwrap()
}); });
static CONTROL_PLANE_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_control_plane_latency_seconds",
"Time proxy spent talking to control-plane/console while trying to establish a connection to the compute endpoint",
// http/ws/tcp, true/false, true/false, success/failure
// 3 * 2 * 2 * 2 = 24 counters
&["protocol", "cache_miss", "pool_miss", "outcome"],
// largest bucket = 2^16 * 0.5ms = 32s
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
.unwrap()
});
pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| { pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!( register_histogram_vec!(
"proxy_console_request_latency", "proxy_console_request_latency",
@@ -174,6 +187,10 @@ pub struct LatencyTimer {
start: Option<Instant>, start: Option<Instant>,
// accumulated time on the stopwatch // accumulated time on the stopwatch
accumulated: std::time::Duration, accumulated: std::time::Duration,
// time since the stopwatch was started while talking to control-plane
start_cp: Option<Instant>,
// accumulated time on the stopwatch while talking to control-plane
accumulated_cp: std::time::Duration,
// label data // label data
protocol: &'static str, protocol: &'static str,
cache_miss: bool, cache_miss: bool,
@@ -181,7 +198,11 @@ pub struct LatencyTimer {
outcome: &'static str, outcome: &'static str,
} }
pub struct LatencyTimerPause<'a> { pub struct LatencyTimerUserIO<'a> {
timer: &'a mut LatencyTimer,
}
pub struct LatencyTimerControlPlane<'a> {
timer: &'a mut LatencyTimer, timer: &'a mut LatencyTimer,
} }
@@ -190,6 +211,8 @@ impl LatencyTimer {
Self { Self {
start: Some(Instant::now()), start: Some(Instant::now()),
accumulated: std::time::Duration::ZERO, accumulated: std::time::Duration::ZERO,
start_cp: None,
accumulated_cp: std::time::Duration::ZERO,
protocol, protocol,
cache_miss: false, cache_miss: false,
// by default we don't do pooling // by default we don't do pooling
@@ -199,11 +222,17 @@ impl LatencyTimer {
} }
} }
pub fn pause(&mut self) -> LatencyTimerPause<'_> { pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> {
// start the control-plane stopwatch (paired Drop for LatencyTimerControlPlane takes start_cp)
self.start_cp = Some(Instant::now());
LatencyTimerControlPlane { timer: self }
}
pub fn wait_for_user(&mut self) -> LatencyTimerUserIO<'_> {
// stop the stopwatch and record the time that we have accumulated // stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started"); let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed(); self.accumulated += start.elapsed();
LatencyTimerPause { timer: self } LatencyTimerUserIO { timer: self }
} }
pub fn cache_miss(&mut self) { pub fn cache_miss(&mut self) {
@@ -219,13 +248,25 @@ impl LatencyTimer {
} }
} }
impl Drop for LatencyTimerPause<'_> { impl Drop for LatencyTimerUserIO<'_> {
fn drop(&mut self) { fn drop(&mut self) {
// start the stopwatch again // start the stopwatch again
self.timer.start = Some(Instant::now()); self.timer.start = Some(Instant::now());
} }
} }
impl Drop for LatencyTimerControlPlane<'_> {
fn drop(&mut self) {
// stop the control-plane stopwatch and record the time that we have accumulated
let start = self
.timer
.start_cp
.take()
.expect("latency timer should be started");
self.timer.accumulated_cp += start.elapsed();
}
}
impl Drop for LatencyTimer { impl Drop for LatencyTimer {
fn drop(&mut self) { fn drop(&mut self) {
let duration = let duration =
@@ -237,7 +278,21 @@ impl Drop for LatencyTimer {
bool_to_str(self.pool_miss), bool_to_str(self.pool_miss),
self.outcome, self.outcome,
]) ])
.observe(duration.as_secs_f64()) .observe(duration.as_secs_f64());
let duration_cp = self
.start_cp
.map(|start| start.elapsed())
.unwrap_or_default()
+ self.accumulated_cp;
CONTROL_PLANE_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration_cp.as_secs_f64());
} }
} }
@@ -695,9 +750,13 @@ where
info!("compute node's state has likely changed; requesting a wake-up"); info!("compute node's state has likely changed; requesting a wake-up");
let node_info = loop { let node_info = loop {
let wake_res = match creds { let wake_res = match creds {
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, auth::BackendType::Console(api, creds) => {
api.wake_compute(extra, creds, &mut latency_timer).await
}
#[cfg(feature = "testing")] #[cfg(feature = "testing")]
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, auth::BackendType::Postgres(api, creds) => {
api.wake_compute(extra, creds, &mut latency_timer).await
}
// nothing to do? // nothing to do?
auth::BackendType::Link(_) => return Err(err.into()), auth::BackendType::Link(_) => return Err(err.into()),
// test backend // test backend

View File

@@ -405,7 +405,7 @@ async fn connect_to_compute(
conn_info: &ConnInfo, conn_info: &ConnInfo,
conn_id: uuid::Uuid, conn_id: uuid::Uuid,
session_id: uuid::Uuid, session_id: uuid::Uuid,
latency_timer: LatencyTimer, mut latency_timer: LatencyTimer,
peer_addr: IpAddr, peer_addr: IpAddr,
) -> anyhow::Result<ClientInner> { ) -> anyhow::Result<ClientInner> {
let tls = config.tls_config.as_ref(); let tls = config.tls_config.as_ref();
@@ -437,13 +437,13 @@ async fn connect_to_compute(
}; };
// TODO(anna): this is a bit hacky way, consider using console notification listener. // TODO(anna): this is a bit hacky way, consider using console notification listener.
if !config.disable_ip_check_for_http { if !config.disable_ip_check_for_http {
let allowed_ips = backend.get_allowed_ips(&extra).await?; let allowed_ips = backend.get_allowed_ips(&extra, &mut latency_timer).await?;
if !check_peer_addr_is_in_list(&peer_addr, &allowed_ips) { if !check_peer_addr_is_in_list(&peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed().into()); return Err(auth::AuthError::ip_address_not_allowed().into());
} }
} }
let node_info = backend let node_info = backend
.wake_compute(&extra) .wake_compute(&extra, &mut latency_timer)
.await? .await?
.context("missing cache entry from wake_compute")?; .context("missing cache entry from wake_compute")?;