feat: add index cache eviction support (#7064)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-10-17 11:30:02 +08:00
committed by GitHub
parent 8a2371a05c
commit 82e4600d1b
6 changed files with 192 additions and 0 deletions

View File

@@ -170,6 +170,19 @@ impl CacheStrategy {
}
}
/// Calls [CacheManager::evict_puffin_cache()].
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.evict_puffin_cache(file_id).await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.evict_puffin_cache(file_id).await
}
CacheStrategy::Disabled => {}
}
}
/// Calls [CacheManager::get_selector_result()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_selector_result(
@@ -374,6 +387,35 @@ impl CacheManager {
}
}
/// Evicts every puffin-related cache entry for the given file.
pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
if let Some(cache) = &self.bloom_filter_index_cache {
cache.invalidate_file(file_id.file_id());
}
if let Some(cache) = &self.inverted_index_cache {
cache.invalidate_file(file_id.file_id());
}
if let Some(cache) = &self.index_result_cache {
cache.invalidate_file(file_id.file_id());
}
if let Some(cache) = &self.puffin_metadata_cache {
cache.remove(&file_id.to_string());
}
if let Some(write_cache) = &self.write_cache {
write_cache
.remove(IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin,
))
.await;
}
}
/// Gets result of for the selector.
pub fn get_selector_result(
&self,
@@ -760,10 +802,16 @@ type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
mod tests {
use std::sync::Arc;
use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
use datatypes::vectors::Int64Vector;
use puffin::file_metadata::FileMetadata;
use store_api::storage::ColumnId;
use super::*;
use crate::cache::index::bloom_filter_index::Tag;
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::parquet_meta;
use crate::sst::parquet::row_selection::RowGroupSelection;
#[tokio::test]
async fn test_disable_cache() {
@@ -852,4 +900,106 @@ mod tests {
cache.put_selector_result(key, result);
assert!(cache.get_selector_result(&key).is_some());
}
#[tokio::test]
async fn test_evict_puffin_cache_clears_all_entries() {
use std::collections::{BTreeMap, HashMap};
let cache = CacheManager::builder()
.index_metadata_size(128)
.index_content_size(128)
.index_content_page_size(64)
.index_result_cache_size(128)
.puffin_metadata_size(128)
.build();
let cache = Arc::new(cache);
let region_id = RegionId::new(1, 1);
let region_file_id = RegionFileId::new(region_id, FileId::random());
let column_id: ColumnId = 1;
let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
let inverted_cache = cache.inverted_index_cache().unwrap().clone();
let result_cache = cache.index_result_cache().unwrap();
let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
Arc::new(InvertedIndexMetas::default()),
);
let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
let selection = Arc::new(RowGroupSelection::default());
result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
let file_id_str = region_file_id.to_string();
let metadata = Arc::new(FileMetadata {
blobs: Vec::new(),
properties: HashMap::new(),
});
puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
assert!(bloom_cache.get_metadata(bloom_key).is_some());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_some()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.is_some()
);
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
cache.evict_puffin_cache(region_file_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.is_none()
);
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
// Refill caches and evict via CacheStrategy to ensure delegation works.
bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
inverted_cache.put_metadata(
region_file_id.file_id(),
Arc::new(InvertedIndexMetas::default()),
);
result_cache.put(
predicate.clone(),
region_file_id.file_id(),
Arc::new(RowGroupSelection::default()),
);
puffin_metadata_cache.put_metadata(
file_id_str.clone(),
Arc::new(FileMetadata {
blobs: Vec::new(),
properties: HashMap::new(),
}),
);
let strategy = CacheStrategy::EnableAll(cache.clone());
strategy.evict_puffin_cache(region_file_id).await;
assert!(bloom_cache.get_metadata(bloom_key).is_none());
assert!(
inverted_cache
.get_metadata(region_file_id.file_id())
.is_none()
);
assert!(
result_cache
.get(&predicate, region_file_id.file_id())
.is_none()
);
assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
}
}

View File

@@ -112,6 +112,7 @@ where
.with_label_values(&[INDEX_METADATA_TYPE])
.sub(size.into());
})
.support_invalidation_closures()
.build();
let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
.name(&format!("index_content_{}", index_type))
@@ -122,6 +123,7 @@ where
.with_label_values(&[INDEX_CONTENT_TYPE])
.sub(size.into());
})
.support_invalidation_closures()
.build();
Self {
index_metadata,
@@ -219,6 +221,23 @@ where
.add((self.weight_of_content)(&(key, page_key), &value).into());
self.index.insert((key, page_key), value);
}
/// Invalidates all cache entries whose keys satisfy `predicate`.
pub fn invalidate_if<F>(&self, predicate: F)
where
F: Fn(&K) -> bool + Send + Sync + 'static,
{
let predicate = Arc::new(predicate);
let metadata_predicate = Arc::clone(&predicate);
self.index_metadata
.invalidate_entries_if(move |key, _| metadata_predicate(key))
.expect("cache should support invalidation closures");
self.index
.invalidate_entries_if(move |(key, _), _| predicate(key))
.expect("cache should support invalidation closures");
}
}
/// Prunes the size of the last page based on the indexes.

View File

@@ -50,6 +50,11 @@ impl BloomFilterIndexCache {
bloom_filter_index_content_weight,
)
}
/// Removes all cached entries for the given `file_id`.
pub fn invalidate_file(&self, file_id: FileId) {
self.invalidate_if(move |key| key.0 == file_id);
}
}
/// Calculates weight for bloom filter index metadata.

View File

@@ -44,6 +44,11 @@ impl InvertedIndexCache {
inverted_index_content_weight,
)
}
/// Removes all cached entries for the given `file_id`.
pub fn invalidate_file(&self, file_id: FileId) {
self.invalidate_if(move |key| *key == file_id);
}
}
/// Calculates weight for inverted index metadata.

View File

@@ -63,6 +63,7 @@ impl IndexResultCache {
.with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
.inc();
})
.support_invalidation_closures()
.build();
Self { cache }
}
@@ -97,6 +98,13 @@ impl IndexResultCache {
fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
k.0.mem_usage() as u32 + v.mem_usage() as u32
}
/// Removes cached results for the given file.
pub fn invalidate_file(&self, file_id: FileId) {
self.cache
.invalidate_entries_if(move |(_, cached_file_id), _| *cached_file_id == file_id)
.expect("cache should support invalidation closures");
}
}
/// Key for different types of index predicates.

View File

@@ -57,4 +57,9 @@ impl PuffinMetadataCache {
pub fn put_metadata(&self, file_id: String, metadata: Arc<FileMetadata>) {
self.cache.insert(file_id, metadata);
}
/// Removes the metadata of the given file from the cache, if present.
pub fn remove(&self, file_id: &str) {
self.cache.invalidate(file_id);
}
}