From 25af32e8345d04db3ea26617771caae54be767da Mon Sep 17 00:00:00 2001 From: Anna Khanova <32508607+khanova@users.noreply.github.com> Date: Thu, 2 May 2024 11:50:11 +0200 Subject: [PATCH] proxy: keep track on the number of events from redis by type. (#7582) ## Problem It's unclear what is the distribution of messages, proxy is consuming from redis. ## Summary of changes Add counter. --- proxy/src/cache/endpoints.rs | 14 +++++++++++++- proxy/src/metrics.rs | 14 ++++++++++++++ proxy/src/redis/notifications.rs | 17 ++++++++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/proxy/src/cache/endpoints.rs b/proxy/src/cache/endpoints.rs index 02511e6ff7..4bc10a6020 100644 --- a/proxy/src/cache/endpoints.rs +++ b/proxy/src/cache/endpoints.rs @@ -21,7 +21,7 @@ use crate::{ config::EndpointCacheConfig, context::RequestMonitoring, intern::{BranchIdInt, EndpointIdInt, ProjectIdInt}, - metrics::{Metrics, RedisErrors}, + metrics::{Metrics, RedisErrors, RedisEventsCount}, rate_limiter::GlobalRateLimiter, redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider, EndpointId, @@ -100,14 +100,26 @@ impl EndpointsCache { if let Some(endpoint_created) = key.endpoint_created { self.endpoints .insert(EndpointIdInt::from(&endpoint_created.endpoint_id.into())); + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::EndpointCreated); } if let Some(branch_created) = key.branch_created { self.branches .insert(BranchIdInt::from(&branch_created.branch_id.into())); + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::BranchCreated); } if let Some(project_created) = key.project_created { self.projects .insert(ProjectIdInt::from(&project_created.project_id.into())); + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::ProjectCreated); } } pub async fn do_read( diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index c129ece059..4a54857012 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -123,6 +123,9 @@ pub struct ProxyMetrics { /// Number of retries (per outcome, per retry_type). #[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))] pub retries_metric: HistogramVec, + + /// Number of events consumed from redis (per event type). + pub redis_events_count: CounterVec>, } #[derive(MetricGroup)] @@ -530,3 +533,14 @@ pub enum RetryType { WakeCompute, ConnectToCompute, } + +#[derive(FixedCardinalityLabel, Clone, Copy, Debug)] +#[label(singleton = "event")] +pub enum RedisEventsCount { + EndpointCreated, + BranchCreated, + ProjectCreated, + CancelSession, + PasswordUpdate, + AllowedIpsUpdate, +} diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 5a38530faf..ba4dfb755e 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -11,7 +11,7 @@ use crate::{ cache::project_info::ProjectInfoCache, cancellation::{CancelMap, CancellationHandler}, intern::{ProjectIdInt, RoleNameInt}, - metrics::{Metrics, RedisErrors}, + metrics::{Metrics, RedisErrors, RedisEventsCount}, }; const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates"; @@ -118,6 +118,10 @@ impl MessageHandler { "session_id", &tracing::field::display(cancel_session.session_id), ); + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::CancelSession); if let Some(cancel_region) = cancel_session.region_id { // If the message is not for this region, ignore it. if cancel_region != self.region_id { @@ -138,6 +142,17 @@ impl MessageHandler { } _ => { invalidate_cache(self.cache.clone(), msg.clone()); + if matches!(msg, AllowedIpsUpdate { .. }) { + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::AllowedIpsUpdate); + } else if matches!(msg, PasswordUpdate { .. }) { + Metrics::get() + .proxy + .redis_events_count + .inc(RedisEventsCount::PasswordUpdate); + } // It might happen that the invalid entry is on the way to be cached. // To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds. // TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.