From 3d2af1a83fb224a7ffa8ef799ec868b4665fd610 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sat, 26 Jul 2025 10:13:42 +0100 Subject: [PATCH] use crossbeam-skiplist and clever range queries for a lock-free way to represent K->Set --- Cargo.lock | 77 +++++++++++++++++++++++------- proxy/Cargo.toml | 1 + proxy/src/cache/project_info.rs | 83 +++++++++++++++++---------------- proxy/src/intern.rs | 8 ++-- 4 files changed, 107 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b43f4fdea0..46d045ed06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -253,6 +253,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "atomic-maybe-uninit" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdca051497d8e15fcc5cb1b07ff75ee2afc3789b986ca9ee2af204847c0a9223" + [[package]] name = "atomic-take" version = "1.1.0" @@ -977,7 +983,7 @@ dependencies = [ "bitflags 2.8.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1252,7 +1258,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.8.19", "hashbrown 0.15.2", "lock_api", "parking_lot_core 0.9.8", @@ -1408,7 +1414,7 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.8.19", ] [[package]] @@ -1619,7 +1625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" dependencies = [ "cfg-if", - "crossbeam-utils", + "crossbeam-utils 0.8.19", ] [[package]] @@ -1628,8 +1634,8 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.8.19", ] [[package]] @@ -1638,7 +1644,24 @@ version = "0.9.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.8.19", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9" +dependencies = [ + "crossbeam-utils 0.8.21", +] + +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9" +dependencies = [ + "crossbeam-epoch 0.9.18 (git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9)", + "crossbeam-utils 0.8.21", ] [[package]] @@ -1647,6 +1670,14 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9" +dependencies = [ + "atomic-maybe-uninit", +] + [[package]] name = "crossterm" version = "0.27.0" @@ -1790,7 +1821,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", - "crossbeam-utils", + "crossbeam-utils 0.8.19", "hashbrown 0.14.5", "lock_api", "once_cell", @@ -3337,7 +3368,7 @@ dependencies = [ "ahash", "clap", "crossbeam-channel", - "crossbeam-utils", + "crossbeam-utils 0.8.19", "dashmap 6.1.0", "env_logger", "indexmap 2.10.0", @@ -3439,6 +3470,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -3742,7 +3782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0" dependencies = [ "bytes", - "crossbeam-utils", + "crossbeam-utils 0.8.19", "hashbrown 0.14.5", "itoa", "lasso", @@ -3906,8 +3946,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" dependencies = [ "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.8.19", "loom", "parking_lot 0.12.1", "portable-atomic", @@ -5340,7 +5380,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.13.0", "log", "multimap", "once_cell", @@ -5373,7 +5413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.100", @@ -5422,6 +5462,7 @@ dependencies = [ "clashmap", "compute_api", "consumption_metrics", + "crossbeam-skiplist", "ecdsa 0.16.9", "ed25519-dalek", "env_logger", @@ -5527,7 +5568,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.8.19", "libc", "once_cell", "raw-cpuid", @@ -5728,7 +5769,7 @@ checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" dependencies = [ "crossbeam-channel", "crossbeam-deque", - "crossbeam-utils", + "crossbeam-utils 0.8.19", "num_cpus", ] @@ -9014,8 +9055,8 @@ dependencies = [ "clap", "clap_builder", "const-oid", - "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.8.19", "crypto-bigint 0.5.5", "der 0.7.8", "deranged", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index eb8c0ed037..ff78892b00 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -29,6 +29,7 @@ clap = { workspace = true, features = ["derive", "env"] } clashmap.workspace = true compute_api.workspace = true consumption_metrics.workspace = true +crossbeam-skiplist = { git = "https://github.com/crossbeam-rs/crossbeam", rev = "8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9" } env_logger.workspace = true framed-websockets.workspace = true futures.workspace = true diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index b7347d8e57..b8aa445e5c 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -1,7 +1,7 @@ -use std::collections::HashSet; use std::convert::Infallible; -use clashmap::ClashMap; +use crossbeam_skiplist::SkipSet; +use crossbeam_skiplist::equivalent::{Comparable, Equivalent}; use moka::sync::Cache; use tracing::{debug, info}; @@ -26,13 +26,38 @@ pub struct ProjectInfoCache { role_controls: Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult>, ep_controls: Cache>, - project2ep: ClashMap>, - // FIXME(stefan): we need a way to GC the account2ep map. - account2ep: ClashMap>, + project2ep: MultiSet, + account2ep: MultiSet, config: ProjectInfoCacheOptions, } +// This is rather hacky. +// We use an ordered set of (K, V). +// We use range queries over `(K, _)..(K+1, _)` to do the invalidation. +type MultiSet = SkipSet>; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] +struct KeyValue(K, V); +struct Key<'a, K>(&'a K, bool); + +impl<'a, K> Key<'a, K> { + fn prefix(key: &'a K) -> std::ops::Range { + Self(key, false)..Self(key, true) + } +} + +impl<'a, K: Ord, V> Equivalent> for KeyValue { + fn equivalent(&self, key: &Key<'a, K>) -> bool { + self.0 == *key.0 && !key.1 + } +} +impl<'a, K: Ord, V> Comparable> for KeyValue { + fn compare(&self, key: &Key<'a, K>) -> std::cmp::Ordering { + self.0.cmp(key.0).then(false.cmp(&key.1)) + } +} + impl ProjectInfoCache { pub fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) { info!("invalidating endpoint access for `{endpoint_id}`"); @@ -41,25 +66,17 @@ impl ProjectInfoCache { pub fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) { info!("invalidating endpoint access for project `{project_id}`"); - let endpoints = self - .project2ep - .get(&project_id) - .map(|kv| kv.value().clone()) - .unwrap_or_default(); - for endpoint_id in endpoints { - self.ep_controls.invalidate(&endpoint_id); + + for entry in self.project2ep.range(Key::prefix(&project_id)) { + self.ep_controls.invalidate(&entry.1); } } pub fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt) { info!("invalidating endpoint access for org `{account_id}`"); - let endpoints = self - .account2ep - .get(&account_id) - .map(|kv| kv.value().clone()) - .unwrap_or_default(); - for endpoint_id in endpoints { - self.ep_controls.invalidate(&endpoint_id); + + for entry in self.account2ep.range(Key::prefix(&account_id)) { + self.ep_controls.invalidate(&entry.1); } } @@ -72,13 +89,9 @@ impl ProjectInfoCache { "invalidating role secret for project_id `{}` and role_name `{}`", project_id, role_name, ); - let endpoints = self - .project2ep - .get(&project_id) - .map(|kv| kv.value().clone()) - .unwrap_or_default(); - for endpoint_id in endpoints { - self.role_controls.invalidate(&(endpoint_id, role_name)); + + for entry in self.project2ep.range(Key::prefix(&project_id)) { + self.role_controls.invalidate(&(entry.1, role_name)); } } } @@ -115,8 +128,8 @@ impl ProjectInfoCache { .time_to_live(config.ttl) .expire_after(expiry) .build(), - project2ep: ClashMap::new(), - account2ep: ClashMap::new(), + project2ep: MultiSet::new(), + account2ep: MultiSet::new(), config, } } @@ -212,21 +225,11 @@ impl ProjectInfoCache { } fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) { - if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) { - endpoints.insert(endpoint_id); - } else { - self.project2ep - .insert(project_id, HashSet::from([endpoint_id])); - } + self.project2ep.insert(KeyValue(project_id, endpoint_id)); } fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) { - if let Some(mut endpoints) = self.account2ep.get_mut(&account_id) { - endpoints.insert(endpoint_id); - } else { - self.account2ep - .insert(account_id, HashSet::from([endpoint_id])); - } + self.account2ep.insert(KeyValue(account_id, endpoint_id)); } pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) { diff --git a/proxy/src/intern.rs b/proxy/src/intern.rs index 825f2d1049..913fa1a7a5 100644 --- a/proxy/src/intern.rs +++ b/proxy/src/intern.rs @@ -18,7 +18,7 @@ pub struct StringInterner { _id: PhantomData, } -#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash)] +#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash, PartialOrd, Ord)] pub struct InternedString { inner: Spur, _id: PhantomData, @@ -146,7 +146,7 @@ impl From<&RoleName> for RoleNameInt { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct EndpointIdTag; impl InternId for EndpointIdTag { fn get_interner() -> &'static StringInterner { @@ -186,7 +186,7 @@ impl From for BranchIdInt { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ProjectIdTag; impl InternId for ProjectIdTag { fn get_interner() -> &'static StringInterner { @@ -206,7 +206,7 @@ impl From for ProjectIdInt { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct AccountIdTag; impl InternId for AccountIdTag { fn get_interner() -> &'static StringInterner {