From b39498cf6b5fde1a8214d17fa2ae25ceeb163c22 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 26 Jul 2025 18:00:28 +0100 Subject: [PATCH] add basic ref counting --- proxy/src/cache/project_info.rs | 35 +++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 171a4fb735..54780ae468 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -1,7 +1,7 @@ use std::convert::Infallible; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; -use crossbeam_skiplist::SkipSet; +use crossbeam_skiplist::SkipMap; use crossbeam_skiplist::equivalent::{Comparable, Equivalent}; use moka::sync::Cache; use tracing::{debug, info}; @@ -12,6 +12,7 @@ use crate::cache::common::{ use crate::config::ProjectInfoCacheOptions; use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason}; use crate::control_plane::{EndpointAccessControl, RoleAccessControl}; +use crate::ext::LockExt; use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt}; use crate::metrics::{CacheKind, Metrics}; use crate::types::{EndpointId, RoleName}; @@ -28,16 +29,18 @@ pub struct ProjectInfoCache { Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult>>, ep_controls: Cache>>, - project2ep: Arc>, - account2ep: Arc>, + project2ep: Arc>, + account2ep: Arc>, config: ProjectInfoCacheOptions, } +type RefCount = Mutex; // This is rather hacky. -// We use an ordered set of (K, V). +// We use an ordered map of (K, V) -> RefCount. // We use range queries over `(K, _)..(K+1, _)` to do the invalidation. -type MultiSet = SkipSet>; +// We use the RefCount to know when to remove the mappings. +type RefCountMultiSet = SkipMap, RefCount>; #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] struct KeyValue(K, V); @@ -77,7 +80,7 @@ impl ProjectInfoCache { info!("invalidating endpoint access for project `{project_id}`"); for entry in self.project2ep.range(Key::prefix(&project_id)) { - self.ep_controls.invalidate(&entry.1); + self.ep_controls.invalidate(&entry.key().1); } } @@ -85,7 +88,7 @@ impl ProjectInfoCache { info!("invalidating endpoint access for org `{account_id}`"); for entry in self.account2ep.range(Key::prefix(&account_id)) { - self.ep_controls.invalidate(&entry.1); + self.ep_controls.invalidate(&entry.key().1); } } @@ -100,7 +103,7 @@ impl ProjectInfoCache { ); for entry in self.project2ep.range(Key::prefix(&project_id)) { - self.role_controls.invalidate(&(entry.1, role_name)); + self.role_controls.invalidate(&(entry.key().1, role_name)); } } } @@ -116,8 +119,8 @@ impl ProjectInfoCache { .capacity .set(CacheKind::ProjectInfoEndpoints, config.size as i64); - let project2ep = Arc::new(MultiSet::new()); - let account2ep = Arc::new(MultiSet::new()); + let project2ep = Arc::new(RefCountMultiSet::new()); + let account2ep = Arc::new(RefCountMultiSet::new()); // we cache errors for 30 seconds, unless retry_at is set. let expiry = CplaneExpiry::default(); @@ -254,11 +257,17 @@ impl ProjectInfoCache { } fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) { - self.project2ep.insert(KeyValue(project_id, endpoint_id)); + let entry = self + .project2ep + .get_or_insert(KeyValue(project_id, endpoint_id), Mutex::new(0)); + *entry.value().lock_propagate_poison() += 1; } fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) { - self.account2ep.insert(KeyValue(account_id, endpoint_id)); + let entry = self + .account2ep + .get_or_insert(KeyValue(account_id, endpoint_id), Mutex::new(0)); + *entry.value().lock_propagate_poison() += 1; } pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) {