diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs
index 7db76f3d9e..415a4b7d85 100644
--- a/proxy/src/auth/backend/link.rs
+++ b/proxy/src/auth/backend/link.rs
@@ -102,8 +102,7 @@ pub(super) async fn authenticate(
     ctx.set_user(db_info.user.into());
     ctx.set_project(db_info.aux.clone());
 
-    let cold_start_info = db_info.aux.cold_start_info.clone().unwrap_or_default();
-    info!(?cold_start_info, "woken up a compute node");
+    info!("woken up a compute node");
 
     // Backwards compatibility. pg_sni_proxy uses "--" in domain names
     // while direct connections do not. Once we migrate to pg_sni_proxy
diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs
index 385f7820cb..c28814b1c8 100644
--- a/proxy/src/bin/pg_sni_router.rs
+++ b/proxy/src/bin/pg_sni_router.rs
@@ -10,6 +10,7 @@ use itertools::Itertools;
 use proxy::config::TlsServerEndPoint;
 use proxy::context::RequestMonitoring;
 use proxy::proxy::run_until_cancelled;
+use proxy::{BranchId, EndpointId, ProjectId};
 use rustls::pki_types::PrivateKeyDer;
 use tokio::net::TcpListener;
 
@@ -269,7 +270,12 @@ async fn handle_client(
     let client = tokio::net::TcpStream::connect(destination).await?;
 
-    let metrics_aux: MetricsAuxInfo = Default::default();
+    let metrics_aux: MetricsAuxInfo = MetricsAuxInfo {
+        endpoint_id: (&EndpointId::from("")).into(),
+        project_id: (&ProjectId::from("")).into(),
+        branch_id: (&BranchId::from("")).into(),
+        cold_start_info: proxy::console::messages::ColdStartInfo::Unknown,
+    };
 
     // doesn't yet matter as pg-sni-router doesn't report analytics logs
     ctx.set_success();
diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs
index 5a3660520b..d8a1d261ce 100644
--- a/proxy/src/cache/project_info.rs
+++ b/proxy/src/cache/project_info.rs
@@ -16,7 +16,7 @@ use crate::{
     config::ProjectInfoCacheOptions,
     console::AuthSecret,
     intern::{EndpointIdInt, ProjectIdInt, RoleNameInt},
-    EndpointId, ProjectId, RoleName,
+    EndpointId, RoleName,
 };
 
 use super::{Cache, Cached};
@@ -214,14 +214,11 @@ impl ProjectInfoCacheImpl {
     }
     pub fn insert_role_secret(
         &self,
-        project_id: &ProjectId,
-        endpoint_id: &EndpointId,
-        role_name: &RoleName,
+        project_id: ProjectIdInt,
+        endpoint_id: EndpointIdInt,
+        role_name: RoleNameInt,
         secret: Option<AuthSecret>,
     ) {
-        let project_id = ProjectIdInt::from(project_id);
-        let endpoint_id = EndpointIdInt::from(endpoint_id);
-        let role_name = RoleNameInt::from(role_name);
         if self.cache.len() >= self.config.size {
             // If there are too many entries, wait until the next gc cycle.
             return;
@@ -234,12 +231,10 @@ impl ProjectInfoCacheImpl {
     }
     pub fn insert_allowed_ips(
         &self,
-        project_id: &ProjectId,
-        endpoint_id: &EndpointId,
+        project_id: ProjectIdInt,
+        endpoint_id: EndpointIdInt,
         allowed_ips: Arc<Vec<IpPattern>>,
     ) {
-        let project_id = ProjectIdInt::from(project_id);
-        let endpoint_id = EndpointIdInt::from(endpoint_id);
         if self.cache.len() >= self.config.size {
             // If there are too many entries, wait until the next gc cycle.
             return;
@@ -358,7 +353,7 @@ impl Cache for ProjectInfoCacheImpl {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::scram::ServerSecret;
+    use crate::{scram::ServerSecret, ProjectId};
 
     #[tokio::test]
     async fn test_project_info_cache_settings() {
@@ -369,8 +364,8 @@
             ttl: Duration::from_secs(1),
             gc_interval: Duration::from_secs(600),
         });
-        let project_id = "project".into();
-        let endpoint_id = "endpoint".into();
+        let project_id: ProjectId = "project".into();
+        let endpoint_id: EndpointId = "endpoint".into();
         let user1: RoleName = "user1".into();
         let user2: RoleName = "user2".into();
         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
@@ -379,9 +374,23 @@
             "127.0.0.1".parse().unwrap(),
             "127.0.0.2".parse().unwrap(),
         ]);
-        cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
-        cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
-        cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user1).into(),
+            secret1.clone(),
+        );
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user2).into(),
+            secret2.clone(),
+        );
+        cache.insert_allowed_ips(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            allowed_ips.clone(),
+        );
 
         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
         assert!(cached.cached());
@@ -393,7 +402,12 @@
         // Shouldn't add more than 2 roles.
         let user3: RoleName = "user3".into();
         let secret3 = Some(AuthSecret::Scram(ServerSecret::mock([3; 32])));
-        cache.insert_role_secret(&project_id, &endpoint_id, &user3, secret3.clone());
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user3).into(),
+            secret3.clone(),
+        );
 
         assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());
 
         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
@@ -421,8 +435,8 @@
         cache.clone().disable_ttl();
         tokio::time::advance(Duration::from_secs(2)).await;
 
-        let project_id = "project".into();
-        let endpoint_id = "endpoint".into();
+        let project_id: ProjectId = "project".into();
+        let endpoint_id: EndpointId = "endpoint".into();
         let user1: RoleName = "user1".into();
         let user2: RoleName = "user2".into();
         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
@@ -431,9 +445,23 @@
         let allowed_ips = Arc::new(vec![
             "127.0.0.1".parse().unwrap(),
             "127.0.0.2".parse().unwrap(),
         ]);
-        cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
-        cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
-        cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user1).into(),
+            secret1.clone(),
+        );
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user2).into(),
+            secret2.clone(),
+        );
+        cache.insert_allowed_ips(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            allowed_ips.clone(),
+        );
 
         tokio::time::advance(Duration::from_secs(2)).await;
         // Nothing should be invalidated.
@@ -470,8 +498,8 @@ mod tests {
             gc_interval: Duration::from_secs(600),
         }));
 
-        let project_id = "project".into();
-        let endpoint_id = "endpoint".into();
+        let project_id: ProjectId = "project".into();
+        let endpoint_id: EndpointId = "endpoint".into();
         let user1: RoleName = "user1".into();
         let user2: RoleName = "user2".into();
         let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
         let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
@@ -480,10 +508,20 @@
         let allowed_ips = Arc::new(vec![
             "127.0.0.1".parse().unwrap(),
             "127.0.0.2".parse().unwrap(),
         ]);
-        cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone());
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user1).into(),
+            secret1.clone(),
+        );
         cache.clone().disable_ttl();
         tokio::time::advance(Duration::from_millis(100)).await;
-        cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone());
+        cache.insert_role_secret(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            (&user2).into(),
+            secret2.clone(),
+        );
 
         // Added before ttl was disabled + ttl should be still cached.
         let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
@@ -497,7 +535,11 @@
         assert!(cache.get_role_secret(&endpoint_id, &user2).is_none());
 
         // Added after ttl was disabled + ttl should not be cached.
-        cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone());
+        cache.insert_allowed_ips(
+            (&project_id).into(),
+            (&endpoint_id).into(),
+            allowed_ips.clone(),
+        );
 
         let cached = cache.get_allowed_ips(&endpoint_id).unwrap();
         assert!(!cached.cached());
diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs
index 65153babcb..ee33b97fbd 100644
--- a/proxy/src/compute.rs
+++ b/proxy/src/compute.rs
@@ -276,6 +276,7 @@ impl ConnCfg {
         let stream = connection.stream.into_inner();
 
         info!(
+            cold_start_info = ctx.cold_start_info.as_str(),
             "connected to compute node at {host} ({socket_addr}) sslmode={:?}",
             self.0.get_ssl_mode()
         );
diff --git a/proxy/src/console/messages.rs b/proxy/src/console/messages.rs
index 102076f2c6..45161f5ac8 100644
--- a/proxy/src/console/messages.rs
+++ b/proxy/src/console/messages.rs
@@ -3,7 +3,7 @@ use std::fmt;
 
 use crate::auth::IpPattern;
-use crate::{BranchId, EndpointId, ProjectId};
+use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
 
 /// Generic error response with human-readable description.
 /// Note that we can't always present it to user as is.
@@ -18,7 +18,7 @@ pub struct ConsoleError {
 pub struct GetRoleSecret {
     pub role_secret: Box<str>,
     pub allowed_ips: Option<Vec<IpPattern>>,
-    pub project_id: Option<ProjectId>,
+    pub project_id: Option<ProjectIdInt>,
 }
 
 // Manually implement debug to omit sensitive info.
@@ -93,22 +93,47 @@ impl fmt::Debug for DatabaseInfo {
 
 /// Various labels for prometheus metrics.
 /// Also known as `ProxyMetricsAuxInfo` in the console.
-#[derive(Debug, Deserialize, Clone, Default)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct MetricsAuxInfo {
-    pub endpoint_id: EndpointId,
-    pub project_id: ProjectId,
-    pub branch_id: BranchId,
-    pub cold_start_info: Option<ColdStartInfo>,
+    pub endpoint_id: EndpointIdInt,
+    pub project_id: ProjectIdInt,
+    pub branch_id: BranchIdInt,
+    #[serde(default)]
+    pub cold_start_info: ColdStartInfo,
 }
 
-#[derive(Debug, Default, Serialize, Deserialize, Clone)]
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)]
 #[serde(rename_all = "snake_case")]
 pub enum ColdStartInfo {
     #[default]
-    Unknown = 0,
-    Warm = 1,
-    PoolHit = 2,
-    PoolMiss = 3,
+    Unknown,
+    /// Compute was already running
+    Warm,
+    #[serde(rename = "pool_hit")]
+    /// Compute was not running but there was an available VM
+    VmPoolHit,
+    #[serde(rename = "pool_miss")]
+    /// Compute was not running and there were no VMs available
+    VmPoolMiss,
+
+    // not provided by control plane
+    /// Connection available from HTTP pool
+    HttpPoolHit,
+    /// Cached connection info
+    WarmCached,
+}
+
+impl ColdStartInfo {
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            ColdStartInfo::Unknown => "unknown",
+            ColdStartInfo::Warm => "warm",
+            ColdStartInfo::VmPoolHit => "pool_hit",
+            ColdStartInfo::VmPoolMiss => "pool_miss",
+            ColdStartInfo::HttpPoolHit => "http_pool_hit",
+            ColdStartInfo::WarmCached => "warm_cached",
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs
index 69bfd6b045..f7d621fb12 100644
--- a/proxy/src/console/provider.rs
+++ b/proxy/src/console/provider.rs
@@ -12,7 +12,8 @@ use crate::{
     compute,
     config::{CacheOptions, ProjectInfoCacheOptions},
     context::RequestMonitoring,
-    scram, EndpointCacheKey, ProjectId,
+    intern::ProjectIdInt,
+    scram, EndpointCacheKey,
 };
 use dashmap::DashMap;
 use std::{sync::Arc, time::Duration};
@@ -271,7 +272,7 @@ pub struct AuthInfo {
     /// List of IP addresses allowed for the autorization.
     pub allowed_ips: Vec<IpPattern>,
     /// Project ID. This is used for cache invalidation.
-    pub project_id: Option<ProjectId>,
+    pub project_id: Option<ProjectIdInt>,
 }
 
 /// Info for establishing a connection to a compute node.
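> Note on the enum rework above: the two serde attributes keep the wire format compatible with the control plane while the Rust-side variants are renamed, and `#[serde(default)]` makes the field optional without wrapping it in `Option`. A minimal sketch of that behavior, using a pared-down copy of the enum (assumes `serde_json` as a dev-dependency; not part of the diff itself):

```rust
use serde::Deserialize;

// Stand-in for crate::console::messages::ColdStartInfo, reduced to the
// serde-relevant parts so the round-trip can be checked in isolation.
#[derive(Debug, Default, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
enum ColdStartInfo {
    #[default]
    Unknown,
    Warm,
    #[serde(rename = "pool_hit")]
    VmPoolHit,
    #[serde(rename = "pool_miss")]
    VmPoolMiss,
    HttpPoolHit,
    WarmCached,
}

#[derive(Deserialize)]
struct Payload {
    // Mirrors MetricsAuxInfo: a missing field falls back to Unknown.
    #[serde(default)]
    cold_start_info: ColdStartInfo,
}

fn main() {
    // The old control-plane spelling still parses, now into the VM-prefixed variant.
    let hit: ColdStartInfo = serde_json::from_str("\"pool_hit\"").unwrap();
    assert_eq!(hit, ColdStartInfo::VmPoolHit);

    // A control-plane response without the field deserializes to Unknown.
    let p: Payload = serde_json::from_str("{}").unwrap();
    assert_eq!(p.cold_start_info, ColdStartInfo::Unknown);
}
```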
diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs
index b759c81373..cfe491f2aa 100644
--- a/proxy/src/console/provider/mock.rs
+++ b/proxy/src/console/provider/mock.rs
@@ -4,10 +4,16 @@ use super::{
     errors::{ApiError, GetAuthInfoError, WakeComputeError},
     AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo,
 };
-use crate::console::provider::{CachedAllowedIps, CachedRoleSecret};
 use crate::context::RequestMonitoring;
 use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
 use crate::{auth::IpPattern, cache::Cached};
+use crate::{
+    console::{
+        messages::MetricsAuxInfo,
+        provider::{CachedAllowedIps, CachedRoleSecret},
+    },
+    BranchId, EndpointId, ProjectId,
+};
 use futures::TryFutureExt;
 use std::{str::FromStr, sync::Arc};
 use thiserror::Error;
@@ -114,7 +120,12 @@ impl Api {
 
         let node = NodeInfo {
             config,
-            aux: Default::default(),
+            aux: MetricsAuxInfo {
+                endpoint_id: (&EndpointId::from("endpoint")).into(),
+                project_id: (&ProjectId::from("project")).into(),
+                branch_id: (&BranchId::from("branch")).into(),
+                cold_start_info: crate::console::messages::ColdStartInfo::Warm,
+            },
             allow_self_signed_compute: false,
         };
diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs
index 289b0c08f7..1a3e2ca795 100644
--- a/proxy/src/console/provider/neon.rs
+++ b/proxy/src/console/provider/neon.rs
@@ -181,15 +181,16 @@ impl super::Api for Api {
         }
         let auth_info = self.do_get_auth_info(ctx, user_info).await?;
         if let Some(project_id) = auth_info.project_id {
+            let ep_int = ep.into();
             self.caches.project_info.insert_role_secret(
-                &project_id,
-                ep,
-                user,
+                project_id,
+                ep_int,
+                user.into(),
                 auth_info.secret.clone(),
             );
             self.caches.project_info.insert_allowed_ips(
-                &project_id,
-                ep,
+                project_id,
+                ep_int,
                 Arc::new(auth_info.allowed_ips),
             );
             ctx.set_project_id(project_id);
@@ -217,15 +218,16 @@ impl super::Api for Api {
         let allowed_ips = Arc::new(auth_info.allowed_ips);
         let user = &user_info.user;
         if let Some(project_id) = auth_info.project_id {
+            let ep_int = ep.into();
             self.caches.project_info.insert_role_secret(
-                &project_id,
-                ep,
-                user,
+                project_id,
+                ep_int,
+                user.into(),
                 auth_info.secret.clone(),
             );
             self.caches
                 .project_info
-                .insert_allowed_ips(&project_id, ep, allowed_ips.clone());
+                .insert_allowed_ips(project_id, ep_int, allowed_ips.clone());
             ctx.set_project_id(project_id);
         }
         Ok((
@@ -248,8 +250,7 @@ impl super::Api for Api {
         // which means that we might cache it to reduce the load and latency.
         if let Some(cached) = self.caches.node_info.get(&key) {
             info!(key = &*key, "found cached compute node info");
-            info!("cold_start_info=warm");
-            ctx.set_cold_start_info(ColdStartInfo::Warm);
+            ctx.set_project(cached.aux.clone());
             return Ok(cached);
         }
@@ -260,17 +261,21 @@ impl super::Api for Api {
         if permit.should_check_cache() {
             if let Some(cached) = self.caches.node_info.get(&key) {
                 info!(key = &*key, "found cached compute node info");
-                info!("cold_start_info=warm");
-                ctx.set_cold_start_info(ColdStartInfo::Warm);
+                ctx.set_project(cached.aux.clone());
                 return Ok(cached);
             }
         }
 
-        let node = self.do_wake_compute(ctx, user_info).await?;
+        let mut node = self.do_wake_compute(ctx, user_info).await?;
         ctx.set_project(node.aux.clone());
-        let cold_start_info = node.aux.cold_start_info.clone().unwrap_or_default();
-        info!(?cold_start_info, "woken up a compute node");
-        let (_, cached) = self.caches.node_info.insert(key.clone(), node);
+        let cold_start_info = node.aux.cold_start_info;
+        info!("woken up a compute node");
+
+        // store the cached node as 'warm'
+        node.aux.cold_start_info = ColdStartInfo::WarmCached;
+        let (_, mut cached) = self.caches.node_info.insert(key.clone(), node);
+        cached.aux.cold_start_info = cold_start_info;
+
         info!(key = &*key, "created a cache entry for compute node info");
 
         Ok(cached)
diff --git a/proxy/src/context.rs b/proxy/src/context.rs
index 7ca830cdb4..fec95f4722 100644
--- a/proxy/src/context.rs
+++ b/proxy/src/context.rs
@@ -11,8 +11,9 @@ use uuid::Uuid;
 use crate::{
     console::messages::{ColdStartInfo, MetricsAuxInfo},
     error::ErrorKind,
+    intern::{BranchIdInt, ProjectIdInt},
     metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
-    BranchId, DbName, EndpointId, ProjectId, RoleName,
+    DbName, EndpointId, RoleName,
 };
 
 use self::parquet::RequestData;
@@ -34,8 +35,8 @@ pub struct RequestMonitoring {
     pub span: Span,
 
     // filled in as they are discovered
-    project: Option<ProjectId>,
-    branch: Option<BranchId>,
+    project: Option<ProjectIdInt>,
+    branch: Option<BranchIdInt>,
     endpoint_id: Option<EndpointId>,
     dbname: Option<DbName>,
     user: Option<RoleName>,
@@ -43,7 +44,7 @@ pub struct RequestMonitoring {
     error_kind: Option<ErrorKind>,
     pub(crate) auth_method: Option<AuthMethod>,
     success: bool,
-    cold_start_info: Option<ColdStartInfo>,
+    pub(crate) cold_start_info: ColdStartInfo,
 
     // extra
     // This sender is here to keep the request monitoring channel open while requests are taking place.
@@ -92,7 +93,7 @@ impl RequestMonitoring {
             error_kind: None,
             auth_method: None,
             success: false,
-            cold_start_info: None,
+            cold_start_info: ColdStartInfo::Unknown,
 
             sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
             latency_timer: LatencyTimer::new(protocol),
@@ -113,26 +114,31 @@ impl RequestMonitoring {
     }
 
     pub fn set_cold_start_info(&mut self, info: ColdStartInfo) {
-        self.cold_start_info = Some(info);
+        self.cold_start_info = info;
+        self.latency_timer.cold_start_info(info);
     }
 
     pub fn set_project(&mut self, x: MetricsAuxInfo) {
-        self.set_endpoint_id(x.endpoint_id);
+        if self.endpoint_id.is_none() {
+            self.set_endpoint_id(x.endpoint_id.as_str().into())
+        }
         self.branch = Some(x.branch_id);
         self.project = Some(x.project_id);
-        self.cold_start_info = x.cold_start_info;
+        self.set_cold_start_info(x.cold_start_info);
     }
 
-    pub fn set_project_id(&mut self, project_id: ProjectId) {
+    pub fn set_project_id(&mut self, project_id: ProjectIdInt) {
         self.project = Some(project_id);
     }
 
     pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) {
-        self.span.record("ep", display(&endpoint_id));
-        crate::metrics::CONNECTING_ENDPOINTS
-            .with_label_values(&[self.protocol])
-            .measure(&endpoint_id);
-        self.endpoint_id = Some(endpoint_id);
+        if self.endpoint_id.is_none() {
+            self.span.record("ep", display(&endpoint_id));
+            crate::metrics::CONNECTING_ENDPOINTS
+                .with_label_values(&[self.protocol])
+                .measure(&endpoint_id);
+            self.endpoint_id = Some(endpoint_id);
+        }
     }
 
     pub fn set_application(&mut self, app: Option<SmolStr>) {
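> With the setters above now idempotent, call order no longer clobbers earlier discoveries. A short sketch of the intended semantics (a hypothetical helper, not code from this PR; `ctx` stands for an already-constructed `RequestMonitoring`):

```rust
use proxy::console::messages::MetricsAuxInfo;
use proxy::context::RequestMonitoring;

// Hypothetical driver: record what SNI-based auth discovered, then what
// the control plane returned. The endpoint id from SNI survives (first
// write wins, measured once), while set_project() still fills in
// project/branch and forwards cold_start_info into the latency timer.
fn record_discovery(ctx: &mut RequestMonitoring, sni_endpoint: &str, aux: MetricsAuxInfo) {
    ctx.set_endpoint_id(sni_endpoint.into()); // recorded and counted exactly once
    ctx.set_project(aux); // keeps the endpoint id; routes cold_start_info to the timer
}
```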
diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs
index 04e5695255..eb77409429 100644
--- a/proxy/src/context/parquet.rs
+++ b/proxy/src/context/parquet.rs
@@ -87,7 +87,7 @@ pub struct RequestData {
     /// Or if we make it to proxy_pass
     success: bool,
     /// Indicates if the cplane started the new compute node for this request.
-    cold_start_info: Option<&'static str>,
+    cold_start_info: &'static str,
     /// Tracks time from session start (HTTP request/libpq TCP handshake)
     /// Through to success/failure
     duration_us: u64,
@@ -115,12 +115,7 @@ impl From<&RequestMonitoring> for RequestData {
             region: value.region,
             error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
             success: value.success,
-            cold_start_info: value.cold_start_info.as_ref().map(|x| match x {
-                crate::console::messages::ColdStartInfo::Unknown => "unknown",
-                crate::console::messages::ColdStartInfo::Warm => "warm",
-                crate::console::messages::ColdStartInfo::PoolHit => "pool_hit",
-                crate::console::messages::ColdStartInfo::PoolMiss => "pool_miss",
-            }),
+            cold_start_info: value.cold_start_info.as_str(),
             duration_us: SystemTime::from(value.first_packet)
                 .elapsed()
                 .unwrap_or_default()
@@ -454,7 +449,7 @@ mod tests {
             region: "us-east-1",
             error: None,
             success: rng.gen(),
-            cold_start_info: Some("no"),
+            cold_start_info: "no",
             duration_us: rng.gen_range(0..30_000_000),
         }
     }
@@ -524,15 +519,15 @@
         assert_eq!(
             file_stats,
             [
-                (1314406, 3, 6000),
-                (1314399, 3, 6000),
-                (1314459, 3, 6000),
-                (1314416, 3, 6000),
-                (1314546, 3, 6000),
-                (1314388, 3, 6000),
-                (1314180, 3, 6000),
-                (1314416, 3, 6000),
-                (438359, 1, 2000)
+                (1314385, 3, 6000),
+                (1314378, 3, 6000),
+                (1314438, 3, 6000),
+                (1314395, 3, 6000),
+                (1314525, 3, 6000),
+                (1314367, 3, 6000),
+                (1314159, 3, 6000),
+                (1314395, 3, 6000),
+                (438352, 1, 2000)
             ]
         );
@@ -562,11 +557,11 @@
         assert_eq!(
             file_stats,
             [
-                (1220668, 5, 10000),
-                (1226818, 5, 10000),
-                (1228612, 5, 10000),
-                (1227974, 5, 10000),
-                (1219252, 5, 10000)
+                (1220633, 5, 10000),
+                (1226783, 5, 10000),
+                (1228577, 5, 10000),
+                (1227939, 5, 10000),
+                (1219217, 5, 10000)
             ]
         );
@@ -598,11 +593,11 @@
         assert_eq!(
             file_stats,
             [
-                (1206315, 5, 10000),
-                (1206046, 5, 10000),
-                (1206339, 5, 10000),
-                (1206327, 5, 10000),
-                (1206582, 5, 10000)
+                (1206280, 5, 10000),
+                (1206011, 5, 10000),
+                (1206304, 5, 10000),
+                (1206292, 5, 10000),
+                (1206547, 5, 10000)
             ]
         );
@@ -627,15 +622,15 @@
         assert_eq!(
             file_stats,
             [
-                (1314406, 3, 6000),
-                (1314399, 3, 6000),
-                (1314459, 3, 6000),
-                (1314416, 3, 6000),
-                (1314546, 3, 6000),
-                (1314388, 3, 6000),
-                (1314180, 3, 6000),
-                (1314416, 3, 6000),
-                (438359, 1, 2000)
+                (1314385, 3, 6000),
+                (1314378, 3, 6000),
+                (1314438, 3, 6000),
+                (1314395, 3, 6000),
+                (1314525, 3, 6000),
+                (1314367, 3, 6000),
+                (1314159, 3, 6000),
+                (1314395, 3, 6000),
+                (438352, 1, 2000)
             ]
         );
@@ -672,7 +667,7 @@
         // files are smaller than the size threshold, but they took too long to fill so were flushed early
         assert_eq!(
             file_stats,
-            [(658837, 2, 3001), (658551, 2, 3000), (658347, 2, 2999)]
+            [(658823, 2, 3001), (658537, 2, 3000), (658333, 2, 2999)]
         );
 
         tmpdir.close().unwrap();
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index 9da1fdc02f..59ee899c08 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -12,6 +12,8 @@ use metrics::{
 use once_cell::sync::Lazy;
 use tokio::time::{self, Instant};
 
+use crate::console::messages::ColdStartInfo;
+
 pub static NUM_DB_CONNECTIONS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
     register_int_counter_pair_vec!(
         "proxy_opened_db_connections_total",
@@ -50,8 +52,8 @@ pub static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
         "proxy_compute_connection_latency_seconds",
         "Time it took for proxy to establish a connection to the compute endpoint",
         // http/ws/tcp, true/false, true/false, success/failure, client/client_and_cplane
-        // 3 * 2 * 2 * 2 * 2 = 48 counters
-        &["protocol", "cache_miss", "pool_miss", "outcome", "excluded"],
+        // 3 * 6 * 2 * 2 = 72 counters
+        &["protocol", "cold_start_info", "outcome", "excluded"],
         // largest bucket = 2^16 * 0.5ms = 32s
         exponential_buckets(0.0005, 2.0, 16).unwrap(),
     )
@@ -183,6 +185,20 @@ struct Accumulated {
     compute: time::Duration,
 }
 
+enum Outcome {
+    Success,
+    Failed,
+}
+
+impl Outcome {
+    fn as_str(&self) -> &'static str {
+        match self {
+            Outcome::Success => "success",
+            Outcome::Failed => "failed",
+        }
+    }
+}
+
 pub struct LatencyTimer {
     // time since the stopwatch was started
     start: time::Instant,
@@ -192,9 +208,8 @@ pub struct LatencyTimer {
     accumulated: Accumulated,
     // label data
     protocol: &'static str,
-    cache_miss: bool,
-    pool_miss: bool,
-    outcome: &'static str,
+    cold_start_info: ColdStartInfo,
+    outcome: Outcome,
 }
 
 pub struct LatencyTimerPause<'a> {
@@ -210,11 +225,9 @@ impl LatencyTimer {
             stop: None,
             accumulated: Accumulated::default(),
             protocol,
-            cache_miss: false,
-            // by default we don't do pooling
-            pool_miss: true,
+            cold_start_info: ColdStartInfo::Unknown,
             // assume failed unless otherwise specified
-            outcome: "failed",
+            outcome: Outcome::Failed,
         }
     }
@@ -226,12 +239,8 @@ impl LatencyTimer {
         }
     }
 
-    pub fn cache_miss(&mut self) {
-        self.cache_miss = true;
-    }
-
-    pub fn pool_hit(&mut self) {
-        self.pool_miss = false;
+    pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) {
+        self.cold_start_info = cold_start_info;
     }
 
     pub fn success(&mut self) {
@@ -239,7 +248,7 @@ impl LatencyTimer {
         self.stop = Some(time::Instant::now());
 
         // success
-        self.outcome = "success";
+        self.outcome = Outcome::Success;
     }
 }
@@ -264,9 +273,8 @@ impl Drop for LatencyTimer {
         COMPUTE_CONNECTION_LATENCY
             .with_label_values(&[
                 self.protocol,
-                bool_to_str(self.cache_miss),
-                bool_to_str(self.pool_miss),
-                self.outcome,
+                self.cold_start_info.as_str(),
+                self.outcome.as_str(),
                 "client",
             ])
             .observe((duration.saturating_sub(self.accumulated.client)).as_secs_f64());
@@ -275,9 +283,8 @@ impl Drop for LatencyTimer {
         COMPUTE_CONNECTION_LATENCY
             .with_label_values(&[
                 self.protocol,
-                bool_to_str(self.cache_miss),
-                bool_to_str(self.pool_miss),
-                self.outcome,
+                self.cold_start_info.as_str(),
+                self.outcome.as_str(),
                 "client_and_cplane",
             ])
             .observe((duration.saturating_sub(accumulated_total)).as_secs_f64());
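> Cardinality note for the relabeling above: 3 protocols × 6 `cold_start_info` values × 2 outcomes × 2 excluded-portions = 72 series, versus 3 × 2 × 2 × 2 × 2 = 48 before, with one label fewer. A sketch of an observation under the new scheme (illustrative only; the real call sites are in `LatencyTimer::drop`, and `Outcome` is private there, so its string is written literally):

```rust
use proxy::console::messages::ColdStartInfo;
use proxy::metrics::COMPUTE_CONNECTION_LATENCY;

// Sketch: the label tuple the Drop impl now emits, spelled out.
fn observe_example(seconds: f64) {
    COMPUTE_CONNECTION_LATENCY
        .with_label_values(&[
            "tcp",                              // protocol
            ColdStartInfo::VmPoolMiss.as_str(), // "pool_miss"
            "failed",                           // Outcome::Failed.as_str()
            "client",                           // portion excluded from cplane time
        ])
        .observe(seconds);
}
```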
 #[tracing::instrument(skip_all)]
 pub async fn connect_to_compute<M: ConnectMechanism>(
     ctx: &mut RequestMonitoring,
@@ -132,7 +131,6 @@ where
     } else {
         // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
         info!("compute node's state has likely changed; requesting a wake-up");
-        ctx.latency_timer.cache_miss();
         let old_node_info = invalidate_cache(node_info);
         let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
         node_info.reuse_settings(old_node_info);
diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs
index cf53c6e673..c81a1a8292 100644
--- a/proxy/src/proxy/passthrough.rs
+++ b/proxy/src/proxy/passthrough.rs
@@ -19,8 +19,8 @@ pub async fn proxy_pass(
     aux: MetricsAuxInfo,
 ) -> anyhow::Result<()> {
     let usage = USAGE_METRICS.register(Ids {
-        endpoint_id: aux.endpoint_id.clone(),
-        branch_id: aux.branch_id.clone(),
+        endpoint_id: aux.endpoint_id,
+        branch_id: aux.branch_id,
     });
 
     let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["tx"]);
diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs
index a4051447c1..71d85e106d 100644
--- a/proxy/src/proxy/tests.rs
+++ b/proxy/src/proxy/tests.rs
@@ -12,11 +12,12 @@ use crate::auth::backend::{
 };
 use crate::config::CertResolver;
 use crate::console::caches::NodeInfoCache;
+use crate::console::messages::MetricsAuxInfo;
 use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
 use crate::console::{self, CachedNodeInfo, NodeInfo};
 use crate::error::ErrorKind;
 use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
-use crate::{http, sasl, scram};
+use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId};
 use anyhow::{bail, Context};
 use async_trait::async_trait;
 use rstest::rstest;
@@ -512,7 +513,12 @@ impl TestBackend for TestConnectMechanism {
 fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo {
     let node = NodeInfo {
         config: compute::ConnCfg::new(),
-        aux: Default::default(),
+        aux: MetricsAuxInfo {
+            endpoint_id: (&EndpointId::from("endpoint")).into(),
+            project_id: (&ProjectId::from("project")).into(),
+            branch_id: (&BranchId::from("branch")).into(),
+            cold_start_info: crate::console::messages::ColdStartInfo::Warm,
+        },
         allow_self_signed_compute: false,
     };
     let (_, node) = cache.insert("key".into(), node);
diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs
index f10779d7ba..8aa5ad4e8a 100644
--- a/proxy/src/serverless/backend.rs
+++ b/proxy/src/serverless/backend.rs
@@ -9,7 +9,6 @@ use crate::{
     config::ProxyConfig,
     console::{
         errors::{GetAuthInfoError, WakeComputeError},
-        messages::ColdStartInfo,
         CachedNodeInfo,
     },
     context::RequestMonitoring,
@@ -57,7 +56,10 @@ impl PoolingBackend {
         let auth_outcome =
             crate::auth::validate_password_and_exchange(&conn_info.password, secret).await?;
         let res = match auth_outcome {
-            crate::sasl::Outcome::Success(key) => Ok(key),
+            crate::sasl::Outcome::Success(key) => {
+                info!("user successfully authenticated");
+                Ok(key)
+            }
             crate::sasl::Outcome::Failure(reason) => {
                 info!("auth backend failed with an error: {reason}");
                 Err(AuthError::auth_failed(&*conn_info.user_info.user))
@@ -89,8 +91,6 @@ impl PoolingBackend {
         };
 
         if let Some(client) = maybe_client {
-            info!("cold_start_info=warm");
-            ctx.set_cold_start_info(ColdStartInfo::Warm);
             return Ok(client);
         }
         let conn_id = uuid::Uuid::new_v4();
diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs
index c7e8eaef76..35311facb8 100644
--- a/proxy/src/serverless/conn_pool.rs
+++ b/proxy/src/serverless/conn_pool.rs
@@ -17,7 +17,7 @@ use tokio::time::Instant;
 use tokio_postgres::tls::NoTlsStream;
 use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket};
 
-use crate::console::messages::MetricsAuxInfo;
+use crate::console::messages::{ColdStartInfo, MetricsAuxInfo};
 use crate::metrics::{ENDPOINT_POOLS, GC_LATENCY, NUM_OPEN_CLIENTS_IN_HTTP_POOL};
 use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
 use crate::{
@@ -383,9 +383,12 @@ impl GlobalConnPool {
                 "pid",
                 &tracing::field::display(client.inner.get_process_id()),
             );
-            info!("pool: reusing connection '{conn_info}'");
+            info!(
+                cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
+                "pool: reusing connection '{conn_info}'"
+            );
             client.session.send(ctx.session_id)?;
-            ctx.latency_timer.pool_hit();
+            ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
             ctx.latency_timer.success();
             return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
         }
@@ -454,8 +457,9 @@ pub fn poll_client(
     let (tx, mut rx) = tokio::sync::watch::channel(session_id);
 
     let span = info_span!(parent: None, "connection", %conn_id);
+    let cold_start_info = ctx.cold_start_info;
     span.in_scope(|| {
-        info!(%conn_info, %session_id, "new connection");
+        info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
     });
     let pool = match conn_info.endpoint_cache_key() {
         Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)),
@@ -565,8 +569,8 @@ impl Client {
     pub fn metrics(&self) -> Arc<MetricCounter> {
         let aux = &self.inner.as_ref().unwrap().aux;
         USAGE_METRICS.register(Ids {
-            endpoint_id: aux.endpoint_id.clone(),
-            branch_id: aux.branch_id.clone(),
+            endpoint_id: aux.endpoint_id,
+            branch_id: aux.branch_id,
         })
     }
 }
@@ -666,6 +670,8 @@ impl Drop for Client {
 mod tests {
     use std::{mem, sync::atomic::AtomicBool};
 
+    use crate::{BranchId, EndpointId, ProjectId};
+
     use super::*;
 
     struct MockClient(Arc<AtomicBool>);
@@ -691,7 +697,12 @@ mod tests {
         ClientInner {
             inner: client,
             session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
-            aux: Default::default(),
+            aux: MetricsAuxInfo {
+                endpoint_id: (&EndpointId::from("endpoint")).into(),
+                project_id: (&ProjectId::from("project")).into(),
+                branch_id: (&BranchId::from("branch")).into(),
+                cold_start_info: crate::console::messages::ColdStartInfo::Warm,
+            },
             conn_id: uuid::Uuid::new_v4(),
         }
     }
diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs
index b21056735d..5ffbf95c07 100644
--- a/proxy/src/usage_metrics.rs
+++ b/proxy/src/usage_metrics.rs
@@ -3,7 +3,8 @@
 use crate::{
     config::{MetricBackupCollectionConfig, MetricCollectionConfig},
     context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
-    http, BranchId, EndpointId,
+    http,
+    intern::{BranchIdInt, EndpointIdInt},
 };
 use anyhow::Context;
 use async_compression::tokio::write::GzipEncoder;
@@ -43,8 +44,8 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
 /// because we enrich the event with project_id in the control-plane endpoint.
 #[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)]
 pub struct Ids {
-    pub endpoint_id: EndpointId,
-    pub branch_id: BranchId,
+    pub endpoint_id: EndpointIdInt,
+    pub branch_id: BranchIdInt,
 }
 
 pub trait MetricCounterRecorder {
@@ -494,7 +495,7 @@ mod tests {
     use url::Url;
 
     use super::*;
-    use crate::{http, rate_limiter::RateLimiterConfig};
+    use crate::{http, rate_limiter::RateLimiterConfig, BranchId, EndpointId};
 
     #[tokio::test]
     async fn metrics() {
@@ -536,8 +537,8 @@ mod tests {
         // register a new counter
         let counter = metrics.register(Ids {
-            endpoint_id: "e1".into(),
-            branch_id: "b1".into(),
+            endpoint_id: (&EndpointId::from("e1")).into(),
+            branch_id: (&BranchId::from("b1")).into(),
         });
 
         // the counter should be observed despite 0 egress
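> The `(&EndpointId::from("e1")).into()` dance that recurs throughout this diff is the interning pattern: the string-backed id newtypes convert by reference into small `*Int` handles (`EndpointIdInt`, `BranchIdInt`, ...), so `Ids` no longer clones strings on every register. A toy illustration of the idea — a hypothetical simplified interner, not the crate's actual `intern` module:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical, simplified stand-in for the proxy's interner: a string id
// becomes a small Copy handle, cheap to hash, clone, and store in maps.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct EndpointIdInt(u32);

static INTERNER: Mutex<Option<HashMap<String, u32>>> = Mutex::new(None);

impl From<&str> for EndpointIdInt {
    fn from(s: &str) -> Self {
        let mut guard = INTERNER.lock().unwrap();
        let map = guard.get_or_insert_with(HashMap::new);
        // Assign the next sequential handle the first time a string is seen.
        let next = map.len() as u32;
        EndpointIdInt(*map.entry(s.to_owned()).or_insert(next))
    }
}

fn main() {
    let a = EndpointIdInt::from("e1");
    let b = EndpointIdInt::from("e1");
    assert_eq!(a, b); // same string, same handle -- no string comparison needed
}
```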