switch to lru

This commit is contained in:
Conrad Ludgate
2024-01-22 14:28:50 +00:00
parent 3290fb09bf
commit 5e150c9376
3 changed files with 60 additions and 130 deletions

View File

@@ -257,7 +257,6 @@ async fn main() -> anyhow::Result<()> {
maintenance_tasks
.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
#[cfg(feature = "testing")]
proxy::console::provider::ConsoleBackend::Postgres(_) => {}

View File

@@ -1,15 +1,15 @@
use std::{
collections::HashSet,
convert::Infallible,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use dashmap::DashMap;
use rand::{thread_rng, Rng};
use hashlink::LruCache;
use parking_lot::Mutex;
use smol_str::SmolStr;
use tokio::time::Instant;
use tracing::{debug, info};
use tracing::info;
use crate::{config::ProjectInfoCacheOptions, console::AuthSecret};
@@ -42,56 +42,10 @@ impl<T> From<T> for Entry<T> {
}
}
#[derive(Default)]
struct EndpointInfo {
secret: std::collections::HashMap<SmolStr, Entry<AuthSecret>>,
allowed_ips: Option<Entry<Arc<Vec<SmolStr>>>>,
}
impl EndpointInfo {
fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
match ignore_cache_since {
None => false,
Some(t) => t < created_at,
}
}
pub fn get_role_secret(
&self,
role_name: &SmolStr,
valid_since: Instant,
ignore_cache_since: Option<Instant>,
) -> Option<(AuthSecret, bool)> {
if let Some(secret) = self.secret.get(role_name) {
if valid_since < secret.created_at {
return Some((
secret.value.clone(),
Self::check_ignore_cache(ignore_cache_since, secret.created_at),
));
}
}
None
}
pub fn get_allowed_ips(
&self,
valid_since: Instant,
ignore_cache_since: Option<Instant>,
) -> Option<(Arc<Vec<SmolStr>>, bool)> {
if let Some(allowed_ips) = &self.allowed_ips {
if valid_since < allowed_ips.created_at {
return Some((
allowed_ips.value.clone(),
Self::check_ignore_cache(ignore_cache_since, allowed_ips.created_at),
));
}
}
None
}
pub fn invalidate_allowed_ips(&mut self) {
self.allowed_ips = None;
}
pub fn invalidate_role_secret(&mut self, role_name: &SmolStr) {
self.secret.remove(role_name);
fn check_ignore_cache(ignore_cache_since: Option<Instant>, created_at: Instant) -> bool {
match ignore_cache_since {
None => false,
Some(t) => t < created_at,
}
}
@@ -103,7 +57,8 @@ impl EndpointInfo {
/// One may ask, why the data is stored per project, when on the user request there is only data about the endpoint available?
/// On the cplane side updates are done per project (or per branch), so it's easier to invalidate the whole project cache.
pub struct ProjectInfoCacheImpl {
cache: DashMap<SmolStr, EndpointInfo>,
ip_cache: Mutex<LruCache<SmolStr, Entry<Arc<Vec<SmolStr>>>>>,
role_cache: Mutex<LruCache<(SmolStr, SmolStr), Entry<AuthSecret>>>,
project2ep: DashMap<SmolStr, HashSet<SmolStr>>,
config: ProjectInfoCacheOptions,
@@ -121,9 +76,7 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_allowed_ips();
}
self.ip_cache.lock().remove(&endpoint_id);
}
}
fn invalidate_role_secret_for_project(&self, project_id: &SmolStr, role_name: &SmolStr) {
@@ -137,9 +90,9 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
.map(|kv| kv.value().clone())
.unwrap_or_default();
for endpoint_id in endpoints {
if let Some(mut endpoint_info) = self.cache.get_mut(&endpoint_id) {
endpoint_info.invalidate_role_secret(role_name);
}
self.role_cache
.lock()
.remove(&(endpoint_id, role_name.clone()));
}
}
fn enable_ttl(&self) {
@@ -157,7 +110,8 @@ impl ProjectInfoCache for ProjectInfoCacheImpl {
impl ProjectInfoCacheImpl {
pub fn new(config: ProjectInfoCacheOptions) -> Self {
Self {
cache: DashMap::new(),
ip_cache: Mutex::new(LruCache::new(config.size)),
role_cache: Mutex::new(LruCache::new(config.size)),
project2ep: DashMap::new(),
config,
ttl_disabled_since_us: AtomicU64::new(u64::MAX),
@@ -171,9 +125,17 @@ impl ProjectInfoCacheImpl {
role_name: &SmolStr,
) -> Option<Cached<&Self, AuthSecret>> {
let (valid_since, ignore_cache_since) = self.get_cache_times();
let endpoint_info = self.cache.get(endpoint_id)?;
let (value, ignore_cache) =
endpoint_info.get_role_secret(role_name, valid_since, ignore_cache_since)?;
let (value, ignore_cache) = {
let mut cache = self.role_cache.lock();
let secret = cache.get(&(endpoint_id.clone(), role_name.clone()))?;
if secret.created_at < valid_since {
return None;
}
(
secret.value.clone(),
check_ignore_cache(ignore_cache_since, secret.created_at),
)
};
if !ignore_cache {
let cached = Cached {
token: Some((
@@ -186,14 +148,23 @@ impl ProjectInfoCacheImpl {
}
Some(Cached::new_uncached(value))
}
pub fn get_allowed_ips(
&self,
endpoint_id: &SmolStr,
) -> Option<Cached<&Self, Arc<Vec<SmolStr>>>> {
let (valid_since, ignore_cache_since) = self.get_cache_times();
let endpoint_info = self.cache.get(endpoint_id)?;
let value = endpoint_info.get_allowed_ips(valid_since, ignore_cache_since);
let (value, ignore_cache) = value?;
let (value, ignore_cache) = {
let mut cache = self.ip_cache.lock();
let allowed_ips = cache.get(endpoint_id)?;
if allowed_ips.created_at < valid_since {
return None;
}
(
allowed_ips.value.clone(),
check_ignore_cache(ignore_cache_since, allowed_ips.created_at),
)
};
if !ignore_cache {
let cached = Cached {
token: Some((self, CachedLookupInfo::new_allowed_ips(endpoint_id.clone()))),
@@ -203,6 +174,7 @@ impl ProjectInfoCacheImpl {
}
Some(Cached::new_uncached(value))
}
pub fn insert_role_secret(
&self,
project_id: &SmolStr,
@@ -210,40 +182,31 @@ impl ProjectInfoCacheImpl {
role_name: &SmolStr,
secret: AuthSecret,
) {
if self.cache.len() >= self.config.size {
// If there are too many entries, wait until the next gc cycle.
return;
}
self.inser_project2endpoint(project_id, endpoint_id);
let mut entry = self.cache.entry(endpoint_id.clone()).or_default();
if entry.secret.len() < self.config.max_roles {
entry.secret.insert(role_name.clone(), secret.into());
}
self.insert_project2endpoint(project_id, endpoint_id);
self.role_cache
.lock()
.insert((endpoint_id.clone(), role_name.clone()), secret.into());
}
pub fn insert_allowed_ips(
&self,
project_id: &SmolStr,
endpoint_id: &SmolStr,
allowed_ips: Arc<Vec<SmolStr>>,
) {
if self.cache.len() >= self.config.size {
// If there are too many entries, wait until the next gc cycle.
return;
}
self.inser_project2endpoint(project_id, endpoint_id);
self.cache
.entry(endpoint_id.clone())
self.insert_project2endpoint(project_id, endpoint_id);
self.ip_cache
.lock()
.insert(endpoint_id.clone(), allowed_ips.into());
}
fn insert_project2endpoint(&self, project_id: &SmolStr, endpoint_id: &SmolStr) {
self.project2ep
.entry(project_id.clone())
.or_default()
.allowed_ips = Some(allowed_ips.into());
}
fn inser_project2endpoint(&self, project_id: &SmolStr, endpoint_id: &SmolStr) {
if let Some(mut endpoints) = self.project2ep.get_mut(project_id) {
endpoints.insert(endpoint_id.clone());
} else {
self.project2ep
.insert(project_id.clone(), HashSet::from([endpoint_id.clone()]));
}
.insert(endpoint_id.clone());
}
fn get_cache_times(&self) -> (Instant, Option<Instant>) {
let mut valid_since = Instant::now() - self.config.ttl;
// Only ignore cache if ttl is disabled.
@@ -260,37 +223,6 @@ impl ProjectInfoCacheImpl {
};
(valid_since, ignore_cache_since)
}
pub async fn gc_worker(&self) -> anyhow::Result<Infallible> {
let mut interval =
tokio::time::interval(self.config.gc_interval / (self.cache.shards().len()) as u32);
loop {
interval.tick().await;
if self.cache.len() < self.config.size {
// If there are not too many entries, wait until the next gc cycle.
continue;
}
self.gc();
}
}
fn gc(&self) {
let shard = thread_rng().gen_range(0..self.project2ep.shards().len());
debug!(shard, "project_info_cache: performing epoch reclamation");
// acquire a random shard lock
let mut removed = 0;
let shard = self.project2ep.shards()[shard].write();
for (_, endpoints) in shard.iter() {
for endpoint in endpoints.get().iter() {
self.cache.remove(endpoint);
removed += 1;
}
}
// We can drop this shard only after making sure that all endpoints are removed.
drop(shard);
info!("project_info_cache: removed {removed} endpoints");
}
}
/// Lookup info for project info cache.
@@ -331,14 +263,12 @@ impl Cache for ProjectInfoCacheImpl {
fn invalidate(&self, key: &Self::LookupInfo<SmolStr>) {
match &key.lookup_type {
LookupType::RoleSecret(role_name) => {
if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
endpoint_info.invalidate_role_secret(role_name);
}
self.role_cache
.lock()
.remove(&(key.endpoint_id.clone(), role_name.clone()));
}
LookupType::AllowedIps => {
if let Some(mut endpoint_info) = self.cache.get_mut(&key.endpoint_id) {
endpoint_info.invalidate_allowed_ips();
}
self.ip_cache.lock().remove(&key.endpoint_id);
}
}
}

View File

@@ -195,6 +195,7 @@ impl super::Api for Api {
Ok(auth_info.secret.map(Cached::new_uncached))
}
#[tracing::instrument(skip_all)]
async fn get_allowed_ips(
&self,
ctx: &mut RequestMonitoring,