diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index cc983f0c89..d026bc34de 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -23,15 +23,16 @@ pub(crate) mod manifest_cache; pub(crate) mod test_util; pub(crate) mod write_cache; -use std::collections::BTreeMap; +use std::collections::hash_map::DefaultHasher; +use std::collections::{BTreeMap, HashMap}; +use std::hash::{Hash, Hasher}; use std::mem; use std::ops::Range; -use std::sync::{Arc, RwLock, Weak}; +use std::sync::{Arc, RwLock}; use bytes::Bytes; use common_base::readable_size::ReadableSize; use common_telemetry::warn; -use dashmap::DashMap; use datatypes::arrow::record_batch::RecordBatch; use datatypes::value::Value; use datatypes::vectors::VectorRef; @@ -1218,47 +1219,48 @@ impl PageRangeLookup { } } -type PageFragmentRangeMap = RwLock>; -type PageFragmentIndex = DashMap>; +type PageFragmentRangeIndex = BTreeMap<(u64, u64), PageFragmentKey>; +type PageFragmentIndex = HashMap; +type PageFragmentIndexShard = RwLock; + +const PAGE_FRAGMENT_INDEX_SHARDS: usize = 64; + +fn new_page_fragment_index_shards() -> Box<[PageFragmentIndexShard]> { + (0..PAGE_FRAGMENT_INDEX_SHARDS) + .map(|_| RwLock::new(HashMap::new())) + .collect::>() + .into_boxed_slice() +} + +fn page_fragment_index_shard(group_key: &PageFragmentGroupKey) -> usize { + let mut hasher = DefaultHasher::new(); + group_key.hash(&mut hasher); + hasher.finish() as usize % PAGE_FRAGMENT_INDEX_SHARDS +} /// Byte-fragment cache for Parquet row-group reads. -/// -/// Moka owns capacity and eviction. The side index only makes overlap lookup possible. pub struct PageRangeCache { cache: Cache, - index: Arc, + index: Box<[PageFragmentIndexShard]>, } impl PageRangeCache { fn new(capacity: u64) -> Arc { - let index: Arc = Arc::new(DashMap::new()); - Arc::new_cyclic(|cache_ref: &Weak| { - let index_for_listener = Arc::clone(&index); - let cache_ref = Weak::clone(cache_ref); - let cache = Cache::builder() - .max_capacity(capacity) - .weigher(page_cache_weight) - .eviction_listener(move |k: Arc, v: Bytes, cause| { - let key = *k; - let size = page_cache_weight(&key, &v); - CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); - CACHE_EVICTION - .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) - .inc(); - if cause != RemovalCause::Replaced - && !cache_ref - .upgrade() - .is_some_and(|cache| cache.cache.contains_key(&key)) - { - remove_page_fragment_index_entry(&index_for_listener, key); - } - }) - .build(); + let cache = Cache::builder() + .max_capacity(capacity) + .weigher(page_cache_weight) + .eviction_listener(|k, v, cause| { + let size = page_cache_weight(&k, &v); + CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); + CACHE_EVICTION + .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) + .inc(); + }) + .build(); - PageRangeCache { - cache, - index: Arc::clone(&index), - } + Arc::new(PageRangeCache { + cache, + index: new_page_fragment_index_shards(), }) } @@ -1272,15 +1274,7 @@ impl PageRangeCache { let mut missing_ranges = Vec::new(); let mut cached_range_count = 0; let mut cached_bytes = 0; - let group_key = PageFragmentGroupKey { - file_id, - row_group_idx, - }; - let group = self - .index - .get(&group_key) - .map(|group| Arc::clone(group.value())); for range in ranges { if range.start >= range.end { cached_parts.push(Vec::new()); @@ -1288,40 +1282,25 @@ impl PageRangeCache { } let mut parts = Vec::new(); + let candidates = self.find_index_candidates(file_id, row_group_idx, range); let mut stale_keys = Vec::new(); - let candidates = if let Some(group) = &group { - let index = group.read().unwrap(); - // A simple first-stage interval lookup: inspect fragments whose start is before - // the requested end and keep those that overlap the requested range. - index - .range(..(range.end, 0)) - .filter_map(|(_, fragment_key)| { - (fragment_key.end > range.start).then_some(*fragment_key) - }) - .collect::>() - } else { - Vec::new() - }; for fragment_key in candidates { if let Some(bytes) = self.cache.get(&fragment_key) { - let start = range.start.max(fragment_key.start); - let end = range.end.min(fragment_key.end); - if start < end { - let slice_start = (start - fragment_key.start) as usize; - let slice_end = (end - fragment_key.start) as usize; - parts.push(PageRangePart { - range: start..end, - bytes: bytes.slice(slice_start..slice_end), - }); - } + let part_start = range.start.max(fragment_key.start); + let part_end = range.end.min(fragment_key.end); + let slice_start = (part_start - fragment_key.start) as usize; + let slice_end = (part_end - fragment_key.start) as usize; + parts.push(PageRangePart { + range: part_start..part_end, + bytes: bytes.slice(slice_start..slice_end), + }); } else { stale_keys.push(fragment_key); } } self.remove_stale_index_entries(stale_keys); - parts.sort_unstable_by_key(|part| part.range.start); let mut cursor = range.start; let mut compacted_parts: Vec = Vec::with_capacity(parts.len()); for part in parts { @@ -1379,7 +1358,7 @@ impl PageRangeCache { } let key = PageFragmentKey::new(file_id, row_group_idx, range); - let bytes = Bytes::copy_from_slice(bytes); + let bytes = Bytes::copy_from_slice(bytes.as_ref()); let size = page_cache_weight(&key, &bytes); CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into()); self.cache.insert(key, bytes); @@ -1387,26 +1366,37 @@ impl PageRangeCache { } } + fn find_index_candidates( + &self, + file_id: FileId, + row_group_idx: usize, + range: &Range, + ) -> Vec { + let group_key = PageFragmentGroupKey { + file_id, + row_group_idx, + }; + let index = self.index_shard(&group_key).read().unwrap(); + index + .get(&group_key) + .map(|ranges| { + ranges + .range(..(range.end, 0)) + .filter_map(|(_, fragment_key)| { + (fragment_key.end > range.start).then_some(*fragment_key) + }) + .collect() + }) + .unwrap_or_default() + } + fn insert_index_entry(&self, key: PageFragmentKey) { let group_key = key.group_key(); - loop { - let group = self - .index - .entry(group_key) - .or_insert_with(|| Arc::new(PageFragmentRangeMap::new(BTreeMap::new()))) - .clone(); - { - let mut ranges = group.write().unwrap(); - ranges.insert((key.start, key.end), key); - } - if self - .index - .get(&group_key) - .is_some_and(|current| Arc::ptr_eq(current.value(), &group)) - { - return; - } - } + let mut index = self.index_shard(&group_key).write().unwrap(); + index + .entry(group_key) + .or_default() + .insert((key.start, key.end), key); } fn remove_stale_index_entries(&self, keys: Vec) { @@ -1415,40 +1405,35 @@ impl PageRangeCache { continue; } - if remove_page_fragment_index_entry(&self.index, key) && self.cache.contains_key(&key) { + if self.remove_index_entry(key) && self.cache.contains_key(&key) { self.insert_index_entry(key); } } } -} -fn remove_page_fragment_index_entry(index: &PageFragmentIndex, key: PageFragmentKey) -> bool { - let group_key = key.group_key(); - let Some(group) = index.get(&group_key).map(|group| Arc::clone(group.value())) else { - return false; - }; - - let (removed, is_empty) = { - let mut ranges = group.write().unwrap(); - let removed = if ranges - .get(&(key.start, key.end)) - .is_some_and(|current| current == &key) - { - ranges.remove(&(key.start, key.end)); - true - } else { - false + fn remove_index_entry(&self, key: PageFragmentKey) -> bool { + let group_key = key.group_key(); + let mut index = self.index_shard(&group_key).write().unwrap(); + let Some(ranges) = index.get_mut(&group_key) else { + return false; }; - (removed, ranges.is_empty()) - }; - if is_empty { - index.remove_if(&group_key, |_, current| { - Arc::ptr_eq(current, &group) && current.read().unwrap().is_empty() - }); + let removed = ranges + .get(&(key.start, key.end)) + .is_some_and(|current| current == &key); + if removed { + ranges.remove(&(key.start, key.end)); + } + if ranges.is_empty() { + index.remove(&group_key); + } + + removed } - removed + fn index_shard(&self, group_key: &PageFragmentGroupKey) -> &PageFragmentIndexShard { + &self.index[page_fragment_index_shard(group_key)] + } } /// Cache key for time series row selector result. @@ -1744,53 +1729,9 @@ mod tests { } #[test] - fn test_page_cache_removes_empty_index_group_on_lookup_cleanup() { + fn test_page_cache_replaces_fragment() { let cache = PageRangeCache::new(1000); let file_id = FileId::random(); - let group_key = PageFragmentGroupKey { - file_id, - row_group_idx: 0, - }; - let range1 = 0..10; - let range2 = 20..30; - let key1 = PageFragmentKey::new(file_id, 0, &range1); - let key2 = PageFragmentKey::new(file_id, 0, &range2); - - cache.insert_ranges( - file_id, - 0, - &[range1.clone(), range2.clone()], - &[Bytes::from(vec![1; 10]), Bytes::from(vec![2; 10])], - ); - assert_eq!( - 2, - cache.index.get(&group_key).unwrap().read().unwrap().len() - ); - - cache.cache.invalidate(&key1); - cache.cache.run_pending_tasks(); - let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range1)); - assert!(!lookup.is_fully_cached()); - assert_eq!( - 1, - cache.index.get(&group_key).unwrap().read().unwrap().len() - ); - - cache.cache.invalidate(&key2); - cache.cache.run_pending_tasks(); - let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range2)); - assert!(!lookup.is_fully_cached()); - assert!(!cache.index.contains_key(&group_key)); - } - - #[test] - fn test_page_cache_replaced_fragment_keeps_index_entry() { - let cache = PageRangeCache::new(1000); - let file_id = FileId::random(); - let group_key = PageFragmentGroupKey { - file_id, - row_group_idx: 0, - }; let range = 0..10; cache.insert_ranges( @@ -1810,71 +1751,67 @@ mod tests { let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range)); assert!(lookup.is_fully_cached()); assert_eq!(&vec![2; 10][..], &lookup.cached_parts[0][0].bytes[..]); - assert_eq!( - 1, - cache.index.get(&group_key).unwrap().read().unwrap().len() - ); } #[test] - fn test_page_cache_eviction_removes_index_entries_without_lookup() { - let file_id = FileId::random(); - let first_range = 0..10; - let first_key = PageFragmentKey::new(file_id, 0, &first_range); - let page = Bytes::from(vec![1; 10]); - let cache = PageRangeCache::new(page_cache_weight(&first_key, &page) as u64); - let group_key = PageFragmentGroupKey { - file_id, - row_group_idx: 0, - }; - let ranges = (0..8) - .map(|idx| idx * 10..idx * 10 + 10) - .collect::>(); - let pages = (0..ranges.len()) - .map(|idx| Bytes::from(vec![idx as u8; 10])) - .collect::>(); - - cache.insert_ranges(file_id, 0, &ranges, &pages); - cache.cache.run_pending_tasks(); - - let indexed_keys = cache - .index - .get(&group_key) - .map(|group| group.read().unwrap().values().copied().collect::>()) - .unwrap_or_default(); - assert!(indexed_keys.len() < ranges.len()); - for key in indexed_keys { - assert!(cache.cache.contains_key(&key)); - } - } - - #[test] - fn test_page_cache_drop_releases_listener_index_cycle() { + fn test_page_cache_retains_disjoint_inserts_for_same_row_group() { let cache = PageRangeCache::new(1000); - let index = Arc::downgrade(&cache.index); let file_id = FileId::random(); - let range = 0..10; + let range1 = 0..10; + let range2 = 20..30; cache.insert_ranges( file_id, 0, - std::slice::from_ref(&range), + std::slice::from_ref(&range1), &[Bytes::from(vec![1; 10])], ); + cache.insert_ranges( + file_id, + 0, + std::slice::from_ref(&range2), + &[Bytes::from(vec![2; 10])], + ); - drop(cache); - - assert!(index.upgrade().is_none()); + let lookup = cache.lookup(file_id, 0, &[range1, range2]); + assert!(lookup.is_fully_cached()); + assert_eq!(2, lookup.cached_range_count); + assert_eq!(&vec![1; 10][..], &lookup.cached_parts[0][0].bytes[..]); + assert_eq!(&vec![2; 10][..], &lookup.cached_parts[1][0].bytes[..]); } #[test] - fn test_page_cache_removes_index_entry_when_fragment_is_rejected() { - let cache = PageRangeCache::new(1); + fn test_page_cache_fragment_eviction() { + let file_id = FileId::random(); + let range = 0..10; + let key = PageFragmentKey::new(file_id, 0, &range); + let page = Bytes::from(vec![1; 10]); + let cache = PageRangeCache::new(page_cache_weight(&key, &page) as u64); + + cache.insert_ranges( + file_id, + 0, + std::slice::from_ref(&range), + &[Bytes::from(vec![1; 10])], + ); + assert!( + cache + .lookup(file_id, 0, std::slice::from_ref(&range)) + .is_fully_cached() + ); + + cache.cache.invalidate(&key); + cache.cache.run_pending_tasks(); + + let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range)); + assert!(!lookup.is_fully_cached()); + assert_eq!(vec![0..10], lookup.missing_ranges); + } + + #[test] + fn test_page_cache_rejects_oversized_fragment() { + let cache = PageRangeCache::new(1); let file_id = FileId::random(); - let group_key = PageFragmentGroupKey { - file_id, - row_group_idx: 0, - }; let range = 0..10; cache.insert_ranges( @@ -1885,11 +1822,9 @@ mod tests { ); cache.cache.run_pending_tasks(); - assert!(!cache.index.contains_key(&group_key)); let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range)); assert!(!lookup.is_fully_cached()); assert_eq!(vec![0..10], lookup.missing_ranges); - assert!(!cache.index.contains_key(&group_key)); } #[test]