From 26dc39053ebd6e6709af8d085406b7582afc9bc3 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 7 Jun 2025 20:11:02 +0100 Subject: [PATCH] fixup! remove values that are never read for redis notifications --- proxy/src/cache/project_info.rs | 28 ++++++++++++++++++++++++++++ proxy/src/redis/notifications.rs | 7 ++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 4da3c22574..9a4be2f904 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -7,6 +7,7 @@ use async_trait::async_trait; use clashmap::ClashMap; use clashmap::mapref::one::Ref; use rand::{Rng, thread_rng}; +use tokio::sync::Mutex; use tokio::time::Instant; use tracing::{debug, info}; @@ -21,6 +22,8 @@ pub(crate) trait ProjectInfoCache { fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt); fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt); fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt); + async fn decrement_active_listeners(&self); + async fn increment_active_listeners(&self); } struct Entry { @@ -93,6 +96,7 @@ pub struct ProjectInfoCacheImpl { start_time: Instant, ttl_disabled_since_us: AtomicU64, + active_listeners_lock: Mutex, } #[async_trait] @@ -148,6 +152,29 @@ impl ProjectInfoCache for ProjectInfoCacheImpl { } } } + + async fn decrement_active_listeners(&self) { + let mut listeners_guard = self.active_listeners_lock.lock().await; + if *listeners_guard == 0 { + tracing::error!("active_listeners count is already 0, something is broken"); + return; + } + *listeners_guard -= 1; + if *listeners_guard == 0 { + self.ttl_disabled_since_us + .store(u64::MAX, std::sync::atomic::Ordering::SeqCst); + } + } + + async fn increment_active_listeners(&self) { + let mut listeners_guard = self.active_listeners_lock.lock().await; + *listeners_guard += 1; + if *listeners_guard == 1 { + let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64; + self.ttl_disabled_since_us + .store(new_ttl, std::sync::atomic::Ordering::SeqCst); + } + } } impl ProjectInfoCacheImpl { @@ -159,6 +186,7 @@ impl ProjectInfoCacheImpl { config, ttl_disabled_since_us: AtomicU64::new(u64::MAX), start_time: Instant::now(), + active_listeners_lock: Mutex::new(0), } } diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index a6d376562b..973a4c5b02 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -265,7 +265,10 @@ async fn handle_messages( return Ok(()); } let mut conn = match try_connect(&redis).await { - Ok(conn) => conn, + Ok(conn) => { + handler.cache.increment_active_listeners().await; + conn + } Err(e) => { tracing::error!( "failed to connect to redis: {e}, will try to reconnect in {RECONNECT_TIMEOUT:#?}" @@ -284,9 +287,11 @@ async fn handle_messages( } } if cancellation_token.is_cancelled() { + handler.cache.decrement_active_listeners().await; return Ok(()); } } + handler.cache.decrement_active_listeners().await; } }