From f84e73c323ed088990395574b41dd2c912c454b1 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 7 Jun 2025 17:10:34 +0100 Subject: [PATCH] remove values that are never read for redis notifications --- proxy/src/binary/proxy.rs | 12 ++---------- proxy/src/cache/project_info.rs | 28 ---------------------------- proxy/src/redis/notifications.rs | 24 ++++-------------------- 3 files changed, 6 insertions(+), 58 deletions(-) diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 757c1e988b..d5d3508333 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -503,18 +503,10 @@ pub async fn run() -> anyhow::Result<()> { (client1, client2) => { let cache = api.caches.project_info.clone(); if let Some(client) = client1 { - maintenance_tasks.spawn(notifications::task_main( - client, - cache.clone(), - args.region.clone(), - )); + maintenance_tasks.spawn(notifications::task_main(client, cache.clone())); } if let Some(client) = client2 { - maintenance_tasks.spawn(notifications::task_main( - client, - cache.clone(), - args.region.clone(), - )); + maintenance_tasks.spawn(notifications::task_main(client, cache.clone())); } maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); } diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 9a4be2f904..4da3c22574 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use clashmap::ClashMap; use clashmap::mapref::one::Ref; use rand::{Rng, thread_rng}; -use tokio::sync::Mutex; use tokio::time::Instant; use tracing::{debug, info}; @@ -22,8 +21,6 @@ pub(crate) trait ProjectInfoCache { fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt); fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt); fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt); - async fn decrement_active_listeners(&self); - async fn increment_active_listeners(&self); } struct Entry { @@ -96,7 +93,6 @@ pub struct ProjectInfoCacheImpl { start_time: Instant, ttl_disabled_since_us: AtomicU64, - active_listeners_lock: Mutex, } #[async_trait] @@ -152,29 +148,6 @@ impl ProjectInfoCache for ProjectInfoCacheImpl { } } } - - async fn decrement_active_listeners(&self) { - let mut listeners_guard = self.active_listeners_lock.lock().await; - if *listeners_guard == 0 { - tracing::error!("active_listeners count is already 0, something is broken"); - return; - } - *listeners_guard -= 1; - if *listeners_guard == 0 { - self.ttl_disabled_since_us - .store(u64::MAX, std::sync::atomic::Ordering::SeqCst); - } - } - - async fn increment_active_listeners(&self) { - let mut listeners_guard = self.active_listeners_lock.lock().await; - *listeners_guard += 1; - if *listeners_guard == 1 { - let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64; - self.ttl_disabled_since_us - .store(new_ttl, std::sync::atomic::Ordering::SeqCst); - } - } } impl ProjectInfoCacheImpl { @@ -186,7 +159,6 @@ impl ProjectInfoCacheImpl { config, ttl_disabled_since_us: AtomicU64::new(u64::MAX), start_time: Instant::now(), - active_listeners_lock: Mutex::new(0), } } diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 6c8260027f..a6d376562b 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -141,29 +141,19 @@ where struct MessageHandler { cache: Arc, - region_id: String, } impl Clone for MessageHandler { fn clone(&self) -> Self { Self { cache: self.cache.clone(), - region_id: self.region_id.clone(), } } } impl MessageHandler { - pub(crate) fn new(cache: Arc, region_id: String) -> Self { - Self { cache, region_id } - } - - pub(crate) async fn increment_active_listeners(&self) { - self.cache.increment_active_listeners().await; - } - - pub(crate) async fn decrement_active_listeners(&self) { - self.cache.decrement_active_listeners().await; + pub(crate) fn new(cache: Arc) -> Self { + Self { cache } } #[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))] @@ -275,10 +265,7 @@ async fn handle_messages( return Ok(()); } let mut conn = match try_connect(&redis).await { - Ok(conn) => { - handler.increment_active_listeners().await; - conn - } + Ok(conn) => conn, Err(e) => { tracing::error!( "failed to connect to redis: {e}, will try to reconnect in {RECONNECT_TIMEOUT:#?}" @@ -297,11 +284,9 @@ async fn handle_messages( } } if cancellation_token.is_cancelled() { - handler.decrement_active_listeners().await; return Ok(()); } } - handler.decrement_active_listeners().await; } } @@ -310,12 +295,11 @@ async fn handle_messages( pub async fn task_main( redis: ConnectionWithCredentialsProvider, cache: Arc, - region_id: String, ) -> anyhow::Result where C: ProjectInfoCache + Send + Sync + 'static, { - let handler = MessageHandler::new(cache, region_id); + let handler = MessageHandler::new(cache); // 6h - 1m. // There will be 1 minute overlap between two tasks. But at least we can be sure that no message is lost. let mut interval = tokio::time::interval(std::time::Duration::from_secs(6 * 60 * 60 - 60));