diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 758b352804..e5e498f159 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -1244,8 +1244,14 @@ impl PageRangeCache { .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) .inc(); - if let Some(group) = eviction_index.get(&key.group_key()) { - group.write().unwrap().remove(&(key.start, key.end)); + let group_key = key.group_key(); + if let Some(group) = eviction_index.get(&group_key).map(|group| group.clone()) { + let mut ranges = group.write().unwrap(); + ranges.remove(&(key.start, key.end)); + if ranges.is_empty() { + eviction_index + .remove_if(&group_key, |_, current| Arc::ptr_eq(current, &group)); + } } }) .build(); @@ -1358,16 +1364,32 @@ impl PageRangeCache { } let key = PageFragmentKey::new(file_id, row_group_idx, range); - let size = page_cache_weight(&key, bytes); + let bytes = Bytes::copy_from_slice(bytes); + let size = page_cache_weight(&key, &bytes); CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into()); - self.cache.insert(key, bytes.clone()); + self.cache.insert(key, bytes); + self.insert_index_entry(key); + } + } + + fn insert_index_entry(&self, key: PageFragmentKey) { + let group_key = key.group_key(); + loop { let group = self .index - .entry(key.group_key()) + .entry(group_key) .or_insert_with(|| Arc::new(PageFragmentRangeMap::new(BTreeMap::new()))) .clone(); - group.write().unwrap().insert((key.start, key.end), key); + let mut ranges = group.write().unwrap(); + if self + .index + .get(&group_key) + .is_some_and(|current| Arc::ptr_eq(current.value(), &group)) + { + ranges.insert((key.start, key.end), key); + return; + } } } } @@ -1641,6 +1663,65 @@ mod tests { assert_eq!(400..500, lookup.cached_parts[0][0].range); } + #[test] + fn test_page_cache_detaches_fragment_bytes() { + let cache = PageRangeCache::new(1000); + let file_id = FileId::random(); + let backing = Bytes::from(vec![1; 1024]); + let page = backing.slice(512..522); + let page_ptr = page.as_ptr(); + let range = 0..10; + + cache.insert_ranges( + file_id, + 0, + std::slice::from_ref(&range), + std::slice::from_ref(&page), + ); + + let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range)); + assert!(lookup.is_fully_cached()); + assert_eq!(1, lookup.cached_parts[0].len()); + assert_eq!(&page[..], &lookup.cached_parts[0][0].bytes[..]); + assert_ne!(page_ptr, lookup.cached_parts[0][0].bytes.as_ptr()); + } + + #[test] + fn test_page_cache_removes_empty_index_group_on_cache_removal() { + let cache = PageRangeCache::new(1000); + let file_id = FileId::random(); + let group_key = PageFragmentGroupKey { + file_id, + row_group_idx: 0, + }; + let range1 = 0..10; + let range2 = 20..30; + let key1 = PageFragmentKey::new(file_id, 0, &range1); + let key2 = PageFragmentKey::new(file_id, 0, &range2); + + cache.insert_ranges( + file_id, + 0, + &[range1, range2], + &[Bytes::from(vec![1; 10]), Bytes::from(vec![2; 10])], + ); + assert_eq!( + 2, + cache.index.get(&group_key).unwrap().read().unwrap().len() + ); + + cache.cache.invalidate(&key1); + cache.cache.run_pending_tasks(); + assert_eq!( + 1, + cache.index.get(&group_key).unwrap().read().unwrap().len() + ); + + cache.cache.invalidate(&key2); + cache.cache.run_pending_tasks(); + assert!(!cache.index.contains_key(&group_key)); + } + #[test] fn test_selector_result_cache() { let cache = CacheManager::builder()