diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index d026bc34de..d3eafaea42 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -1246,21 +1246,30 @@ pub struct PageRangeCache { impl PageRangeCache { fn new(capacity: u64) -> Arc { - let cache = Cache::builder() - .max_capacity(capacity) - .weigher(page_cache_weight) - .eviction_listener(|k, v, cause| { - let size = page_cache_weight(&k, &v); - CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); - CACHE_EVICTION - .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) - .inc(); - }) - .build(); + Arc::new_cyclic(|weak_cache: &std::sync::Weak| { + let cache = Cache::builder() + .max_capacity(capacity) + .weigher(page_cache_weight) + .eviction_listener({ + let weak_cache = weak_cache.clone(); + move |k, v, cause| { + let size = page_cache_weight(&k, &v); + CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); + CACHE_EVICTION + .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) + .inc(); - Arc::new(PageRangeCache { - cache, - index: new_page_fragment_index_shards(), + if let Some(cache) = weak_cache.upgrade() { + cache.remove_stale_index_entries(vec![*k]); + } + } + }) + .build(); + + PageRangeCache { + cache, + index: new_page_fragment_index_shards(), + } }) } @@ -1747,6 +1756,10 @@ mod tests { &[Bytes::from(vec![2; 10])], ); cache.cache.run_pending_tasks(); + assert_eq!( + vec![PageFragmentKey::new(file_id, 0, &range)], + cache.find_index_candidates(file_id, 0, &range) + ); let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range)); assert!(lookup.is_fully_cached()); @@ -1802,6 +1815,7 @@ mod tests { cache.cache.invalidate(&key); cache.cache.run_pending_tasks(); + assert!(cache.find_index_candidates(file_id, 0, &range).is_empty()); let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range)); assert!(!lookup.is_fully_cached());