From 5e150c9376e810bc49c51bd27b4468b05aadf065 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 22 Jan 2024 14:28:50 +0000 Subject: [PATCH] switch to lru --- proxy/src/bin/proxy.rs | 1 - proxy/src/cache/project_info.rs | 188 +++++++++-------------------- proxy/src/console/provider/neon.rs | 1 + 3 files changed, 60 insertions(+), 130 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index ba113a89eb..b9de64fea7 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -257,7 +257,6 @@ async fn main() -> anyhow::Result<()> { maintenance_tasks .spawn(notifications::task_main(url.to_owned(), cache.clone())); } - maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); } #[cfg(feature = "testing")] proxy::console::provider::ConsoleBackend::Postgres(_) => {} diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 57d9e5289d..8f2171c777 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -1,15 +1,15 @@ use std::{ collections::HashSet, - convert::Infallible, sync::{atomic::AtomicU64, Arc}, time::Duration, }; use dashmap::DashMap; -use rand::{thread_rng, Rng}; +use hashlink::LruCache; +use parking_lot::Mutex; use smol_str::SmolStr; use tokio::time::Instant; -use tracing::{debug, info}; +use tracing::info; use crate::{config::ProjectInfoCacheOptions, console::AuthSecret}; @@ -42,56 +42,10 @@ impl From for Entry { } } -#[derive(Default)] -struct EndpointInfo { - secret: std::collections::HashMap>, - allowed_ips: Option>>>, -} - -impl EndpointInfo { - fn check_ignore_cache(ignore_cache_since: Option, created_at: Instant) -> bool { - match ignore_cache_since { - None => false, - Some(t) => t < created_at, - } - } - pub fn get_role_secret( - &self, - role_name: &SmolStr, - valid_since: Instant, - ignore_cache_since: Option, - ) -> Option<(AuthSecret, bool)> { - if let Some(secret) = self.secret.get(role_name) { - if valid_since < secret.created_at { - return Some(( - secret.value.clone(), - Self::check_ignore_cache(ignore_cache_since, secret.created_at), - )); - } - } - None - } - - pub fn get_allowed_ips( - &self, - valid_since: Instant, - ignore_cache_since: Option, - ) -> Option<(Arc>, bool)> { - if let Some(allowed_ips) = &self.allowed_ips { - if valid_since < allowed_ips.created_at { - return Some(( - allowed_ips.value.clone(), - Self::check_ignore_cache(ignore_cache_since, allowed_ips.created_at), - )); - } - } - None - } - pub fn invalidate_allowed_ips(&mut self) { - self.allowed_ips = None; - } - pub fn invalidate_role_secret(&mut self, role_name: &SmolStr) { - self.secret.remove(role_name); +fn check_ignore_cache(ignore_cache_since: Option, created_at: Instant) -> bool { + match ignore_cache_since { + None => false, + Some(t) => t < created_at, } } @@ -103,7 +57,8 @@ impl EndpointInfo { /// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available? /// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache. pub struct ProjectInfoCacheImpl { - cache: DashMap, + ip_cache: Mutex>>>>, + role_cache: Mutex>>, project2ep: DashMap>, config: ProjectInfoCacheOptions, @@ -121,9 +76,7 @@ impl ProjectInfoCache for ProjectInfoCacheImpl { .map(|kv| kv.value().clone()) .unwrap_or_default(); for endpoint_id in endpoints { - if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) { - endpoint_info.invalidate_allowed_ips(); - } + self.ip_cache.lock().remove(&endpoint_id); } } fn invalidate_role_secret_for_project(&self, project_id: &SmolStr, role_name: &SmolStr) { @@ -137,9 +90,9 @@ impl ProjectInfoCache for ProjectInfoCacheImpl { .map(|kv| kv.value().clone()) .unwrap_or_default(); for endpoint_id in endpoints { - if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) { - endpoint_info.invalidate_role_secret(role_name); - } + self.role_cache + .lock() + .remove(&(endpoint_id, role_name.clone())); } } fn enable_ttl(&self) { @@ -157,7 +110,8 @@ impl ProjectInfoCache for ProjectInfoCacheImpl { impl ProjectInfoCacheImpl { pub fn new(config: ProjectInfoCacheOptions) -> Self { Self { - cache: DashMap::new(), + ip_cache: Mutex::new(LruCache::new(config.size)), + role_cache: Mutex::new(LruCache::new(config.size)), project2ep: DashMap::new(), config, ttl_disabled_since_us: AtomicU64::new(u64::MAX), @@ -171,9 +125,17 @@ impl ProjectInfoCacheImpl { role_name: &SmolStr, ) -> Option> { let (valid_since, ignore_cache_since) = self.get_cache_times(); - let endpoint_info = self.cache.get(endpoint_id)?; - let (value, ignore_cache) = - endpoint_info.get_role_secret(role_name, valid_since, ignore_cache_since)?; + let (value, ignore_cache) = { + let mut cache = self.role_cache.lock(); + let secret = cache.get(&(endpoint_id.clone(), role_name.clone()))?; + if secret.created_at < valid_since { + return None; + } + ( + secret.value.clone(), + check_ignore_cache(ignore_cache_since, secret.created_at), + ) + }; if !ignore_cache { let cached = Cached { token: Some(( @@ -186,14 +148,23 @@ impl ProjectInfoCacheImpl { } Some(Cached::new_uncached(value)) } + pub fn get_allowed_ips( &self, endpoint_id: &SmolStr, ) -> Option>>> { let (valid_since, ignore_cache_since) = self.get_cache_times(); - let endpoint_info = self.cache.get(endpoint_id)?; - let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since); - let (value, ignore_cache) = value?; + let (value, ignore_cache) = { + let mut cache = self.ip_cache.lock(); + let allowed_ips = cache.get(endpoint_id)?; + if allowed_ips.created_at < valid_since { + return None; + } + ( + allowed_ips.value.clone(), + check_ignore_cache(ignore_cache_since, allowed_ips.created_at), + ) + }; if !ignore_cache { let cached = Cached { token: Some((self, CachedLookupInfo::new_allowed_ips(endpoint_id.clone()))), @@ -203,6 +174,7 @@ impl ProjectInfoCacheImpl { } Some(Cached::new_uncached(value)) } + pub fn insert_role_secret( &self, project_id: &SmolStr, @@ -210,40 +182,31 @@ impl ProjectInfoCacheImpl { role_name: &SmolStr, secret: AuthSecret, ) { - if self.cache.len() >= self.config.size { - // If there are too many entries, wait until the next gc cycle. - return; - } - self.inser_project2endpoint(project_id, endpoint_id); - let mut entry = self.cache.entry(endpoint_id.clone()).or_default(); - if entry.secret.len() < self.config.max_roles { - entry.secret.insert(role_name.clone(), secret.into()); - } + self.insert_project2endpoint(project_id, endpoint_id); + self.role_cache + .lock() + .insert((endpoint_id.clone(), role_name.clone()), secret.into()); } + pub fn insert_allowed_ips( &self, project_id: &SmolStr, endpoint_id: &SmolStr, allowed_ips: Arc>, ) { - if self.cache.len() >= self.config.size { - // If there are too many entries, wait until the next gc cycle. - return; - } - self.inser_project2endpoint(project_id, endpoint_id); - self.cache - .entry(endpoint_id.clone()) + self.insert_project2endpoint(project_id, endpoint_id); + self.ip_cache + .lock() + .insert(endpoint_id.clone(), allowed_ips.into()); + } + + fn insert_project2endpoint(&self, project_id: &SmolStr, endpoint_id: &SmolStr) { + self.project2ep + .entry(project_id.clone()) .or_default() - .allowed_ips = Some(allowed_ips.into()); - } - fn inser_project2endpoint(&self, project_id: &SmolStr, endpoint_id: &SmolStr) { - if let Some(mut endpoints) = self.project2ep.get_mut(project_id) { - endpoints.insert(endpoint_id.clone()); - } else { - self.project2ep - .insert(project_id.clone(), HashSet::from([endpoint_id.clone()])); - } + .insert(endpoint_id.clone()); } + fn get_cache_times(&self) -> (Instant, Option) { let mut valid_since = Instant::now() - self.config.ttl; // Only ignore cache if ttl is disabled. @@ -260,37 +223,6 @@ impl ProjectInfoCacheImpl { }; (valid_since, ignore_cache_since) } - - pub async fn gc_worker(&self) -> anyhow::Result { - let mut interval = - tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32); - loop { - interval.tick().await; - if self.cache.len() < self.config.size { - // If there are not too many entries, wait until the next gc cycle. - continue; - } - self.gc(); - } - } - - fn gc(&self) { - let shard = thread_rng().gen_range(0..self.project2ep.shards().len()); - debug!(shard, "project_info_cache: performing epoch reclamation"); - - // acquire a random shard lock - let mut removed = 0; - let shard = self.project2ep.shards()[shard].write(); - for (_, endpoints) in shard.iter() { - for endpoint in endpoints.get().iter() { - self.cache.remove(endpoint); - removed += 1; - } - } - // We can drop this shard only after making sure that all endpoints are removed. - drop(shard); - info!("project_info_cache: removed {removed} endpoints"); - } } /// Lookup info for project info cache. @@ -331,14 +263,12 @@ impl Cache for ProjectInfoCacheImpl { fn invalidate(&self, key: &Self::LookupInfo) { match &key.lookup_type { LookupType::RoleSecret(role_name) => { - if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) { - endpoint_info.invalidate_role_secret(role_name); - } + self.role_cache + .lock() + .remove(&(key.endpoint_id.clone(), role_name.clone())); } LookupType::AllowedIps => { - if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) { - endpoint_info.invalidate_allowed_ips(); - } + self.ip_cache.lock().remove(&key.endpoint_id); } } } diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index e8e36815c7..5cdc01810b 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -195,6 +195,7 @@ impl super::Api for Api { Ok(auth_info.secret.map(Cached::new_uncached)) } + #[tracing::instrument(skip_all)] async fn get_allowed_ips( &self, ctx: &mut RequestMonitoring,