copy fragements

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-16 11:14:41 +08:00
parent 8eebf15ea7
commit 226a02e6de

View File

@@ -1244,8 +1244,14 @@ impl PageRangeCache {
.with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)])
.inc();
if let Some(group) = eviction_index.get(&key.group_key()) {
group.write().unwrap().remove(&(key.start, key.end));
let group_key = key.group_key();
if let Some(group) = eviction_index.get(&group_key).map(|group| group.clone()) {
let mut ranges = group.write().unwrap();
ranges.remove(&(key.start, key.end));
if ranges.is_empty() {
eviction_index
.remove_if(&group_key, |_, current| Arc::ptr_eq(current, &group));
}
}
})
.build();
@@ -1358,16 +1364,32 @@ impl PageRangeCache {
}
let key = PageFragmentKey::new(file_id, row_group_idx, range);
let size = page_cache_weight(&key, bytes);
let bytes = Bytes::copy_from_slice(bytes);
let size = page_cache_weight(&key, &bytes);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into());
self.cache.insert(key, bytes.clone());
self.cache.insert(key, bytes);
self.insert_index_entry(key);
}
}
fn insert_index_entry(&self, key: PageFragmentKey) {
let group_key = key.group_key();
loop {
let group = self
.index
.entry(key.group_key())
.entry(group_key)
.or_insert_with(|| Arc::new(PageFragmentRangeMap::new(BTreeMap::new())))
.clone();
group.write().unwrap().insert((key.start, key.end), key);
let mut ranges = group.write().unwrap();
if self
.index
.get(&group_key)
.is_some_and(|current| Arc::ptr_eq(current.value(), &group))
{
ranges.insert((key.start, key.end), key);
return;
}
}
}
}
@@ -1641,6 +1663,65 @@ mod tests {
assert_eq!(400..500, lookup.cached_parts[0][0].range);
}
#[test]
fn test_page_cache_detaches_fragment_bytes() {
let cache = PageRangeCache::new(1000);
let file_id = FileId::random();
let backing = Bytes::from(vec![1; 1024]);
let page = backing.slice(512..522);
let page_ptr = page.as_ptr();
let range = 0..10;
cache.insert_ranges(
file_id,
0,
std::slice::from_ref(&range),
std::slice::from_ref(&page),
);
let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
assert!(lookup.is_fully_cached());
assert_eq!(1, lookup.cached_parts[0].len());
assert_eq!(&page[..], &lookup.cached_parts[0][0].bytes[..]);
assert_ne!(page_ptr, lookup.cached_parts[0][0].bytes.as_ptr());
}
#[test]
fn test_page_cache_removes_empty_index_group_on_cache_removal() {
let cache = PageRangeCache::new(1000);
let file_id = FileId::random();
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx: 0,
};
let range1 = 0..10;
let range2 = 20..30;
let key1 = PageFragmentKey::new(file_id, 0, &range1);
let key2 = PageFragmentKey::new(file_id, 0, &range2);
cache.insert_ranges(
file_id,
0,
&[range1, range2],
&[Bytes::from(vec![1; 10]), Bytes::from(vec![2; 10])],
);
assert_eq!(
2,
cache.index.get(&group_key).unwrap().read().unwrap().len()
);
cache.cache.invalidate(&key1);
cache.cache.run_pending_tasks();
assert_eq!(
1,
cache.index.get(&group_key).unwrap().read().unwrap().len()
);
cache.cache.invalidate(&key2);
cache.cache.run_pending_tasks();
assert!(!cache.index.contains_key(&group_key));
}
#[test]
fn test_selector_result_cache() {
let cache = CacheManager::builder()