[proxy] add metrics for caches (#12752)
Exposes metrics for caches. LKB-2594
This exposes a high-level namespace, `cache`, that all cache metrics can
be added to; as I understand it, this makes it easier to build library
panels for the caches.
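
Concretely, the `cache` namespace carries four series, each labelled by cache kind: `cache_capacity`, `cache_inserted_total`, `cache_evicted_total` (additionally labelled by eviction `cause`), and `cache_request_total` (additionally labelled by request `outcome`).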
To calculate the current cache fill ratio, you could use the following
query:
```
(
cache_inserted_total{cache="node_info"}
- sum (cache_evicted_total{cache="node_info"}) without (cause)
)
/ cache_capacity{cache="node_info"}
```
To calculate the cache hit ratio, you could use the following query:
```
cache_request_total{cache="node_info", outcome="hit"}
/ sum (cache_request_total{cache="node_info"}) without (outcome)
```
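
For orientation, the pattern this change applies to each cache looks roughly like the sketch below. It reuses the helpers added in `proxy/src/cache/common.rs`; the `ExampleCache` type, its `String` keys and values, and the `capacity` parameter are illustrative only, and `CacheKind::NodeInfo` stands in for whichever kind a real cache reports. The actual wiring for each cache is in the hunks that follow.

```
use moka::sync::Cache;

use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
use crate::metrics::{CacheKind, Metrics};

// Sketch only: a cache instrumented with the new helpers. The key/value
// types and capacity plumbing are placeholders, not part of the change.
struct ExampleCache(Cache<String, String>);

impl ExampleCache {
    fn new(capacity: u64) -> Self {
        // Report the configured capacity so cache_capacity can serve as the
        // denominator of the fill-ratio query above.
        Metrics::get()
            .cache
            .capacity
            .set(CacheKind::NodeInfo, capacity as i64);

        let cache = Cache::builder()
            .name("node_info")
            .max_capacity(capacity)
            // Every removal bumps cache_evicted_total, labelled by cause.
            .eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause))
            .build();
        Self(cache)
    }

    fn insert(&self, key: String, value: String) {
        // Bumps cache_inserted_total for this cache kind.
        count_cache_insert(CacheKind::NodeInfo);
        self.0.insert(key, value);
    }

    fn get(&self, key: &str) -> Option<String> {
        // Records cache_request_total{outcome="hit"|"miss"} and passes the
        // looked-up value straight through.
        count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
    }
}
```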
```
@@ -535,12 +535,7 @@ pub async fn run() -> anyhow::Result<()> {
     // add a task to flush the db_schema cache every 10 minutes
     #[cfg(feature = "rest_broker")]
     if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
-        maintenance_tasks.spawn(async move {
-            loop {
-                tokio::time::sleep(Duration::from_secs(600)).await;
-                db_schema_cache.0.run_pending_tasks();
-            }
-        });
+        maintenance_tasks.spawn(db_schema_cache.maintain());
     }
 
     if let Some(metrics_config) = &config.metric_collection {
```
proxy/src/cache/common.rs
```
@@ -2,8 +2,12 @@ use std::ops::{Deref, DerefMut};
 use std::time::{Duration, Instant};
 
 use moka::Expiry;
+use moka::notification::RemovalCause;
 
 use crate::control_plane::messages::ControlPlaneErrorMessage;
+use crate::metrics::{
+    CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
+};
 
 /// Default TTL used when caching errors from control plane.
 pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
@@ -130,3 +134,35 @@ impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
         self.expire_early(value, updated_at)
     }
 }
+
+pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
+    let cause = match cause {
+        RemovalCause::Expired => CacheRemovalCause::Expired,
+        RemovalCause::Explicit => CacheRemovalCause::Explicit,
+        RemovalCause::Replaced => CacheRemovalCause::Replaced,
+        RemovalCause::Size => CacheRemovalCause::Size,
+    };
+    Metrics::get()
+        .cache
+        .evicted_total
+        .inc(CacheEviction { cache: kind, cause });
+}
+
+#[inline]
+pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
+    let outcome = if cache_result.is_some() {
+        CacheOutcome::Hit
+    } else {
+        CacheOutcome::Miss
+    };
+    Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
+        cache: kind,
+        outcome,
+    });
+    cache_result
+}
+
+#[inline]
+pub fn count_cache_insert(kind: CacheKind) {
+    Metrics::get().cache.inserted_total.inc(kind);
+}
```
proxy/src/cache/node_info.rs
```
@@ -1,7 +1,8 @@
-use crate::cache::common::Cache;
+use crate::cache::common::{Cache, count_cache_insert, count_cache_outcome, eviction_listener};
 use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
 use crate::config::CacheOptions;
 use crate::control_plane::NodeInfo;
+use crate::metrics::{CacheKind, Metrics};
 use crate::types::EndpointCacheKey;
 
 pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
@@ -19,18 +20,30 @@ impl Cache for NodeInfoCache {
 impl NodeInfoCache {
     pub fn new(config: CacheOptions) -> Self {
         let builder = moka::sync::Cache::builder()
-            .name("node_info_cache")
+            .name("node_info")
             .expire_after(CplaneExpiry::default());
         let builder = config.moka(builder);
+
+        if let Some(size) = config.size {
+            Metrics::get()
+                .cache
+                .capacity
+                .set(CacheKind::NodeInfo, size as i64);
+        }
+
+        let builder = builder
+            .eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause));
+
         Self(builder.build())
     }
 
     pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
+        count_cache_insert(CacheKind::NodeInfo);
         self.0.insert(key, value);
     }
 
-    pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
-        self.0.get(key)
+    pub fn get(&self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
+        count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
     }
 
     pub fn get_entry(
```
proxy/src/cache/project_info.rs
```
@@ -5,11 +5,14 @@ use clashmap::ClashMap;
 use moka::sync::Cache;
 use tracing::{debug, info};
 
-use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
+use crate::cache::common::{
+    ControlPlaneResult, CplaneExpiry, count_cache_insert, count_cache_outcome, eviction_listener,
+};
 use crate::config::ProjectInfoCacheOptions;
 use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
 use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
 use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
+use crate::metrics::{CacheKind, Metrics};
 use crate::types::{EndpointId, RoleName};
 
 /// Cache for project info.
@@ -82,17 +85,32 @@ impl ProjectInfoCache {
 
 impl ProjectInfoCache {
     pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
+        Metrics::get().cache.capacity.set(
+            CacheKind::ProjectInfoRoles,
+            (config.size * config.max_roles) as i64,
+        );
+        Metrics::get()
+            .cache
+            .capacity
+            .set(CacheKind::ProjectInfoEndpoints, config.size as i64);
+
         // we cache errors for 30 seconds, unless retry_at is set.
         let expiry = CplaneExpiry::default();
         Self {
             role_controls: Cache::builder()
-                .name("role_access_controls")
+                .name("project_info_roles")
+                .eviction_listener(|_k, _v, cause| {
+                    eviction_listener(CacheKind::ProjectInfoRoles, cause);
+                })
                 .max_capacity(config.size * config.max_roles)
                 .time_to_live(config.ttl)
                 .expire_after(expiry)
                 .build(),
             ep_controls: Cache::builder()
-                .name("endpoint_access_controls")
+                .name("project_info_endpoints")
+                .eviction_listener(|_k, _v, cause| {
+                    eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
+                })
                 .max_capacity(config.size)
                 .time_to_live(config.ttl)
                 .expire_after(expiry)
@@ -111,7 +129,10 @@ impl ProjectInfoCache {
         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
         let role_name = RoleNameInt::get(role_name)?;
 
-        self.role_controls.get(&(endpoint_id, role_name))
+        count_cache_outcome(
+            CacheKind::ProjectInfoRoles,
+            self.role_controls.get(&(endpoint_id, role_name)),
+        )
     }
 
     pub(crate) fn get_endpoint_access(
@@ -120,7 +141,10 @@ impl ProjectInfoCache {
     ) -> Option<ControlPlaneResult<EndpointAccessControl>> {
         let endpoint_id = EndpointIdInt::get(endpoint_id)?;
 
-        self.ep_controls.get(&endpoint_id)
+        count_cache_outcome(
+            CacheKind::ProjectInfoEndpoints,
+            self.ep_controls.get(&endpoint_id),
+        )
     }
 
     pub(crate) fn insert_endpoint_access(
@@ -144,6 +168,9 @@ impl ProjectInfoCache {
             "created a cache entry for endpoint access"
         );
 
+        count_cache_insert(CacheKind::ProjectInfoEndpoints);
+        count_cache_insert(CacheKind::ProjectInfoRoles);
+
        self.ep_controls.insert(endpoint_id, Ok(controls));
        self.role_controls
            .insert((endpoint_id, role_name), Ok(role_controls));
@@ -172,10 +199,14 @@ impl ProjectInfoCache {
                // leave the entry alone if it's already Ok
                Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
                // replace the entry
-               _ => moka::ops::compute::Op::Put(Err(msg.clone())),
+               _ => {
+                   count_cache_insert(CacheKind::ProjectInfoEndpoints);
+                   moka::ops::compute::Op::Put(Err(msg.clone()))
+               }
            });
        }
 
+       count_cache_insert(CacheKind::ProjectInfoRoles);
        self.role_controls
            .insert((endpoint_id, role_name), Err(msg));
    }
```
```
@@ -8,8 +8,8 @@ use measured::label::{
 use measured::metric::histogram::Thresholds;
 use measured::metric::name::MetricName;
 use measured::{
-    Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
-    MetricGroup,
+    Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
+    LabelGroup, MetricGroup,
 };
 use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
 use tokio::time::{self, Instant};
@@ -29,6 +29,9 @@ pub struct Metrics {
 
     #[metric(namespace = "service")]
     pub service: ServiceMetrics,
+
+    #[metric(namespace = "cache")]
+    pub cache: CacheMetrics,
 }
 
 static SELF: OnceLock<Metrics> = OnceLock::new();
@@ -219,13 +222,6 @@ pub enum Bool {
     False,
 }
 
-#[derive(FixedCardinalityLabel, Copy, Clone)]
-#[label(singleton = "outcome")]
-pub enum CacheOutcome {
-    Hit,
-    Miss,
-}
-
 #[derive(LabelGroup)]
 #[label(set = ConsoleRequestSet)]
 pub struct ConsoleRequest<'a> {
@@ -704,3 +700,59 @@ pub enum ServiceState {
     Running,
     Terminating,
 }
+
+#[derive(MetricGroup)]
+#[metric(new())]
+pub struct CacheMetrics {
+    /// The capacity of the cache
+    pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
+    /// The total number of entries inserted into the cache
+    pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
+    /// The total number of entries removed from the cache
+    pub evicted_total: CounterVec<CacheEvictionSet>,
+    /// The total number of cache requests
+    pub request_total: CounterVec<CacheOutcomeSet>,
+}
+
+impl Default for CacheMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
+#[label(singleton = "cache")]
+pub enum CacheKind {
+    NodeInfo,
+    ProjectInfoEndpoints,
+    ProjectInfoRoles,
+    Schema,
+}
+
+#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
+pub enum CacheRemovalCause {
+    Expired,
+    Explicit,
+    Replaced,
+    Size,
+}
+
+#[derive(LabelGroup)]
+#[label(set = CacheEvictionSet)]
+pub struct CacheEviction {
+    pub cache: CacheKind,
+    pub cause: CacheRemovalCause,
+}
+
+#[derive(FixedCardinalityLabel, Copy, Clone)]
+pub enum CacheOutcome {
+    Hit,
+    Miss,
+}
+
+#[derive(LabelGroup)]
+#[label(set = CacheOutcomeSet)]
+pub struct CacheOutcomeGroup {
+    pub cache: CacheKind,
+    pub outcome: CacheOutcome,
+}
```
```
@@ -1,5 +1,6 @@
 use std::borrow::Cow;
 use std::collections::HashMap;
+use std::convert::Infallible;
 use std::sync::Arc;
 
 use bytes::Bytes;
@@ -54,11 +55,12 @@ use super::http_util::{
 };
 use super::json::JsonConversionError;
 use crate::auth::backend::ComputeCredentialKeys;
+use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
 use crate::config::ProxyConfig;
 use crate::context::RequestContext;
 use crate::error::{ErrorKind, ReportableError, UserFacingError};
 use crate::http::read_body_with_limit;
-use crate::metrics::Metrics;
+use crate::metrics::{CacheKind, Metrics};
 use crate::serverless::sql_over_http::HEADER_VALUE_TRUE;
 use crate::types::EndpointCacheKey;
 use crate::util::deserialize_json_string;
@@ -138,15 +140,31 @@ pub struct ApiConfig {
 }
 
 // The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
-pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
+pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
 
 impl DbSchemaCache {
     pub fn new(config: crate::config::CacheOptions) -> Self {
-        let builder = Cache::builder().name("db_schema_cache");
+        let builder = Cache::builder().name("schema");
         let builder = config.moka(builder);
 
+        let metrics = &Metrics::get().cache;
+        if let Some(size) = config.size {
+            metrics.capacity.set(CacheKind::Schema, size as i64);
+        }
+
+        let builder =
+            builder.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::Schema, cause));
+
         Self(builder.build())
     }
 
+    pub async fn maintain(&self) -> Result<Infallible, anyhow::Error> {
+        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
+        loop {
+            ticker.tick().await;
+            self.0.run_pending_tasks();
+        }
+    }
+
     pub async fn get_cached_or_remote(
         &self,
         endpoint_id: &EndpointCacheKey,
@@ -156,7 +174,8 @@ impl DbSchemaCache {
         ctx: &RequestContext,
         config: &'static ProxyConfig,
     ) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
-        match self.0.get(endpoint_id) {
+        let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
+        match cache_result {
            Some(v) => Ok(v),
            None => {
                info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
@@ -180,6 +199,7 @@ impl DbSchemaCache {
                    db_extra_search_path: None,
                };
                let value = Arc::new((api_config, schema_owned));
+                count_cache_insert(CacheKind::Schema);
                self.0.insert(endpoint_id.clone(), value);
                return Err(e);
            }
@@ -188,6 +208,7 @@ impl DbSchemaCache {
            }
        };
        let value = Arc::new((api_config, schema_owned));
+        count_cache_insert(CacheKind::Schema);
        self.0.insert(endpoint_id.clone(), value.clone());
        Ok(value)
    }
```