diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index d3eafaea42..6f4969923a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -23,9 +23,7 @@ pub(crate) mod manifest_cache; pub(crate) mod test_util; pub(crate) mod write_cache; -use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, HashMap}; -use std::hash::{Hash, Hasher}; use std::mem; use std::ops::Range; use std::sync::{Arc, RwLock}; @@ -1221,27 +1219,11 @@ impl PageRangeLookup { type PageFragmentRangeIndex = BTreeMap<(u64, u64), PageFragmentKey>; type PageFragmentIndex = HashMap; -type PageFragmentIndexShard = RwLock; - -const PAGE_FRAGMENT_INDEX_SHARDS: usize = 64; - -fn new_page_fragment_index_shards() -> Box<[PageFragmentIndexShard]> { - (0..PAGE_FRAGMENT_INDEX_SHARDS) - .map(|_| RwLock::new(HashMap::new())) - .collect::>() - .into_boxed_slice() -} - -fn page_fragment_index_shard(group_key: &PageFragmentGroupKey) -> usize { - let mut hasher = DefaultHasher::new(); - group_key.hash(&mut hasher); - hasher.finish() as usize % PAGE_FRAGMENT_INDEX_SHARDS -} /// Byte-fragment cache for Parquet row-group reads. pub struct PageRangeCache { cache: Cache, - index: Box<[PageFragmentIndexShard]>, + index: RwLock, } impl PageRangeCache { @@ -1259,8 +1241,10 @@ impl PageRangeCache { .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) .inc(); - if let Some(cache) = weak_cache.upgrade() { - cache.remove_stale_index_entries(vec![*k]); + if let Some(cache) = weak_cache.upgrade() + && !matches!(cause, RemovalCause::Replaced) + { + cache.remove_index_entry(*k); } } }) @@ -1268,7 +1252,7 @@ impl PageRangeCache { PageRangeCache { cache, - index: new_page_fragment_index_shards(), + index: RwLock::new(HashMap::new()), } }) } @@ -1308,7 +1292,9 @@ impl PageRangeCache { stale_keys.push(fragment_key); } } - self.remove_stale_index_entries(stale_keys); + for key in stale_keys { + self.remove_uncached_index_entry(key); + } let mut cursor = range.start; let mut compacted_parts: Vec = Vec::with_capacity(parts.len()); @@ -1371,7 +1357,11 @@ impl PageRangeCache { let size = page_cache_weight(&key, &bytes); CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into()); self.cache.insert(key, bytes); - self.insert_index_entry(key); + let mut index = self.index.write().unwrap(); + index + .entry(key.group_key()) + .or_default() + .insert((key.start, key.end), key); } } @@ -1385,7 +1375,7 @@ impl PageRangeCache { file_id, row_group_idx, }; - let index = self.index_shard(&group_key).read().unwrap(); + let index = self.index.read().unwrap(); index .get(&group_key) .map(|ranges| { @@ -1399,32 +1389,29 @@ impl PageRangeCache { .unwrap_or_default() } - fn insert_index_entry(&self, key: PageFragmentKey) { + fn remove_uncached_index_entry(&self, key: PageFragmentKey) { let group_key = key.group_key(); - let mut index = self.index_shard(&group_key).write().unwrap(); - index - .entry(group_key) - .or_default() - .insert((key.start, key.end), key); - } - - fn remove_stale_index_entries(&self, keys: Vec) { - for key in keys { - if self.cache.contains_key(&key) { - continue; - } - - if self.remove_index_entry(key) && self.cache.contains_key(&key) { - self.insert_index_entry(key); - } + let mut index = self.index.write().unwrap(); + if self.cache.contains_key(&key) { + return; } + + Self::remove_index_entry_locked(&mut index, group_key, key); } - fn remove_index_entry(&self, key: PageFragmentKey) -> bool { + fn remove_index_entry(&self, key: PageFragmentKey) { let group_key = key.group_key(); - let mut index = self.index_shard(&group_key).write().unwrap(); + let mut index = self.index.write().unwrap(); + Self::remove_index_entry_locked(&mut index, group_key, key); + } + + fn remove_index_entry_locked( + index: &mut PageFragmentIndex, + group_key: PageFragmentGroupKey, + key: PageFragmentKey, + ) { let Some(ranges) = index.get_mut(&group_key) else { - return false; + return; }; let removed = ranges @@ -1436,12 +1423,6 @@ impl PageRangeCache { if ranges.is_empty() { index.remove(&group_key); } - - removed - } - - fn index_shard(&self, group_key: &PageFragmentGroupKey) -> &PageFragmentIndexShard { - &self.index[page_fragment_index_shard(group_key)] } }