add basic ref counting

This commit is contained in:
Conrad Ludgate
2025-07-26 18:00:28 +01:00
parent 68ccdd910a
commit b39498cf6b

View File

@@ -1,7 +1,7 @@
use std::convert::Infallible;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use crossbeam_skiplist::SkipSet;
use crossbeam_skiplist::SkipMap;
use crossbeam_skiplist::equivalent::{Comparable, Equivalent};
use moka::sync::Cache;
use tracing::{debug, info};
@@ -12,6 +12,7 @@ use crate::cache::common::{
use crate::config::ProjectInfoCacheOptions;
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
use crate::ext::LockExt;
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
use crate::metrics::{CacheKind, Metrics};
use crate::types::{EndpointId, RoleName};
@@ -28,16 +29,18 @@ pub struct ProjectInfoCache {
Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<Entry<RoleAccessControl>>>,
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<Entry<EndpointAccessControl>>>,
project2ep: Arc<MultiSet<ProjectIdInt, EndpointIdInt>>,
account2ep: Arc<MultiSet<AccountIdInt, EndpointIdInt>>,
project2ep: Arc<RefCountMultiSet<ProjectIdInt, EndpointIdInt>>,
account2ep: Arc<RefCountMultiSet<AccountIdInt, EndpointIdInt>>,
config: ProjectInfoCacheOptions,
}
type RefCount = Mutex<usize>;
// This is rather hacky.
// We use an ordered set of (K, V).
// We use an ordered map of (K, V) -> RefCount.
// We use range queries over `(K, _)..(K+1, _)` to do the invalidation.
type MultiSet<K, V> = SkipSet<KeyValue<K, V>>;
// We use the RefCount to know when to remove the mappings.
type RefCountMultiSet<K, V> = SkipMap<KeyValue<K, V>, RefCount>;
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
struct KeyValue<K, V>(K, V);
@@ -77,7 +80,7 @@ impl ProjectInfoCache {
info!("invalidating endpoint access for project `{project_id}`");
for entry in self.project2ep.range(Key::prefix(&project_id)) {
self.ep_controls.invalidate(&entry.1);
self.ep_controls.invalidate(&entry.key().1);
}
}
@@ -85,7 +88,7 @@ impl ProjectInfoCache {
info!("invalidating endpoint access for org `{account_id}`");
for entry in self.account2ep.range(Key::prefix(&account_id)) {
self.ep_controls.invalidate(&entry.1);
self.ep_controls.invalidate(&entry.key().1);
}
}
@@ -100,7 +103,7 @@ impl ProjectInfoCache {
);
for entry in self.project2ep.range(Key::prefix(&project_id)) {
self.role_controls.invalidate(&(entry.1, role_name));
self.role_controls.invalidate(&(entry.key().1, role_name));
}
}
}
@@ -116,8 +119,8 @@ impl ProjectInfoCache {
.capacity
.set(CacheKind::ProjectInfoEndpoints, config.size as i64);
let project2ep = Arc::new(MultiSet::new());
let account2ep = Arc::new(MultiSet::new());
let project2ep = Arc::new(RefCountMultiSet::new());
let account2ep = Arc::new(RefCountMultiSet::new());
// we cache errors for 30 seconds, unless retry_at is set.
let expiry = CplaneExpiry::default();
@@ -254,11 +257,17 @@ impl ProjectInfoCache {
}
fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
self.project2ep.insert(KeyValue(project_id, endpoint_id));
let entry = self
.project2ep
.get_or_insert(KeyValue(project_id, endpoint_id), Mutex::new(0));
*entry.value().lock_propagate_poison() += 1;
}
fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) {
self.account2ep.insert(KeyValue(account_id, endpoint_id));
let entry = self
.account2ep
.get_or_insert(KeyValue(account_id, endpoint_id), Mutex::new(0));
*entry.value().lock_propagate_poison() += 1;
}
pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) {