From e4872bec57b5e5e779aae559f48ac01f97440907 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Sat, 26 Jul 2025 09:07:32 +0100
Subject: [PATCH] add metrics for caches

---
 proxy/src/binary/proxy.rs       |  7 +---
 proxy/src/cache/common.rs       | 36 +++++++++++++++++
 proxy/src/cache/node_info.rs    | 19 +++++--
 proxy/src/cache/project_info.rs | 39 ++++++++++++++++--
 proxy/src/metrics.rs            | 70 ++++++++++++++++++++++++++++-----
 proxy/src/serverless/rest.rs    | 27 +++++++++++--
 6 files changed, 173 insertions(+), 25 deletions(-)

diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs
index e912bebd67..0ea5a89945 100644
--- a/proxy/src/binary/proxy.rs
+++ b/proxy/src/binary/proxy.rs
@@ -535,12 +535,7 @@ pub async fn run() -> anyhow::Result<()> {
     // add a task to flush the db_schema cache every 10 minutes
     #[cfg(feature = "rest_broker")]
     if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
-        maintenance_tasks.spawn(async move {
-            loop {
-                tokio::time::sleep(Duration::from_secs(600)).await;
-                db_schema_cache.0.run_pending_tasks();
-            }
-        });
+        maintenance_tasks.spawn(db_schema_cache.maintain());
     }
 
     if let Some(metrics_config) = &config.metric_collection {
diff --git a/proxy/src/cache/common.rs b/proxy/src/cache/common.rs
index bced5be972..9a7d0d99cf 100644
--- a/proxy/src/cache/common.rs
+++ b/proxy/src/cache/common.rs
@@ -2,8 +2,12 @@ use std::ops::{Deref, DerefMut};
 use std::time::{Duration, Instant};
 
 use moka::Expiry;
+use moka::notification::RemovalCause;
 
 use crate::control_plane::messages::ControlPlaneErrorMessage;
+use crate::metrics::{
+    CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
+};
 
 /// Default TTL used when caching errors from control plane.
 pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
@@ -130,3 +134,35 @@ impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
         self.expire_early(value, updated_at)
     }
 }
+
+pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
+    let cause = match cause {
+        RemovalCause::Expired => CacheRemovalCause::Expired,
+        RemovalCause::Explicit => CacheRemovalCause::Explicit,
+        RemovalCause::Replaced => CacheRemovalCause::Replaced,
+        RemovalCause::Size => CacheRemovalCause::Size,
+    };
+    Metrics::get()
+        .cache
+        .evicted_total
+        .inc(CacheEviction { cache: kind, cause });
+}
+
+#[inline]
+pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
+    let outcome = if cache_result.is_some() {
+        CacheOutcome::Hit
+    } else {
+        CacheOutcome::Miss
+    };
+    Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
+        cache: kind,
+        outcome,
+    });
+    cache_result
+}
+
+#[inline]
+pub fn count_cache_insert(kind: CacheKind) {
+    Metrics::get().cache.inserted_total.inc(kind);
+}
diff --git a/proxy/src/cache/node_info.rs b/proxy/src/cache/node_info.rs
index 22f4d9dc04..f046eba8ce 100644
--- a/proxy/src/cache/node_info.rs
+++ b/proxy/src/cache/node_info.rs
@@ -1,7 +1,8 @@
-use crate::cache::common::Cache;
+use crate::cache::common::{Cache, count_cache_insert, count_cache_outcome, eviction_listener};
 use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
 use crate::config::CacheOptions;
 use crate::control_plane::NodeInfo;
+use crate::metrics::{CacheKind, Metrics};
 use crate::types::EndpointCacheKey;
 
 pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
@@ -22,15 +23,27 @@ impl NodeInfoCache {
             .name("node_info_cache")
             .expire_after(CplaneExpiry::default());
         let builder = config.moka(builder);
+
+        if let Some(size) = config.size {
+            Metrics::get()
+                .cache
+                .capacity
+                .set(CacheKind::NodeInfo, size as i64);
+        }
+
+        let builder = builder
+            .eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause));
+
         Self(builder.build())
     }
 
     pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
+        count_cache_insert(CacheKind::NodeInfo);
         self.0.insert(key, value);
     }
 
-    pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
-        self.0.get(key)
+    pub fn get(&self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
+        count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
     }
 
     pub fn get_entry(
diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs
index a6af9c158b..b7347d8e57 100644
--- a/proxy/src/cache/project_info.rs
+++ b/proxy/src/cache/project_info.rs
@@ -5,11 +5,14 @@ use clashmap::ClashMap;
 use moka::sync::Cache;
 use tracing::{debug, info};
 
-use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
+use crate::cache::common::{
+    ControlPlaneResult, CplaneExpiry, count_cache_insert, count_cache_outcome, eviction_listener,
+};
 use crate::config::ProjectInfoCacheOptions;
 use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
 use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
 use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
+use crate::metrics::{CacheKind, Metrics};
 use crate::types::{EndpointId, RoleName};
 
 /// Cache for project info.
@@ -82,17 +85,32 @@ impl ProjectInfoCache {
 impl ProjectInfoCache {
     pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
+        Metrics::get().cache.capacity.set(
+            CacheKind::ProjectInfoRoles,
+            (config.size * config.max_roles) as i64,
+        );
+        Metrics::get()
+            .cache
+            .capacity
+            .set(CacheKind::ProjectInfoEndpoints, config.size as i64);
+
         // we cache errors for 30 seconds, unless retry_at is set.
         let expiry = CplaneExpiry::default();
         Self {
             role_controls: Cache::builder()
                 .name("role_access_controls")
+                .eviction_listener(|_k, _v, cause| {
+                    eviction_listener(CacheKind::ProjectInfoRoles, cause);
+                })
                 .max_capacity(config.size * config.max_roles)
                 .time_to_live(config.ttl)
                 .expire_after(expiry)
                 .build(),
             ep_controls: Cache::builder()
                 .name("endpoint_access_controls")
+                .eviction_listener(|_k, _v, cause| {
+                    eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
+                })
                 .max_capacity(config.size)
                 .time_to_live(config.ttl)
                 .expire_after(expiry)
                 .build(),
@@ -111,7 +129,10 @@ impl ProjectInfoCache {
         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
         let role_name = RoleNameInt::get(role_name)?;
 
-        self.role_controls.get(&(endpoint_id, role_name))
+        count_cache_outcome(
+            CacheKind::ProjectInfoRoles,
+            self.role_controls.get(&(endpoint_id, role_name)),
+        )
     }
 
     pub(crate) fn get_endpoint_access(
@@ -120,7 +141,10 @@ impl ProjectInfoCache {
     ) -> Option<ControlPlaneResult<EndpointAccessControl>> {
         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
 
-        self.ep_controls.get(&endpoint_id)
+        count_cache_outcome(
+            CacheKind::ProjectInfoEndpoints,
+            self.ep_controls.get(&endpoint_id),
+        )
     }
 
     pub(crate) fn insert_endpoint_access(
@@ -144,6 +168,9 @@ impl ProjectInfoCache {
             "created a cache entry for endpoint access"
         );
 
+        count_cache_insert(CacheKind::ProjectInfoEndpoints);
+        count_cache_insert(CacheKind::ProjectInfoRoles);
+
         self.ep_controls.insert(endpoint_id, Ok(controls));
         self.role_controls
             .insert((endpoint_id, role_name), Ok(role_controls));
@@ -172,10 +199,14 @@ impl ProjectInfoCache {
                 // leave the entry alone if it's already Ok
                 Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
                 // replace the entry
-                _ => moka::ops::compute::Op::Put(Err(msg.clone())),
+                _ => {
+                    count_cache_insert(CacheKind::ProjectInfoEndpoints);
+                    moka::ops::compute::Op::Put(Err(msg.clone()))
+                }
             });
         }
 
+        count_cache_insert(CacheKind::ProjectInfoRoles);
         self.role_controls
             .insert((endpoint_id, role_name), Err(msg));
     }
diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs
index 869bce32f2..0726a145df 100644
--- a/proxy/src/metrics.rs
+++ b/proxy/src/metrics.rs
@@ -8,8 +8,8 @@ use measured::label::{
 use measured::metric::histogram::Thresholds;
 use measured::metric::name::MetricName;
 use measured::{
-    Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
-    MetricGroup,
+    Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
+    LabelGroup, MetricGroup,
 };
 use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
 use tokio::time::{self, Instant};
@@ -29,6 +29,9 @@ pub struct Metrics {
 
     #[metric(namespace = "service")]
     pub service: ServiceMetrics,
+
+    #[metric(namespace = "cache")]
+    pub cache: CacheMetrics,
 }
 
 static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -219,13 +222,6 @@ pub enum Bool {
     False,
 }
 
-#[derive(FixedCardinalityLabel, Copy, Clone)]
-#[label(singleton = "outcome")]
-pub enum CacheOutcome {
-    Hit,
-    Miss,
-}
-
 #[derive(LabelGroup)]
 #[label(set = ConsoleRequestSet)]
 pub struct ConsoleRequest<'a> {
@@ -704,3 +700,59 @@ pub enum ServiceState {
     Running,
     Terminating,
 }
+
+#[derive(MetricGroup)]
+#[metric(new())]
+pub struct CacheMetrics {
+    /// The capacity of the cache
+    pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
+    /// The total number of entries inserted into the cache
+    pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
+    /// The total number of entries removed from the cache
+    pub evicted_total: CounterVec<CacheEvictionSet>,
+    /// The total number of cache requests
+    pub request_total: CounterVec<CacheOutcomeSet>,
+}
+
+impl Default for CacheMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
+#[label(singleton = "cache")]
+pub enum CacheKind {
+    NodeInfo,
+    ProjectInfoEndpoints,
+    ProjectInfoRoles,
+    Schema,
+}
+
+#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
+pub enum CacheRemovalCause {
+    Expired,
+    Explicit,
+    Replaced,
+    Size,
+}
+
+#[derive(LabelGroup)]
+#[label(set = CacheEvictionSet)]
+pub struct CacheEviction {
+    pub cache: CacheKind,
+    pub cause: CacheRemovalCause,
+}
+
+#[derive(FixedCardinalityLabel, Copy, Clone)]
+pub enum CacheOutcome {
+    Hit,
+    Miss,
+}
+
+#[derive(LabelGroup)]
+#[label(set = CacheOutcomeSet)]
+pub struct CacheOutcomeGroup {
+    pub cache: CacheKind,
+    pub outcome: CacheOutcome,
+}
diff --git a/proxy/src/serverless/rest.rs b/proxy/src/serverless/rest.rs
index 632b308e5d..7837ad7088 100644
--- a/proxy/src/serverless/rest.rs
+++ b/proxy/src/serverless/rest.rs
@@ -1,5 +1,6 @@
 use std::borrow::Cow;
 use std::collections::HashMap;
+use std::convert::Infallible;
 use std::sync::Arc;
 
 use bytes::Bytes;
@@ -54,11 +55,12 @@ use super::http_util::{
 };
 use super::json::JsonConversionError;
 use crate::auth::backend::ComputeCredentialKeys;
+use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
 use crate::config::ProxyConfig;
 use crate::context::RequestContext;
 use crate::error::{ErrorKind, ReportableError, UserFacingError};
 use crate::http::read_body_with_limit;
-use crate::metrics::Metrics;
+use crate::metrics::{CacheKind, Metrics};
 use crate::serverless::sql_over_http::HEADER_VALUE_TRUE;
 use crate::types::EndpointCacheKey;
 use crate::util::deserialize_json_string;
@@ -138,15 +140,31 @@ pub struct ApiConfig {
 }
 
 // The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
-pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
+pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
 
 impl DbSchemaCache {
     pub fn new(config: crate::config::CacheOptions) -> Self {
         let builder = Cache::builder().name("db_schema_cache");
         let builder = config.moka(builder);
+        let metrics = &Metrics::get().cache;
+        if let Some(size) = config.size {
+            metrics.capacity.set(CacheKind::Schema, size as i64);
+        }
+
+        let builder =
+            builder.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::Schema, cause));
+
         Self(builder.build())
     }
 
+    pub async fn maintain(&self) -> Result<Infallible, anyhow::Error> {
+        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
+        loop {
+            ticker.tick().await;
+            self.0.run_pending_tasks();
+        }
+    }
+
     pub async fn get_cached_or_remote(
         &self,
         endpoint_id: &EndpointCacheKey,
@@ -156,7 +174,8 @@ impl DbSchemaCache {
         ctx: &RequestContext,
         config: &'static ProxyConfig,
     ) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
-        match self.0.get(endpoint_id) {
+        let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
+        match cache_result {
             Some(v) => Ok(v),
             None => {
                 info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
@@ -180,6 +199,7 @@ impl DbSchemaCache {
                         db_extra_search_path: None,
                     };
                     let value = Arc::new((api_config, schema_owned));
+                    count_cache_insert(CacheKind::Schema);
                     self.0.insert(endpoint_id.clone(), value);
                     return Err(e);
                 }
@@ -188,6 +208,7 @@ impl DbSchemaCache {
             }
         };
         let value = Arc::new((api_config, schema_owned));
+        count_cache_insert(CacheKind::Schema);
         self.0.insert(endpoint_id.clone(), value.clone());
         Ok(value)
     }
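
Note (not part of the patch): every cache in this change follows the same accounting pattern — count each lookup as a hit or a miss at the single `get` choke point via `count_cache_outcome`, count inserts via `count_cache_insert`, and let moka's eviction listener attribute removals to a cause. The sketch below is a minimal, self-contained illustration of that pattern, assuming moka 0.12's sync cache; the atomic counters and all names (`HITS`, `count_outcome`, etc.) are illustrative stand-ins for the `measured` counter vecs, not proxy code.

use std::sync::atomic::{AtomicU64, Ordering};

use moka::notification::RemovalCause;
use moka::sync::Cache;

static HITS: AtomicU64 = AtomicU64::new(0);
static MISSES: AtomicU64 = AtomicU64::new(0);
static EVICTED_BY_SIZE: AtomicU64 = AtomicU64::new(0);

// Mirrors `count_cache_outcome`: classify the result before handing it
// back, so every lookup is counted exactly once at one choke point.
fn count_outcome<T>(result: Option<T>) -> Option<T> {
    if result.is_some() {
        HITS.fetch_add(1, Ordering::Relaxed);
    } else {
        MISSES.fetch_add(1, Ordering::Relaxed);
    }
    result
}

fn main() {
    let cache: Cache<String, String> = Cache::builder()
        .max_capacity(2)
        // Mirrors `eviction_listener`: moka reports why each entry left,
        // so expiry, explicit invalidation, replacement, and size pressure
        // can each be tracked separately.
        .eviction_listener(|_key, _value, cause| {
            if cause == RemovalCause::Size {
                EVICTED_BY_SIZE.fetch_add(1, Ordering::Relaxed);
            }
        })
        .build();

    cache.insert("a".to_owned(), "1".to_owned());
    assert!(count_outcome(cache.get("a")).is_some()); // hit
    assert!(count_outcome(cache.get("b")).is_none()); // miss

    // Overflow the capacity, then flush moka's lazy housekeeping. This is
    // why the patch adds a `maintain` task: size evictions (and the metrics
    // they produce) only surface once `run_pending_tasks` runs.
    cache.insert("b".to_owned(), "2".to_owned());
    cache.insert("c".to_owned(), "3".to_owned());
    cache.run_pending_tasks();

    println!(
        "hits={} misses={} evicted_by_size={}",
        HITS.load(Ordering::Relaxed),
        MISSES.load(Ordering::Relaxed),
        EVICTED_BY_SIZE.load(Ordering::Relaxed),
    );
}

Counting at the get/insert choke points keeps the hit ratio derivable directly from the exported series (hits over total requests per cache label), and routing every cache through one shared helper is what lets a single `CacheKind` label distinguish the four caches without duplicating metric plumbing.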