fixup! remove values that are never read for redis notifications

This commit is contained in:
Conrad Ludgate
2025-06-07 20:11:02 +01:00
parent 1f62ee5f5c
commit 26dc39053e
2 changed files with 34 additions and 1 deletions

View File

@@ -7,6 +7,7 @@ use async_trait::async_trait;
use clashmap::ClashMap;
use clashmap::mapref::one::Ref;
use rand::{Rng, thread_rng};
use tokio::sync::Mutex;
use tokio::time::Instant;
use tracing::{debug, info};
@@ -21,6 +22,8 @@ pub(crate) trait ProjectInfoCache {
fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt);
fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt);
fn invalidate_role_secret_for_project(&self, project_id: ProjectIdInt, role_name: RoleNameInt);
async fn decrement_active_listeners(&self);
async fn increment_active_listeners(&self);
}
struct Entry<T> {
@@ -93,6 +96,7 @@ pub struct ProjectInfoCacheImpl {
start_time: Instant,
ttl_disabled_since_us: AtomicU64,
active_listeners_lock: Mutex<usize>,
}
#[async_trait]
@@ -148,6 +152,29 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
}
}
}
async fn decrement_active_listeners(&self) {
let mut listeners_guard = self.active_listeners_lock.lock().await;
if *listeners_guard == 0 {
tracing::error!("active_listeners count is already 0, something is broken");
return;
}
*listeners_guard -= 1;
if *listeners_guard == 0 {
self.ttl_disabled_since_us
.store(u64::MAX, std::sync::atomic::Ordering::SeqCst);
}
}
async fn increment_active_listeners(&self) {
let mut listeners_guard = self.active_listeners_lock.lock().await;
*listeners_guard += 1;
if *listeners_guard == 1 {
let new_ttl = (self.start_time.elapsed() + self.config.ttl).as_micros() as u64;
self.ttl_disabled_since_us
.store(new_ttl, std::sync::atomic::Ordering::SeqCst);
}
}
}
impl ProjectInfoCacheImpl {
@@ -159,6 +186,7 @@ impl ProjectInfoCacheImpl {
config,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
start_time: Instant::now(),
active_listeners_lock: Mutex::new(0),
}
}

View File

@@ -265,7 +265,10 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
return Ok(());
}
let mut conn = match try_connect(&redis).await {
Ok(conn) => conn,
Ok(conn) => {
handler.cache.increment_active_listeners().await;
conn
}
Err(e) => {
tracing::error!(
"failed to connect to redis: {e}, will try to reconnect in {RECONNECT_TIMEOUT:#?}"
@@ -284,9 +287,11 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
}
}
if cancellation_token.is_cancelled() {
handler.cache.decrement_active_listeners().await;
return Ok(());
}
}
handler.cache.decrement_active_listeners().await;
}
}