Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-20 14:48:10 +08:00
parent bebcfc69b5
commit 54c19174eb

View File

@@ -23,9 +23,7 @@ pub(crate) mod manifest_cache;
pub(crate) mod test_util;
pub(crate) mod write_cache;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use std::mem;
use std::ops::Range;
use std::sync::{Arc, RwLock};
@@ -1221,27 +1219,11 @@ impl PageRangeLookup {
type PageFragmentRangeIndex = BTreeMap<(u64, u64), PageFragmentKey>;
type PageFragmentIndex = HashMap<PageFragmentGroupKey, PageFragmentRangeIndex>;
type PageFragmentIndexShard = RwLock<PageFragmentIndex>;
const PAGE_FRAGMENT_INDEX_SHARDS: usize = 64;
fn new_page_fragment_index_shards() -> Box<[PageFragmentIndexShard]> {
(0..PAGE_FRAGMENT_INDEX_SHARDS)
.map(|_| RwLock::new(HashMap::new()))
.collect::<Vec<_>>()
.into_boxed_slice()
}
fn page_fragment_index_shard(group_key: &PageFragmentGroupKey) -> usize {
let mut hasher = DefaultHasher::new();
group_key.hash(&mut hasher);
hasher.finish() as usize % PAGE_FRAGMENT_INDEX_SHARDS
}
/// Byte-fragment cache for Parquet row-group reads.
pub struct PageRangeCache {
cache: Cache<PageFragmentKey, Bytes>,
index: Box<[PageFragmentIndexShard]>,
index: RwLock<PageFragmentIndex>,
}
impl PageRangeCache {
@@ -1259,8 +1241,10 @@ impl PageRangeCache {
.with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)])
.inc();
if let Some(cache) = weak_cache.upgrade() {
cache.remove_stale_index_entries(vec![*k]);
if let Some(cache) = weak_cache.upgrade()
&& !matches!(cause, RemovalCause::Replaced)
{
cache.remove_index_entry(*k);
}
}
})
@@ -1268,7 +1252,7 @@ impl PageRangeCache {
PageRangeCache {
cache,
index: new_page_fragment_index_shards(),
index: RwLock::new(HashMap::new()),
}
})
}
@@ -1308,7 +1292,9 @@ impl PageRangeCache {
stale_keys.push(fragment_key);
}
}
self.remove_stale_index_entries(stale_keys);
for key in stale_keys {
self.remove_uncached_index_entry(key);
}
let mut cursor = range.start;
let mut compacted_parts: Vec<PageRangePart> = Vec::with_capacity(parts.len());
@@ -1371,7 +1357,11 @@ impl PageRangeCache {
let size = page_cache_weight(&key, &bytes);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into());
self.cache.insert(key, bytes);
self.insert_index_entry(key);
let mut index = self.index.write().unwrap();
index
.entry(key.group_key())
.or_default()
.insert((key.start, key.end), key);
}
}
@@ -1385,7 +1375,7 @@ impl PageRangeCache {
file_id,
row_group_idx,
};
let index = self.index_shard(&group_key).read().unwrap();
let index = self.index.read().unwrap();
index
.get(&group_key)
.map(|ranges| {
@@ -1399,32 +1389,29 @@ impl PageRangeCache {
.unwrap_or_default()
}
fn insert_index_entry(&self, key: PageFragmentKey) {
fn remove_uncached_index_entry(&self, key: PageFragmentKey) {
let group_key = key.group_key();
let mut index = self.index_shard(&group_key).write().unwrap();
index
.entry(group_key)
.or_default()
.insert((key.start, key.end), key);
}
fn remove_stale_index_entries(&self, keys: Vec<PageFragmentKey>) {
for key in keys {
if self.cache.contains_key(&key) {
continue;
}
if self.remove_index_entry(key) && self.cache.contains_key(&key) {
self.insert_index_entry(key);
}
let mut index = self.index.write().unwrap();
if self.cache.contains_key(&key) {
return;
}
Self::remove_index_entry_locked(&mut index, group_key, key);
}
fn remove_index_entry(&self, key: PageFragmentKey) -> bool {
fn remove_index_entry(&self, key: PageFragmentKey) {
let group_key = key.group_key();
let mut index = self.index_shard(&group_key).write().unwrap();
let mut index = self.index.write().unwrap();
Self::remove_index_entry_locked(&mut index, group_key, key);
}
fn remove_index_entry_locked(
index: &mut PageFragmentIndex,
group_key: PageFragmentGroupKey,
key: PageFragmentKey,
) {
let Some(ranges) = index.get_mut(&group_key) else {
return false;
return;
};
let removed = ranges
@@ -1436,12 +1423,6 @@ impl PageRangeCache {
if ranges.is_empty() {
index.remove(&group_key);
}
removed
}
fn index_shard(&self, group_key: &PageFragmentGroupKey) -> &PageFragmentIndexShard {
&self.index[page_fragment_index_shard(group_key)]
}
}