From 82e4600d1b4994e9e4c8b54cd31209c56325add9 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 17 Oct 2025 11:30:02 +0800 Subject: [PATCH] feat: add index cache eviction support (#7064) Signed-off-by: Zhenchi --- src/mito2/src/cache.rs | 150 ++++++++++++++++++ src/mito2/src/cache/index.rs | 19 +++ .../src/cache/index/bloom_filter_index.rs | 5 + src/mito2/src/cache/index/inverted_index.rs | 5 + src/mito2/src/cache/index/result_cache.rs | 8 + src/puffin/src/puffin_manager/cache.rs | 5 + 6 files changed, 192 insertions(+) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 9b313f1fa8..b371e39b78 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -170,6 +170,19 @@ impl CacheStrategy { } } + /// Calls [CacheManager::evict_puffin_cache()]. + pub async fn evict_puffin_cache(&self, file_id: RegionFileId) { + match self { + CacheStrategy::EnableAll(cache_manager) => { + cache_manager.evict_puffin_cache(file_id).await + } + CacheStrategy::Compaction(cache_manager) => { + cache_manager.evict_puffin_cache(file_id).await + } + CacheStrategy::Disabled => {} + } + } + /// Calls [CacheManager::get_selector_result()]. /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn get_selector_result( @@ -374,6 +387,35 @@ impl CacheManager { } } + /// Evicts every puffin-related cache entry for the given file. + pub async fn evict_puffin_cache(&self, file_id: RegionFileId) { + if let Some(cache) = &self.bloom_filter_index_cache { + cache.invalidate_file(file_id.file_id()); + } + + if let Some(cache) = &self.inverted_index_cache { + cache.invalidate_file(file_id.file_id()); + } + + if let Some(cache) = &self.index_result_cache { + cache.invalidate_file(file_id.file_id()); + } + + if let Some(cache) = &self.puffin_metadata_cache { + cache.remove(&file_id.to_string()); + } + + if let Some(write_cache) = &self.write_cache { + write_cache + .remove(IndexKey::new( + file_id.region_id(), + file_id.file_id(), + FileType::Puffin, + )) + .await; + } + } + /// Gets result of for the selector. pub fn get_selector_result( &self, @@ -760,10 +802,16 @@ type SelectorResultCache = Cache>; mod tests { use std::sync::Arc; + use api::v1::index::{BloomFilterMeta, InvertedIndexMetas}; use datatypes::vectors::Int64Vector; + use puffin::file_metadata::FileMetadata; + use store_api::storage::ColumnId; use super::*; + use crate::cache::index::bloom_filter_index::Tag; + use crate::cache::index::result_cache::PredicateKey; use crate::cache::test_util::parquet_meta; + use crate::sst::parquet::row_selection::RowGroupSelection; #[tokio::test] async fn test_disable_cache() { @@ -852,4 +900,106 @@ mod tests { cache.put_selector_result(key, result); assert!(cache.get_selector_result(&key).is_some()); } + + #[tokio::test] + async fn test_evict_puffin_cache_clears_all_entries() { + use std::collections::{BTreeMap, HashMap}; + + let cache = CacheManager::builder() + .index_metadata_size(128) + .index_content_size(128) + .index_content_page_size(64) + .index_result_cache_size(128) + .puffin_metadata_size(128) + .build(); + let cache = Arc::new(cache); + + let region_id = RegionId::new(1, 1); + let region_file_id = RegionFileId::new(region_id, FileId::random()); + let column_id: ColumnId = 1; + + let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone(); + let inverted_cache = cache.inverted_index_cache().unwrap().clone(); + let result_cache = cache.index_result_cache().unwrap(); + let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone(); + + let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping); + bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default())); + inverted_cache.put_metadata( + region_file_id.file_id(), + Arc::new(InvertedIndexMetas::default()), + ); + let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new())); + let selection = Arc::new(RowGroupSelection::default()); + result_cache.put(predicate.clone(), region_file_id.file_id(), selection); + let file_id_str = region_file_id.to_string(); + let metadata = Arc::new(FileMetadata { + blobs: Vec::new(), + properties: HashMap::new(), + }); + puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata); + + assert!(bloom_cache.get_metadata(bloom_key).is_some()); + assert!( + inverted_cache + .get_metadata(region_file_id.file_id()) + .is_some() + ); + assert!( + result_cache + .get(&predicate, region_file_id.file_id()) + .is_some() + ); + assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some()); + + cache.evict_puffin_cache(region_file_id).await; + + assert!(bloom_cache.get_metadata(bloom_key).is_none()); + assert!( + inverted_cache + .get_metadata(region_file_id.file_id()) + .is_none() + ); + assert!( + result_cache + .get(&predicate, region_file_id.file_id()) + .is_none() + ); + assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none()); + + // Refill caches and evict via CacheStrategy to ensure delegation works. + bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default())); + inverted_cache.put_metadata( + region_file_id.file_id(), + Arc::new(InvertedIndexMetas::default()), + ); + result_cache.put( + predicate.clone(), + region_file_id.file_id(), + Arc::new(RowGroupSelection::default()), + ); + puffin_metadata_cache.put_metadata( + file_id_str.clone(), + Arc::new(FileMetadata { + blobs: Vec::new(), + properties: HashMap::new(), + }), + ); + + let strategy = CacheStrategy::EnableAll(cache.clone()); + strategy.evict_puffin_cache(region_file_id).await; + + assert!(bloom_cache.get_metadata(bloom_key).is_none()); + assert!( + inverted_cache + .get_metadata(region_file_id.file_id()) + .is_none() + ); + assert!( + result_cache + .get(&predicate, region_file_id.file_id()) + .is_none() + ); + assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none()); + } } diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 0a803d7c8b..cf24772994 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -112,6 +112,7 @@ where .with_label_values(&[INDEX_METADATA_TYPE]) .sub(size.into()); }) + .support_invalidation_closures() .build(); let index_cache = moka::sync::CacheBuilder::new(index_content_cap) .name(&format!("index_content_{}", index_type)) @@ -122,6 +123,7 @@ where .with_label_values(&[INDEX_CONTENT_TYPE]) .sub(size.into()); }) + .support_invalidation_closures() .build(); Self { index_metadata, @@ -219,6 +221,23 @@ where .add((self.weight_of_content)(&(key, page_key), &value).into()); self.index.insert((key, page_key), value); } + + /// Invalidates all cache entries whose keys satisfy `predicate`. + pub fn invalidate_if(&self, predicate: F) + where + F: Fn(&K) -> bool + Send + Sync + 'static, + { + let predicate = Arc::new(predicate); + let metadata_predicate = Arc::clone(&predicate); + + self.index_metadata + .invalidate_entries_if(move |key, _| metadata_predicate(key)) + .expect("cache should support invalidation closures"); + + self.index + .invalidate_entries_if(move |(key, _), _| predicate(key)) + .expect("cache should support invalidation closures"); + } } /// Prunes the size of the last page based on the indexes. diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index f4b9477b83..9e8d864d7d 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -50,6 +50,11 @@ impl BloomFilterIndexCache { bloom_filter_index_content_weight, ) } + + /// Removes all cached entries for the given `file_id`. + pub fn invalidate_file(&self, file_id: FileId) { + self.invalidate_if(move |key| key.0 == file_id); + } } /// Calculates weight for bloom filter index metadata. diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 5caf998a12..06a7a3f6d4 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -44,6 +44,11 @@ impl InvertedIndexCache { inverted_index_content_weight, ) } + + /// Removes all cached entries for the given `file_id`. + pub fn invalidate_file(&self, file_id: FileId) { + self.invalidate_if(move |key| *key == file_id); + } } /// Calculates weight for inverted index metadata. diff --git a/src/mito2/src/cache/index/result_cache.rs b/src/mito2/src/cache/index/result_cache.rs index 5ae8425cc8..1b14c9d981 100644 --- a/src/mito2/src/cache/index/result_cache.rs +++ b/src/mito2/src/cache/index/result_cache.rs @@ -63,6 +63,7 @@ impl IndexResultCache { .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)]) .inc(); }) + .support_invalidation_closures() .build(); Self { cache } } @@ -97,6 +98,13 @@ impl IndexResultCache { fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc) -> u32 { k.0.mem_usage() as u32 + v.mem_usage() as u32 } + + /// Removes cached results for the given file. + pub fn invalidate_file(&self, file_id: FileId) { + self.cache + .invalidate_entries_if(move |(_, cached_file_id), _| *cached_file_id == file_id) + .expect("cache should support invalidation closures"); + } } /// Key for different types of index predicates. diff --git a/src/puffin/src/puffin_manager/cache.rs b/src/puffin/src/puffin_manager/cache.rs index 049aa4d4a6..c2c5fb67f4 100644 --- a/src/puffin/src/puffin_manager/cache.rs +++ b/src/puffin/src/puffin_manager/cache.rs @@ -57,4 +57,9 @@ impl PuffinMetadataCache { pub fn put_metadata(&self, file_id: String, metadata: Arc) { self.cache.insert(file_id, metadata); } + + /// Removes the metadata of the given file from the cache, if present. + pub fn remove(&self, file_id: &str) { + self.cache.invalidate(file_id); + } }