mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
use crossbeam-skiplist and clever range queries for a lock-free way to represent K->Set<V>
This commit is contained in:
83
proxy/src/cache/project_info.rs
vendored
83
proxy/src/cache/project_info.rs
vendored
@@ -1,7 +1,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::convert::Infallible;
|
||||
|
||||
use clashmap::ClashMap;
|
||||
use crossbeam_skiplist::SkipSet;
|
||||
use crossbeam_skiplist::equivalent::{Comparable, Equivalent};
|
||||
use moka::sync::Cache;
|
||||
use tracing::{debug, info};
|
||||
|
||||
@@ -26,13 +26,38 @@ pub struct ProjectInfoCache {
|
||||
role_controls: Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<RoleAccessControl>>,
|
||||
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<EndpointAccessControl>>,
|
||||
|
||||
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
|
||||
// FIXME(stefan): we need a way to GC the account2ep map.
|
||||
account2ep: ClashMap<AccountIdInt, HashSet<EndpointIdInt>>,
|
||||
project2ep: MultiSet<ProjectIdInt, EndpointIdInt>,
|
||||
account2ep: MultiSet<AccountIdInt, EndpointIdInt>,
|
||||
|
||||
config: ProjectInfoCacheOptions,
|
||||
}
|
||||
|
||||
// This is rather hacky.
|
||||
// We use an ordered set of (K, V).
|
||||
// We use range queries over `(K, _)..(K+1, _)` to do the invalidation.
|
||||
type MultiSet<K, V> = SkipSet<KeyValue<K, V>>;
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
||||
struct KeyValue<K, V>(K, V);
|
||||
struct Key<'a, K>(&'a K, bool);
|
||||
|
||||
impl<'a, K> Key<'a, K> {
|
||||
fn prefix(key: &'a K) -> std::ops::Range<Self> {
|
||||
Self(key, false)..Self(key, true)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K: Ord, V> Equivalent<Key<'a, K>> for KeyValue<K, V> {
|
||||
fn equivalent(&self, key: &Key<'a, K>) -> bool {
|
||||
self.0 == *key.0 && !key.1
|
||||
}
|
||||
}
|
||||
impl<'a, K: Ord, V> Comparable<Key<'a, K>> for KeyValue<K, V> {
|
||||
fn compare(&self, key: &Key<'a, K>) -> std::cmp::Ordering {
|
||||
self.0.cmp(key.0).then(false.cmp(&key.1))
|
||||
}
|
||||
}
|
||||
|
||||
impl ProjectInfoCache {
|
||||
pub fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) {
|
||||
info!("invalidating endpoint access for `{endpoint_id}`");
|
||||
@@ -41,25 +66,17 @@ impl ProjectInfoCache {
|
||||
|
||||
pub fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) {
|
||||
info!("invalidating endpoint access for project `{project_id}`");
|
||||
let endpoints = self
|
||||
.project2ep
|
||||
.get(&project_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
self.ep_controls.invalidate(&endpoint_id);
|
||||
|
||||
for entry in self.project2ep.range(Key::prefix(&project_id)) {
|
||||
self.ep_controls.invalidate(&entry.1);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt) {
|
||||
info!("invalidating endpoint access for org `{account_id}`");
|
||||
let endpoints = self
|
||||
.account2ep
|
||||
.get(&account_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
self.ep_controls.invalidate(&endpoint_id);
|
||||
|
||||
for entry in self.account2ep.range(Key::prefix(&account_id)) {
|
||||
self.ep_controls.invalidate(&entry.1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,13 +89,9 @@ impl ProjectInfoCache {
|
||||
"invalidating role secret for project_id `{}` and role_name `{}`",
|
||||
project_id, role_name,
|
||||
);
|
||||
let endpoints = self
|
||||
.project2ep
|
||||
.get(&project_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
self.role_controls.invalidate(&(endpoint_id, role_name));
|
||||
|
||||
for entry in self.project2ep.range(Key::prefix(&project_id)) {
|
||||
self.role_controls.invalidate(&(entry.1, role_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -115,8 +128,8 @@ impl ProjectInfoCache {
|
||||
.time_to_live(config.ttl)
|
||||
.expire_after(expiry)
|
||||
.build(),
|
||||
project2ep: ClashMap::new(),
|
||||
account2ep: ClashMap::new(),
|
||||
project2ep: MultiSet::new(),
|
||||
account2ep: MultiSet::new(),
|
||||
config,
|
||||
}
|
||||
}
|
||||
@@ -212,21 +225,11 @@ impl ProjectInfoCache {
|
||||
}
|
||||
|
||||
fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
|
||||
if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
|
||||
endpoints.insert(endpoint_id);
|
||||
} else {
|
||||
self.project2ep
|
||||
.insert(project_id, HashSet::from([endpoint_id]));
|
||||
}
|
||||
self.project2ep.insert(KeyValue(project_id, endpoint_id));
|
||||
}
|
||||
|
||||
fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) {
|
||||
if let Some(mut endpoints) = self.account2ep.get_mut(&account_id) {
|
||||
endpoints.insert(endpoint_id);
|
||||
} else {
|
||||
self.account2ep
|
||||
.insert(account_id, HashSet::from([endpoint_id]));
|
||||
}
|
||||
self.account2ep.insert(KeyValue(account_id, endpoint_id));
|
||||
}
|
||||
|
||||
pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) {
|
||||
|
||||
Reference in New Issue
Block a user