mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Compare commits
6 Commits
bodobolero
...
conrad/int
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8549b42bca | ||
|
|
b39498cf6b | ||
|
|
68ccdd910a | ||
|
|
8b299bfb4e | ||
|
|
3d2af1a83f | ||
|
|
e4872bec57 |
77
Cargo.lock
generated
77
Cargo.lock
generated
@@ -253,6 +253,12 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-maybe-uninit"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bdca051497d8e15fcc5cb1b07ff75ee2afc3789b986ca9ee2af204847c0a9223"
|
||||
|
||||
[[package]]
|
||||
name = "atomic-take"
|
||||
version = "1.1.0"
|
||||
@@ -977,7 +983,7 @@ dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.13.0",
|
||||
"log",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
@@ -1252,7 +1258,7 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"hashbrown 0.15.2",
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.8",
|
||||
@@ -1408,7 +1414,7 @@ version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1619,7 +1625,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1628,8 +1634,8 @@ version = "0.8.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-utils 0.8.19",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1638,7 +1644,24 @@ version = "0.9.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.9.18"
|
||||
source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9"
|
||||
dependencies = [
|
||||
"crossbeam-utils 0.8.21",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-skiplist"
|
||||
version = "0.1.3"
|
||||
source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9"
|
||||
dependencies = [
|
||||
"crossbeam-epoch 0.9.18 (git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9)",
|
||||
"crossbeam-utils 0.8.21",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1647,6 +1670,14 @@ version = "0.8.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.21"
|
||||
source = "git+https://github.com/crossbeam-rs/crossbeam?rev=8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9#8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9"
|
||||
dependencies = [
|
||||
"atomic-maybe-uninit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossterm"
|
||||
version = "0.27.0"
|
||||
@@ -1790,7 +1821,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
@@ -3337,7 +3368,7 @@ dependencies = [
|
||||
"ahash",
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"dashmap 6.1.0",
|
||||
"env_logger",
|
||||
"indexmap 2.10.0",
|
||||
@@ -3439,6 +3470,15 @@ dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.11"
|
||||
@@ -3742,7 +3782,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"hashbrown 0.14.5",
|
||||
"itoa",
|
||||
"lasso",
|
||||
@@ -3906,8 +3946,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"loom",
|
||||
"parking_lot 0.12.1",
|
||||
"portable-atomic",
|
||||
@@ -5340,7 +5380,7 @@ checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"heck 0.5.0",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.13.0",
|
||||
"log",
|
||||
"multimap",
|
||||
"once_cell",
|
||||
@@ -5373,7 +5413,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"itertools 0.12.1",
|
||||
"itertools 0.13.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
@@ -5422,6 +5462,7 @@ dependencies = [
|
||||
"clashmap",
|
||||
"compute_api",
|
||||
"consumption_metrics",
|
||||
"crossbeam-skiplist",
|
||||
"ecdsa 0.16.9",
|
||||
"ed25519-dalek",
|
||||
"env_logger",
|
||||
@@ -5527,7 +5568,7 @@ version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
@@ -5728,7 +5769,7 @@ checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"crossbeam-deque",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"num_cpus",
|
||||
]
|
||||
|
||||
@@ -9014,8 +9055,8 @@ dependencies = [
|
||||
"clap",
|
||||
"clap_builder",
|
||||
"const-oid",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"crossbeam-epoch 0.9.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"crossbeam-utils 0.8.19",
|
||||
"crypto-bigint 0.5.5",
|
||||
"der 0.7.8",
|
||||
"deranged",
|
||||
|
||||
@@ -29,6 +29,7 @@ clap = { workspace = true, features = ["derive", "env"] }
|
||||
clashmap.workspace = true
|
||||
compute_api.workspace = true
|
||||
consumption_metrics.workspace = true
|
||||
crossbeam-skiplist = { git = "https://github.com/crossbeam-rs/crossbeam", rev = "8d24b3460bb7a9968e2ee21eeb42c16b0b5d02e9" }
|
||||
env_logger.workspace = true
|
||||
framed-websockets.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@@ -535,12 +535,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
// add a task to flush the db_schema cache every 10 minutes
|
||||
#[cfg(feature = "rest_broker")]
|
||||
if let Some(db_schema_cache) = &config.rest_config.db_schema_cache {
|
||||
maintenance_tasks.spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(600)).await;
|
||||
db_schema_cache.0.run_pending_tasks();
|
||||
}
|
||||
});
|
||||
maintenance_tasks.spawn(db_schema_cache.maintain());
|
||||
}
|
||||
|
||||
if let Some(metrics_config) = &config.metric_collection {
|
||||
|
||||
36
proxy/src/cache/common.rs
vendored
36
proxy/src/cache/common.rs
vendored
@@ -2,8 +2,12 @@ use std::ops::{Deref, DerefMut};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use moka::Expiry;
|
||||
use moka::notification::RemovalCause;
|
||||
|
||||
use crate::control_plane::messages::ControlPlaneErrorMessage;
|
||||
use crate::metrics::{
|
||||
CacheEviction, CacheKind, CacheOutcome, CacheOutcomeGroup, CacheRemovalCause, Metrics,
|
||||
};
|
||||
|
||||
/// Default TTL used when caching errors from control plane.
|
||||
pub const DEFAULT_ERROR_TTL: Duration = Duration::from_secs(30);
|
||||
@@ -130,3 +134,35 @@ impl<K, V> Expiry<K, ControlPlaneResult<V>> for CplaneExpiry {
|
||||
self.expire_early(value, updated_at)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn eviction_listener(kind: CacheKind, cause: RemovalCause) {
|
||||
let cause = match cause {
|
||||
RemovalCause::Expired => CacheRemovalCause::Expired,
|
||||
RemovalCause::Explicit => CacheRemovalCause::Explicit,
|
||||
RemovalCause::Replaced => CacheRemovalCause::Replaced,
|
||||
RemovalCause::Size => CacheRemovalCause::Size,
|
||||
};
|
||||
Metrics::get()
|
||||
.cache
|
||||
.evicted_total
|
||||
.inc(CacheEviction { cache: kind, cause });
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn count_cache_outcome<T>(kind: CacheKind, cache_result: Option<T>) -> Option<T> {
|
||||
let outcome = if cache_result.is_some() {
|
||||
CacheOutcome::Hit
|
||||
} else {
|
||||
CacheOutcome::Miss
|
||||
};
|
||||
Metrics::get().cache.request_total.inc(CacheOutcomeGroup {
|
||||
cache: kind,
|
||||
outcome,
|
||||
});
|
||||
cache_result
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn count_cache_insert(kind: CacheKind) {
|
||||
Metrics::get().cache.inserted_total.inc(kind);
|
||||
}
|
||||
|
||||
19
proxy/src/cache/node_info.rs
vendored
19
proxy/src/cache/node_info.rs
vendored
@@ -1,7 +1,8 @@
|
||||
use crate::cache::common::Cache;
|
||||
use crate::cache::common::{Cache, count_cache_insert, count_cache_outcome, eviction_listener};
|
||||
use crate::cache::{Cached, ControlPlaneResult, CplaneExpiry};
|
||||
use crate::config::CacheOptions;
|
||||
use crate::control_plane::NodeInfo;
|
||||
use crate::metrics::{CacheKind, Metrics};
|
||||
use crate::types::EndpointCacheKey;
|
||||
|
||||
pub(crate) struct NodeInfoCache(moka::sync::Cache<EndpointCacheKey, ControlPlaneResult<NodeInfo>>);
|
||||
@@ -22,15 +23,27 @@ impl NodeInfoCache {
|
||||
.name("node_info_cache")
|
||||
.expire_after(CplaneExpiry::default());
|
||||
let builder = config.moka(builder);
|
||||
|
||||
if let Some(size) = config.size {
|
||||
Metrics::get()
|
||||
.cache
|
||||
.capacity
|
||||
.set(CacheKind::NodeInfo, size as i64);
|
||||
}
|
||||
|
||||
let builder = builder
|
||||
.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::NodeInfo, cause));
|
||||
|
||||
Self(builder.build())
|
||||
}
|
||||
|
||||
pub fn insert(&self, key: EndpointCacheKey, value: ControlPlaneResult<NodeInfo>) {
|
||||
count_cache_insert(CacheKind::NodeInfo);
|
||||
self.0.insert(key, value);
|
||||
}
|
||||
|
||||
pub fn get(&'static self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
|
||||
self.0.get(key)
|
||||
pub fn get(&self, key: &EndpointCacheKey) -> Option<ControlPlaneResult<NodeInfo>> {
|
||||
count_cache_outcome(CacheKind::NodeInfo, self.0.get(key))
|
||||
}
|
||||
|
||||
pub fn get_entry(
|
||||
|
||||
263
proxy/src/cache/project_info.rs
vendored
263
proxy/src/cache/project_info.rs
vendored
@@ -1,15 +1,20 @@
|
||||
use std::collections::HashSet;
|
||||
use std::convert::Infallible;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use clashmap::ClashMap;
|
||||
use crossbeam_skiplist::SkipMap;
|
||||
use crossbeam_skiplist::equivalent::{Comparable, Equivalent};
|
||||
use moka::sync::Cache;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::cache::common::{ControlPlaneResult, CplaneExpiry};
|
||||
use crate::cache::common::{
|
||||
ControlPlaneResult, CplaneExpiry, count_cache_insert, count_cache_outcome, eviction_listener,
|
||||
};
|
||||
use crate::config::ProjectInfoCacheOptions;
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, Reason};
|
||||
use crate::control_plane::{EndpointAccessControl, RoleAccessControl};
|
||||
use crate::ext::LockExt;
|
||||
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt, RoleNameInt};
|
||||
use crate::metrics::{CacheKind, Metrics};
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
/// Cache for project info.
|
||||
@@ -20,16 +25,83 @@ use crate::types::{EndpointId, RoleName};
|
||||
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
|
||||
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
|
||||
pub struct ProjectInfoCache {
|
||||
role_controls: Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<RoleAccessControl>>,
|
||||
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<EndpointAccessControl>>,
|
||||
role_controls:
|
||||
Cache<(EndpointIdInt, RoleNameInt), ControlPlaneResult<Entry<RoleAccessControl>>>,
|
||||
ep_controls: Cache<EndpointIdInt, ControlPlaneResult<Entry<EndpointAccessControl>>>,
|
||||
|
||||
project2ep: ClashMap<ProjectIdInt, HashSet<EndpointIdInt>>,
|
||||
// FIXME(stefan): we need a way to GC the account2ep map.
|
||||
account2ep: ClashMap<AccountIdInt, HashSet<EndpointIdInt>>,
|
||||
project2ep: Arc<RefCountMultiSet<ProjectIdInt, EndpointIdInt>>,
|
||||
account2ep: Arc<RefCountMultiSet<AccountIdInt, EndpointIdInt>>,
|
||||
|
||||
config: ProjectInfoCacheOptions,
|
||||
}
|
||||
|
||||
type RefCount = Mutex<usize>;
|
||||
|
||||
// This is rather hacky.
|
||||
// We use an ordered map of (K, V) -> RefCount.
|
||||
// We use range queries over `(K, _)..(K+1, _)` to do the invalidation.
|
||||
// We use the RefCount to know when to remove entries.
|
||||
type RefCountMultiSet<K, V> = SkipMap<KeyValue<K, V>, RefCount>;
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
||||
struct KeyValue<K, V>(K, V);
|
||||
struct Key<'a, K>(&'a K, bool);
|
||||
|
||||
impl<'a, K> Key<'a, K> {
|
||||
fn prefix(key: &'a K) -> std::ops::Range<Self> {
|
||||
Self(key, false)..Self(key, true)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K: Ord, V> Equivalent<Key<'a, K>> for KeyValue<K, V> {
|
||||
fn equivalent(&self, key: &Key<'a, K>) -> bool {
|
||||
self.0 == *key.0 && !key.1
|
||||
}
|
||||
}
|
||||
impl<'a, K: Ord, V> Comparable<Key<'a, K>> for KeyValue<K, V> {
|
||||
fn compare(&self, key: &Key<'a, K>) -> std::cmp::Ordering {
|
||||
self.0.cmp(key.0).then(false.cmp(&key.1))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Entry<T> {
|
||||
project_id: Option<ProjectIdInt>,
|
||||
account_id: Option<AccountIdInt>,
|
||||
value: T,
|
||||
}
|
||||
|
||||
impl<T> Entry<T> {
|
||||
fn dec_ref_counts(
|
||||
self,
|
||||
project2ep: &RefCountMultiSet<ProjectIdInt, EndpointIdInt>,
|
||||
account2ep: &RefCountMultiSet<AccountIdInt, EndpointIdInt>,
|
||||
endpoint_id: EndpointIdInt,
|
||||
) {
|
||||
if let Some(project_id) = self.project_id {
|
||||
dec_ref_count(project2ep, project_id, endpoint_id);
|
||||
}
|
||||
if let Some(account_id) = self.account_id {
|
||||
dec_ref_count(account2ep, account_id, endpoint_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn dec_ref_count<Id: Ord + Send + 'static>(
|
||||
id2ep: &RefCountMultiSet<Id, EndpointIdInt>,
|
||||
id: Id,
|
||||
endpoint_id: EndpointIdInt,
|
||||
) {
|
||||
if let Some(entry) = id2ep.get(&KeyValue(id, endpoint_id)) {
|
||||
let mut count = entry.value().lock_propagate_poison();
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
// remove the entry while holding the lock
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProjectInfoCache {
|
||||
pub fn invalidate_endpoint_access(&self, endpoint_id: EndpointIdInt) {
|
||||
info!("invalidating endpoint access for `{endpoint_id}`");
|
||||
@@ -38,25 +110,17 @@ impl ProjectInfoCache {
|
||||
|
||||
pub fn invalidate_endpoint_access_for_project(&self, project_id: ProjectIdInt) {
|
||||
info!("invalidating endpoint access for project `{project_id}`");
|
||||
let endpoints = self
|
||||
.project2ep
|
||||
.get(&project_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
self.ep_controls.invalidate(&endpoint_id);
|
||||
|
||||
for entry in self.project2ep.range(Key::prefix(&project_id)) {
|
||||
self.ep_controls.invalidate(&entry.key().1);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn invalidate_endpoint_access_for_org(&self, account_id: AccountIdInt) {
|
||||
info!("invalidating endpoint access for org `{account_id}`");
|
||||
let endpoints = self
|
||||
.account2ep
|
||||
.get(&account_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
self.ep_controls.invalidate(&endpoint_id);
|
||||
|
||||
for entry in self.account2ep.range(Key::prefix(&account_id)) {
|
||||
self.ep_controls.invalidate(&entry.key().1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,36 +133,68 @@ impl ProjectInfoCache {
|
||||
"invalidating role secret for project_id `{}` and role_name `{}`",
|
||||
project_id, role_name,
|
||||
);
|
||||
let endpoints = self
|
||||
.project2ep
|
||||
.get(&project_id)
|
||||
.map(|kv| kv.value().clone())
|
||||
.unwrap_or_default();
|
||||
for endpoint_id in endpoints {
|
||||
self.role_controls.invalidate(&(endpoint_id, role_name));
|
||||
|
||||
for entry in self.project2ep.range(Key::prefix(&project_id)) {
|
||||
self.role_controls.invalidate(&(entry.key().1, role_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProjectInfoCache {
|
||||
pub(crate) fn new(config: ProjectInfoCacheOptions) -> Self {
|
||||
Metrics::get().cache.capacity.set(
|
||||
CacheKind::ProjectInfoRoles,
|
||||
(config.size * config.max_roles) as i64,
|
||||
);
|
||||
Metrics::get()
|
||||
.cache
|
||||
.capacity
|
||||
.set(CacheKind::ProjectInfoEndpoints, config.size as i64);
|
||||
|
||||
let project2ep = Arc::new(RefCountMultiSet::<ProjectIdInt, EndpointIdInt>::new());
|
||||
let account2ep = Arc::new(RefCountMultiSet::<AccountIdInt, EndpointIdInt>::new());
|
||||
let project2ep1 = Arc::clone(&project2ep);
|
||||
let project2ep2 = Arc::clone(&project2ep);
|
||||
let account2ep1 = Arc::clone(&account2ep);
|
||||
let account2ep2 = Arc::clone(&account2ep);
|
||||
|
||||
// we cache errors for 30 seconds, unless retry_at is set.
|
||||
let expiry = CplaneExpiry::default();
|
||||
Self {
|
||||
role_controls: Cache::builder()
|
||||
.name("role_access_controls")
|
||||
.eviction_listener(
|
||||
move |k, v: ControlPlaneResult<Entry<RoleAccessControl>>, cause| {
|
||||
eviction_listener(CacheKind::ProjectInfoRoles, cause);
|
||||
|
||||
let (endpoint_id, _): (EndpointIdInt, RoleNameInt) = *k;
|
||||
if let Ok(v) = v {
|
||||
v.dec_ref_counts(&project2ep1, &account2ep1, endpoint_id);
|
||||
}
|
||||
},
|
||||
)
|
||||
.max_capacity(config.size * config.max_roles)
|
||||
.time_to_live(config.ttl)
|
||||
.expire_after(expiry)
|
||||
.build(),
|
||||
ep_controls: Cache::builder()
|
||||
.name("endpoint_access_controls")
|
||||
.eviction_listener(
|
||||
move |k, v: ControlPlaneResult<Entry<EndpointAccessControl>>, cause| {
|
||||
eviction_listener(CacheKind::ProjectInfoEndpoints, cause);
|
||||
|
||||
let endpoint_id: EndpointIdInt = *k;
|
||||
if let Ok(v) = v {
|
||||
v.dec_ref_counts(&project2ep2, &account2ep2, endpoint_id);
|
||||
}
|
||||
},
|
||||
)
|
||||
.max_capacity(config.size)
|
||||
.time_to_live(config.ttl)
|
||||
.expire_after(expiry)
|
||||
.build(),
|
||||
project2ep: ClashMap::new(),
|
||||
account2ep: ClashMap::new(),
|
||||
project2ep,
|
||||
account2ep,
|
||||
config,
|
||||
}
|
||||
}
|
||||
@@ -111,7 +207,12 @@ impl ProjectInfoCache {
|
||||
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
|
||||
let role_name = RoleNameInt::get(role_name)?;
|
||||
|
||||
self.role_controls.get(&(endpoint_id, role_name))
|
||||
count_cache_outcome(
|
||||
CacheKind::ProjectInfoRoles,
|
||||
self.role_controls
|
||||
.get(&(endpoint_id, role_name))
|
||||
.map(|e| e.map(|e| e.value)),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn get_endpoint_access(
|
||||
@@ -120,7 +221,12 @@ impl ProjectInfoCache {
|
||||
) -> Option<ControlPlaneResult<EndpointAccessControl>> {
|
||||
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
|
||||
|
||||
self.ep_controls.get(&endpoint_id)
|
||||
count_cache_outcome(
|
||||
CacheKind::ProjectInfoEndpoints,
|
||||
self.ep_controls
|
||||
.get(&endpoint_id)
|
||||
.map(|e| e.map(|e| e.value)),
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn insert_endpoint_access(
|
||||
@@ -132,11 +238,12 @@ impl ProjectInfoCache {
|
||||
controls: EndpointAccessControl,
|
||||
role_controls: RoleAccessControl,
|
||||
) {
|
||||
// 2 corresponds to how many cache inserts we do.
|
||||
if let Some(account_id) = account_id {
|
||||
self.insert_account2endpoint(account_id, endpoint_id);
|
||||
self.inc_account2ep_ref(account_id, endpoint_id, 2);
|
||||
}
|
||||
if let Some(project_id) = project_id {
|
||||
self.insert_project2endpoint(project_id, endpoint_id);
|
||||
self.inc_project2ep_ref(project_id, endpoint_id, 2);
|
||||
}
|
||||
|
||||
debug!(
|
||||
@@ -144,9 +251,25 @@ impl ProjectInfoCache {
|
||||
"created a cache entry for endpoint access"
|
||||
);
|
||||
|
||||
self.ep_controls.insert(endpoint_id, Ok(controls));
|
||||
self.role_controls
|
||||
.insert((endpoint_id, role_name), Ok(role_controls));
|
||||
count_cache_insert(CacheKind::ProjectInfoEndpoints);
|
||||
count_cache_insert(CacheKind::ProjectInfoRoles);
|
||||
|
||||
self.ep_controls.insert(
|
||||
endpoint_id,
|
||||
Ok(Entry {
|
||||
account_id,
|
||||
project_id,
|
||||
value: controls,
|
||||
}),
|
||||
);
|
||||
self.role_controls.insert(
|
||||
(endpoint_id, role_name),
|
||||
Ok(Entry {
|
||||
account_id,
|
||||
project_id,
|
||||
value: role_controls,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn insert_endpoint_access_err(
|
||||
@@ -172,30 +295,30 @@ impl ProjectInfoCache {
|
||||
// leave the entry alone if it's already Ok
|
||||
Some(entry) if entry.value().is_ok() => moka::ops::compute::Op::Nop,
|
||||
// replace the entry
|
||||
_ => moka::ops::compute::Op::Put(Err(msg.clone())),
|
||||
_ => {
|
||||
count_cache_insert(CacheKind::ProjectInfoEndpoints);
|
||||
moka::ops::compute::Op::Put(Err(msg.clone()))
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
count_cache_insert(CacheKind::ProjectInfoRoles);
|
||||
self.role_controls
|
||||
.insert((endpoint_id, role_name), Err(msg));
|
||||
}
|
||||
|
||||
fn insert_project2endpoint(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt) {
|
||||
if let Some(mut endpoints) = self.project2ep.get_mut(&project_id) {
|
||||
endpoints.insert(endpoint_id);
|
||||
} else {
|
||||
self.project2ep
|
||||
.insert(project_id, HashSet::from([endpoint_id]));
|
||||
}
|
||||
fn inc_project2ep_ref(&self, project_id: ProjectIdInt, endpoint_id: EndpointIdInt, x: usize) {
|
||||
let entry = self
|
||||
.project2ep
|
||||
.get_or_insert(KeyValue(project_id, endpoint_id), Mutex::new(0));
|
||||
*entry.value().lock_propagate_poison() += x;
|
||||
}
|
||||
|
||||
fn insert_account2endpoint(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt) {
|
||||
if let Some(mut endpoints) = self.account2ep.get_mut(&account_id) {
|
||||
endpoints.insert(endpoint_id);
|
||||
} else {
|
||||
self.account2ep
|
||||
.insert(account_id, HashSet::from([endpoint_id]));
|
||||
}
|
||||
fn inc_account2ep_ref(&self, account_id: AccountIdInt, endpoint_id: EndpointIdInt, x: usize) {
|
||||
let entry = self
|
||||
.account2ep
|
||||
.get_or_insert(KeyValue(account_id, endpoint_id), Mutex::new(0));
|
||||
*entry.value().lock_propagate_poison() += x;
|
||||
}
|
||||
|
||||
pub fn maybe_invalidate_role_secret(&self, _endpoint_id: &EndpointId, _role_name: &RoleName) {
|
||||
@@ -260,6 +383,16 @@ mod tests {
|
||||
},
|
||||
);
|
||||
|
||||
cache.ep_controls.run_pending_tasks();
|
||||
cache.role_controls.run_pending_tasks();
|
||||
|
||||
// check the project mappings are there
|
||||
assert_eq!(cache.project2ep.len(), 1);
|
||||
|
||||
// check the ref counts
|
||||
let entry = cache.project2ep.front().unwrap();
|
||||
assert_eq!(*entry.value().lock_propagate_poison(), 2);
|
||||
|
||||
cache.insert_endpoint_access(
|
||||
account_id,
|
||||
project_id,
|
||||
@@ -276,6 +409,17 @@ mod tests {
|
||||
},
|
||||
);
|
||||
|
||||
cache.ep_controls.run_pending_tasks();
|
||||
cache.role_controls.run_pending_tasks();
|
||||
|
||||
// check the project mappings are still there
|
||||
assert_eq!(cache.project2ep.len(), 1);
|
||||
|
||||
// check the ref counts
|
||||
let entry = cache.project2ep.front().unwrap();
|
||||
assert_eq!(*entry.value().lock_propagate_poison(), 3);
|
||||
|
||||
// check both entries exist
|
||||
let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap();
|
||||
assert_eq!(cached.unwrap().secret, secret1);
|
||||
|
||||
@@ -303,13 +447,26 @@ mod tests {
|
||||
},
|
||||
);
|
||||
|
||||
cache.ep_controls.run_pending_tasks();
|
||||
cache.role_controls.run_pending_tasks();
|
||||
|
||||
assert_eq!(cache.role_controls.entry_count(), 2);
|
||||
|
||||
// check the project mappings are still there
|
||||
assert_eq!(cache.project2ep.len(), 1);
|
||||
|
||||
// check the ref counts are unchanged.
|
||||
let entry = cache.project2ep.front().unwrap();
|
||||
assert_eq!(*entry.value().lock_propagate_poison(), 3);
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
cache.ep_controls.run_pending_tasks();
|
||||
cache.role_controls.run_pending_tasks();
|
||||
assert_eq!(cache.role_controls.entry_count(), 0);
|
||||
|
||||
// check the project/account mappings are no longer there
|
||||
assert!(cache.project2ep.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -18,7 +18,7 @@ pub struct StringInterner<Id> {
|
||||
_id: PhantomData<Id>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash)]
|
||||
#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash, PartialOrd, Ord)]
|
||||
pub struct InternedString<Id> {
|
||||
inner: Spur,
|
||||
_id: PhantomData<Id>,
|
||||
@@ -146,7 +146,7 @@ impl From<&RoleName> for RoleNameInt {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct EndpointIdTag;
|
||||
impl InternId for EndpointIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
@@ -186,7 +186,7 @@ impl From<BranchId> for BranchIdInt {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct ProjectIdTag;
|
||||
impl InternId for ProjectIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
@@ -206,7 +206,7 @@ impl From<ProjectId> for ProjectIdInt {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
pub struct AccountIdTag;
|
||||
impl InternId for AccountIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
|
||||
@@ -8,8 +8,8 @@ use measured::label::{
|
||||
use measured::metric::histogram::Thresholds;
|
||||
use measured::metric::name::MetricName;
|
||||
use measured::{
|
||||
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
|
||||
MetricGroup,
|
||||
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
|
||||
LabelGroup, MetricGroup,
|
||||
};
|
||||
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec, InfoMetric};
|
||||
use tokio::time::{self, Instant};
|
||||
@@ -29,6 +29,9 @@ pub struct Metrics {
|
||||
|
||||
#[metric(namespace = "service")]
|
||||
pub service: ServiceMetrics,
|
||||
|
||||
#[metric(namespace = "cache")]
|
||||
pub cache: CacheMetrics,
|
||||
}
|
||||
|
||||
static SELF: OnceLock<Metrics> = OnceLock::new();
|
||||
@@ -219,13 +222,6 @@ pub enum Bool {
|
||||
False,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
#[label(singleton = "outcome")]
|
||||
pub enum CacheOutcome {
|
||||
Hit,
|
||||
Miss,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = ConsoleRequestSet)]
|
||||
pub struct ConsoleRequest<'a> {
|
||||
@@ -704,3 +700,59 @@ pub enum ServiceState {
|
||||
Running,
|
||||
Terminating,
|
||||
}
|
||||
|
||||
#[derive(MetricGroup)]
|
||||
#[metric(new())]
|
||||
pub struct CacheMetrics {
|
||||
/// The capacity of the cache
|
||||
pub capacity: GaugeVec<StaticLabelSet<CacheKind>>,
|
||||
/// The total number of entries inserted into the cache
|
||||
pub inserted_total: CounterVec<StaticLabelSet<CacheKind>>,
|
||||
/// The total number of entries removed from the cache
|
||||
pub evicted_total: CounterVec<CacheEvictionSet>,
|
||||
/// The total number of cache requests
|
||||
pub request_total: CounterVec<CacheOutcomeSet>,
|
||||
}
|
||||
|
||||
impl Default for CacheMetrics {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
|
||||
#[label(singleton = "cache")]
|
||||
pub enum CacheKind {
|
||||
NodeInfo,
|
||||
ProjectInfoEndpoints,
|
||||
ProjectInfoRoles,
|
||||
Schema,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Clone, Copy, Debug)]
|
||||
pub enum CacheRemovalCause {
|
||||
Expired,
|
||||
Explicit,
|
||||
Replaced,
|
||||
Size,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = CacheEvictionSet)]
|
||||
pub struct CacheEviction {
|
||||
pub cache: CacheKind,
|
||||
pub cause: CacheRemovalCause,
|
||||
}
|
||||
|
||||
#[derive(FixedCardinalityLabel, Copy, Clone)]
|
||||
pub enum CacheOutcome {
|
||||
Hit,
|
||||
Miss,
|
||||
}
|
||||
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = CacheOutcomeSet)]
|
||||
pub struct CacheOutcomeGroup {
|
||||
pub cache: CacheKind,
|
||||
pub outcome: CacheOutcome,
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
@@ -54,11 +55,12 @@ use super::http_util::{
|
||||
};
|
||||
use super::json::JsonConversionError;
|
||||
use crate::auth::backend::ComputeCredentialKeys;
|
||||
use crate::cache::common::{count_cache_insert, count_cache_outcome, eviction_listener};
|
||||
use crate::config::ProxyConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::http::read_body_with_limit;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::metrics::{CacheKind, Metrics};
|
||||
use crate::serverless::sql_over_http::HEADER_VALUE_TRUE;
|
||||
use crate::types::EndpointCacheKey;
|
||||
use crate::util::deserialize_json_string;
|
||||
@@ -138,15 +140,31 @@ pub struct ApiConfig {
|
||||
}
|
||||
|
||||
// The DbSchemaCache is a cache of the ApiConfig and DbSchemaOwned for each endpoint
|
||||
pub(crate) struct DbSchemaCache(pub Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
|
||||
pub(crate) struct DbSchemaCache(Cache<EndpointCacheKey, Arc<(ApiConfig, DbSchemaOwned)>>);
|
||||
impl DbSchemaCache {
|
||||
pub fn new(config: crate::config::CacheOptions) -> Self {
|
||||
let builder = Cache::builder().name("db_schema_cache");
|
||||
let builder = config.moka(builder);
|
||||
|
||||
let metrics = &Metrics::get().cache;
|
||||
if let Some(size) = config.size {
|
||||
metrics.capacity.set(CacheKind::Schema, size as i64);
|
||||
}
|
||||
|
||||
let builder =
|
||||
builder.eviction_listener(|_k, _v, cause| eviction_listener(CacheKind::Schema, cause));
|
||||
|
||||
Self(builder.build())
|
||||
}
|
||||
|
||||
pub async fn maintain(&self) -> Result<Infallible, anyhow::Error> {
|
||||
let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
self.0.run_pending_tasks();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_cached_or_remote(
|
||||
&self,
|
||||
endpoint_id: &EndpointCacheKey,
|
||||
@@ -156,7 +174,8 @@ impl DbSchemaCache {
|
||||
ctx: &RequestContext,
|
||||
config: &'static ProxyConfig,
|
||||
) -> Result<Arc<(ApiConfig, DbSchemaOwned)>, RestError> {
|
||||
match self.0.get(endpoint_id) {
|
||||
let cache_result = count_cache_outcome(CacheKind::Schema, self.0.get(endpoint_id));
|
||||
match cache_result {
|
||||
Some(v) => Ok(v),
|
||||
None => {
|
||||
info!("db_schema cache miss for endpoint: {:?}", endpoint_id);
|
||||
@@ -180,6 +199,7 @@ impl DbSchemaCache {
|
||||
db_extra_search_path: None,
|
||||
};
|
||||
let value = Arc::new((api_config, schema_owned));
|
||||
count_cache_insert(CacheKind::Schema);
|
||||
self.0.insert(endpoint_id.clone(), value);
|
||||
return Err(e);
|
||||
}
|
||||
@@ -188,6 +208,7 @@ impl DbSchemaCache {
|
||||
}
|
||||
};
|
||||
let value = Arc::new((api_config, schema_owned));
|
||||
count_cache_insert(CacheKind::Schema);
|
||||
self.0.insert(endpoint_id.clone(), value.clone());
|
||||
Ok(value)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user