From ff34746d21e71f7ec172afec16a3cdccd444d97e Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 8 Dec 2023 12:29:50 +0000 Subject: [PATCH] proxy: track control-plane durations per connection request --- proxy/src/auth/backend.rs | 14 +++--- proxy/src/auth/backend/classic.rs | 2 +- proxy/src/auth/backend/hacks.rs | 4 +- proxy/src/console/provider.rs | 5 ++- proxy/src/console/provider/mock.rs | 4 ++ proxy/src/console/provider/neon.rs | 13 +++++- proxy/src/proxy.rs | 69 +++++++++++++++++++++++++++--- proxy/src/serverless/conn_pool.rs | 6 +-- 8 files changed, 96 insertions(+), 21 deletions(-) diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 649b3f40f2..99e1a28cbc 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -189,7 +189,7 @@ async fn auth_quirks( let AuthInfo { secret, allowed_ips, - } = api.get_auth_info(extra, &info).await?; + } = api.get_auth_info(extra, &info, latency_timer).await?; // check allowed list if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) { @@ -255,7 +255,7 @@ async fn auth_and_wake_compute( let mut num_retries = 0; let mut node = loop { - let wake_res = api.wake_compute(extra, &compute_credentials.info).await; + let wake_res = api.wake_compute(extra, &compute_credentials.info, latency_timer).await; match handle_try_wake(wake_res, num_retries) { Err(e) => { error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); @@ -388,12 +388,13 @@ impl BackendType<'_, ComputeUserInfo> { pub async fn get_allowed_ips( &self, extra: &ConsoleReqExtra<'_>, + latency_timer: &mut LatencyTimer, ) -> Result>, GetAuthInfoError> { use BackendType::*; match self { - Console(api, creds) => api.get_allowed_ips(extra, creds).await, + Console(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await, #[cfg(feature = "testing")] - Postgres(api, creds) => api.get_allowed_ips(extra, creds).await, + Postgres(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await, Link(_) => Ok(Arc::new(vec![])), #[cfg(test)] Test(x) => x.get_allowed_ips(), @@ -405,13 +406,14 @@ impl BackendType<'_, ComputeUserInfo> { pub async fn wake_compute( &self, extra: &ConsoleReqExtra<'_>, + latency_timer: &mut LatencyTimer, ) -> Result, console::errors::WakeComputeError> { use BackendType::*; match self { - Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await, + Console(api, creds) => api.wake_compute(extra, creds, latency_timer).map_ok(Some).await, #[cfg(feature = "testing")] - Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await, + Postgres(api, creds) => api.wake_compute(extra, creds, latency_timer).map_ok(Some).await, Link(_) => Ok(None), #[cfg(test)] Test(x) => x.wake_compute().map(Some), diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index ce52daf16c..ebcb150bca 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -33,7 +33,7 @@ pub(super) async fn authenticate( config.scram_protocol_timeout, async { // pause the timer while we communicate with the client - let _paused = latency_timer.pause(); + let _paused = latency_timer.wait_for_user(); flow.begin(scram).await.map_err(|error| { warn!(?error, "error sending scram acknowledgement"); diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index abbd25008b..62de3a70bf 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -24,7 +24,7 @@ pub async fn authenticate_cleartext( warn!("cleartext auth flow override is enabled, proceeding"); // pause the timer while we communicate with the client - let _paused = latency_timer.pause(); + let _paused = latency_timer.wait_for_user(); let auth_outcome = AuthFlow::new(client) .begin(auth::CleartextPassword(secret)) @@ -54,7 +54,7 @@ pub async fn password_hack_no_authentication( warn!("project not specified, resorting to the password hack auth flow"); // pause the timer while we communicate with the client - let _paused = latency_timer.pause(); + let _paused = latency_timer.wait_for_user(); let payload = AuthFlow::new(client) .begin(auth::PasswordHack) diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index ccb5cbdb92..1f860f10e3 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -6,7 +6,7 @@ use super::messages::MetricsAuxInfo; use crate::{ auth::backend::ComputeUserInfo, cache::{timed_lru, TimedLru}, - compute, scram, + compute, scram, proxy::LatencyTimer, }; use async_trait::async_trait; use dashmap::DashMap; @@ -250,12 +250,14 @@ pub trait Api { &self, extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + latency_timer: &mut LatencyTimer, ) -> Result; async fn get_allowed_ips( &self, extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + latency_timer: &mut LatencyTimer, ) -> Result>, errors::GetAuthInfoError>; /// Wake up the compute node and return the corresponding connection info. @@ -263,6 +265,7 @@ pub trait Api { &self, extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + latency_timer: &mut LatencyTimer, ) -> Result; } diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index 8aad8c06bc..3b0691a2c9 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -7,6 +7,7 @@ use super::{ AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo, }; use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl}; +use crate::proxy::LatencyTimer; use async_trait::async_trait; use futures::TryFutureExt; use thiserror::Error; @@ -146,6 +147,7 @@ impl super::Api for Api { &self, _extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + _latency_timer: &mut LatencyTimer, ) -> Result { self.do_get_auth_info(creds).await } @@ -154,6 +156,7 @@ impl super::Api for Api { &self, _extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + _latency_timer: &mut LatencyTimer, ) -> Result>, GetAuthInfoError> { Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips)) } @@ -163,6 +166,7 @@ impl super::Api for Api { &self, _extra: &ConsoleReqExtra<'_>, _creds: &ComputeUserInfo, + _latency_timer: &mut LatencyTimer, ) -> Result { self.do_wake_compute() .map_ok(CachedNodeInfo::new_uncached) diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index f0510e91ea..afec0038fb 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -5,7 +5,7 @@ use super::{ errors::{ApiError, GetAuthInfoError, WakeComputeError}, ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo, }; -use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; +use crate::proxy::{LatencyTimer, ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; use crate::{auth::backend::ComputeUserInfo, compute, http, scram}; use async_trait::async_trait; use futures::TryFutureExt; @@ -158,7 +158,9 @@ impl super::Api for Api { &self, extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + latency_timer: &mut LatencyTimer, ) -> Result { + let _timer = latency_timer.control_plane(); self.do_get_auth_info(extra, creds).await } @@ -166,6 +168,7 @@ impl super::Api for Api { &self, extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + latency_timer: &mut LatencyTimer, ) -> Result>, GetAuthInfoError> { let key: &str = &creds.endpoint; if let Some(allowed_ips) = self.caches.allowed_ips.get(key) { @@ -177,7 +180,11 @@ impl super::Api for Api { ALLOWED_IPS_BY_CACHE_OUTCOME .with_label_values(&["miss"]) .inc(); + + let timer = latency_timer.control_plane(); let allowed_ips = Arc::new(self.do_get_auth_info(extra, creds).await?.allowed_ips); + drop(timer); + self.caches .allowed_ips .insert(key.into(), allowed_ips.clone()); @@ -189,6 +196,7 @@ impl super::Api for Api { &self, extra: &ConsoleReqExtra<'_>, creds: &ComputeUserInfo, + latency_timer: &mut LatencyTimer, ) -> Result { let key: &str = &creds.inner.cache_key; @@ -214,7 +222,10 @@ impl super::Api for Api { } } + let timer = latency_timer.control_plane(); let node = self.do_wake_compute(extra, creds).await?; + drop(timer); + let (_, cached) = self.caches.node_info.insert(key.clone(), node); info!(key = &*key, "created a cache entry for compute node info"); diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 7cf3ed5b8a..0f994b1f71 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -110,6 +110,19 @@ static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { .unwrap() }); +static CONTROL_PLANE_LATENCY: Lazy = Lazy::new(|| { + register_histogram_vec!( + "proxy_compute_connection_control_plane_latency_seconds", + "Time proxy spent talking to control-plane/console while trying to establish a connection to the compute endpoint", + // http/ws/tcp, true/false, true/false, success/failure + // 3 * 2 * 2 * 2 = 24 counters + &["protocol", "cache_miss", "pool_miss", "outcome"], + // largest bucket = 2^16 * 0.5ms = 32s + exponential_buckets(0.0005, 2.0, 16).unwrap(), + ) + .unwrap() +}); + pub static CONSOLE_REQUEST_LATENCY: Lazy = Lazy::new(|| { register_histogram_vec!( "proxy_console_request_latency", @@ -174,6 +187,10 @@ pub struct LatencyTimer { start: Option, // accumulated time on the stopwatch accumulated: std::time::Duration, + // time since the stopwatch was started while talking to control-plane + start_cp: Option, + // accumulated time on the stopwatch while talking to control-plane + accumulated_cp: std::time::Duration, // label data protocol: &'static str, cache_miss: bool, @@ -181,7 +198,11 @@ pub struct LatencyTimer { outcome: &'static str, } -pub struct LatencyTimerPause<'a> { +pub struct LatencyTimerUserIO<'a> { + timer: &'a mut LatencyTimer, +} + +pub struct LatencyTimerControlPlane<'a> { timer: &'a mut LatencyTimer, } @@ -190,6 +211,8 @@ impl LatencyTimer { Self { start: Some(Instant::now()), accumulated: std::time::Duration::ZERO, + start_cp: None, + accumulated_cp: std::time::Duration::ZERO, protocol, cache_miss: false, // by default we don't do pooling @@ -199,11 +222,17 @@ impl LatencyTimer { } } - pub fn pause(&mut self) -> LatencyTimerPause<'_> { + pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> { + // start the stopwatch again + self.start = Some(Instant::now()); + LatencyTimerControlPlane { timer: self } + } + + pub fn wait_for_user(&mut self) -> LatencyTimerUserIO<'_> { // stop the stopwatch and record the time that we have accumulated let start = self.start.take().expect("latency timer should be started"); self.accumulated += start.elapsed(); - LatencyTimerPause { timer: self } + LatencyTimerUserIO { timer: self } } pub fn cache_miss(&mut self) { @@ -219,13 +248,25 @@ impl LatencyTimer { } } -impl Drop for LatencyTimerPause<'_> { +impl Drop for LatencyTimerUserIO<'_> { fn drop(&mut self) { // start the stopwatch again self.timer.start = Some(Instant::now()); } } +impl Drop for LatencyTimerControlPlane<'_> { + fn drop(&mut self) { + // stop the control-plane stopwatch and record the time that we have accumulated + let start = self + .timer + .start_cp + .take() + .expect("latency timer should be started"); + self.timer.accumulated_cp += start.elapsed(); + } +} + impl Drop for LatencyTimer { fn drop(&mut self) { let duration = @@ -237,7 +278,21 @@ impl Drop for LatencyTimer { bool_to_str(self.pool_miss), self.outcome, ]) - .observe(duration.as_secs_f64()) + .observe(duration.as_secs_f64()); + + let duration_cp = self + .start_cp + .map(|start| start.elapsed()) + .unwrap_or_default() + + self.accumulated_cp; + CONTROL_PLANE_LATENCY + .with_label_values(&[ + self.protocol, + bool_to_str(self.cache_miss), + bool_to_str(self.pool_miss), + self.outcome, + ]) + .observe(duration_cp.as_secs_f64()); } } @@ -695,9 +750,9 @@ where info!("compute node's state has likely changed; requesting a wake-up"); let node_info = loop { let wake_res = match creds { - auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, + auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds, &mut latency_timer).await, #[cfg(feature = "testing")] - auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, + auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds, &mut latency_timer).await, // nothing to do? auth::BackendType::Link(_) => return Err(err.into()), // test backend diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index b9d1a9692d..7c9720648c 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -405,7 +405,7 @@ async fn connect_to_compute( conn_info: &ConnInfo, conn_id: uuid::Uuid, session_id: uuid::Uuid, - latency_timer: LatencyTimer, + mut latency_timer: LatencyTimer, peer_addr: IpAddr, ) -> anyhow::Result { let tls = config.tls_config.as_ref(); @@ -437,13 +437,13 @@ async fn connect_to_compute( }; // TODO(anna): this is a bit hacky way, consider using console notification listener. if !config.disable_ip_check_for_http { - let allowed_ips = backend.get_allowed_ips(&extra).await?; + let allowed_ips = backend.get_allowed_ips(&extra, &mut latency_timer).await?; if !check_peer_addr_is_in_list(&peer_addr, &allowed_ips) { return Err(auth::AuthError::ip_address_not_allowed().into()); } } let node_info = backend - .wake_compute(&extra) + .wake_compute(&extra, &mut latency_timer) .await? .context("missing cache entry from wake_compute")?;