remove dashmap

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-18 17:39:36 +08:00
parent 6f54242028
commit 8accd23416

View File

@@ -23,15 +23,16 @@ pub(crate) mod manifest_cache;
pub(crate) mod test_util;
pub(crate) mod write_cache;
use std::collections::BTreeMap;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use std::mem;
use std::ops::Range;
use std::sync::{Arc, RwLock, Weak};
use std::sync::{Arc, RwLock};
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use dashmap::DashMap;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
@@ -1218,47 +1219,48 @@ impl PageRangeLookup {
}
}
type PageFragmentRangeMap = RwLock<BTreeMap<(u64, u64), PageFragmentKey>>;
type PageFragmentIndex = DashMap<PageFragmentGroupKey, Arc<PageFragmentRangeMap>>;
type PageFragmentRangeIndex = BTreeMap<(u64, u64), PageFragmentKey>;
type PageFragmentIndex = HashMap<PageFragmentGroupKey, PageFragmentRangeIndex>;
type PageFragmentIndexShard = RwLock<PageFragmentIndex>;
const PAGE_FRAGMENT_INDEX_SHARDS: usize = 64;
fn new_page_fragment_index_shards() -> Box<[PageFragmentIndexShard]> {
(0..PAGE_FRAGMENT_INDEX_SHARDS)
.map(|_| RwLock::new(HashMap::new()))
.collect::<Vec<_>>()
.into_boxed_slice()
}
fn page_fragment_index_shard(group_key: &PageFragmentGroupKey) -> usize {
let mut hasher = DefaultHasher::new();
group_key.hash(&mut hasher);
hasher.finish() as usize % PAGE_FRAGMENT_INDEX_SHARDS
}
/// Byte-fragment cache for Parquet row-group reads.
///
/// Moka owns capacity and eviction. The side index only makes overlap lookup possible.
pub struct PageRangeCache {
cache: Cache<PageFragmentKey, Bytes>,
index: Arc<PageFragmentIndex>,
index: Box<[PageFragmentIndexShard]>,
}
impl PageRangeCache {
fn new(capacity: u64) -> Arc<PageRangeCache> {
let index: Arc<PageFragmentIndex> = Arc::new(DashMap::new());
Arc::new_cyclic(|cache_ref: &Weak<PageRangeCache>| {
let index_for_listener = Arc::clone(&index);
let cache_ref = Weak::clone(cache_ref);
let cache = Cache::builder()
.max_capacity(capacity)
.weigher(page_cache_weight)
.eviction_listener(move |k: Arc<PageFragmentKey>, v: Bytes, cause| {
let key = *k;
let size = page_cache_weight(&key, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
CACHE_EVICTION
.with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)])
.inc();
if cause != RemovalCause::Replaced
&& !cache_ref
.upgrade()
.is_some_and(|cache| cache.cache.contains_key(&key))
{
remove_page_fragment_index_entry(&index_for_listener, key);
}
})
.build();
let cache = Cache::builder()
.max_capacity(capacity)
.weigher(page_cache_weight)
.eviction_listener(|k, v, cause| {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
CACHE_EVICTION
.with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)])
.inc();
})
.build();
PageRangeCache {
cache,
index: Arc::clone(&index),
}
Arc::new(PageRangeCache {
cache,
index: new_page_fragment_index_shards(),
})
}
@@ -1272,15 +1274,7 @@ impl PageRangeCache {
let mut missing_ranges = Vec::new();
let mut cached_range_count = 0;
let mut cached_bytes = 0;
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx,
};
let group = self
.index
.get(&group_key)
.map(|group| Arc::clone(group.value()));
for range in ranges {
if range.start >= range.end {
cached_parts.push(Vec::new());
@@ -1288,40 +1282,25 @@ impl PageRangeCache {
}
let mut parts = Vec::new();
let candidates = self.find_index_candidates(file_id, row_group_idx, range);
let mut stale_keys = Vec::new();
let candidates = if let Some(group) = &group {
let index = group.read().unwrap();
// A simple first-stage interval lookup: inspect fragments whose start is before
// the requested end and keep those that overlap the requested range.
index
.range(..(range.end, 0))
.filter_map(|(_, fragment_key)| {
(fragment_key.end > range.start).then_some(*fragment_key)
})
.collect::<Vec<_>>()
} else {
Vec::new()
};
for fragment_key in candidates {
if let Some(bytes) = self.cache.get(&fragment_key) {
let start = range.start.max(fragment_key.start);
let end = range.end.min(fragment_key.end);
if start < end {
let slice_start = (start - fragment_key.start) as usize;
let slice_end = (end - fragment_key.start) as usize;
parts.push(PageRangePart {
range: start..end,
bytes: bytes.slice(slice_start..slice_end),
});
}
let part_start = range.start.max(fragment_key.start);
let part_end = range.end.min(fragment_key.end);
let slice_start = (part_start - fragment_key.start) as usize;
let slice_end = (part_end - fragment_key.start) as usize;
parts.push(PageRangePart {
range: part_start..part_end,
bytes: bytes.slice(slice_start..slice_end),
});
} else {
stale_keys.push(fragment_key);
}
}
self.remove_stale_index_entries(stale_keys);
parts.sort_unstable_by_key(|part| part.range.start);
let mut cursor = range.start;
let mut compacted_parts: Vec<PageRangePart> = Vec::with_capacity(parts.len());
for part in parts {
@@ -1379,7 +1358,7 @@ impl PageRangeCache {
}
let key = PageFragmentKey::new(file_id, row_group_idx, range);
let bytes = Bytes::copy_from_slice(bytes);
let bytes = Bytes::copy_from_slice(bytes.as_ref());
let size = page_cache_weight(&key, &bytes);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into());
self.cache.insert(key, bytes);
@@ -1387,26 +1366,37 @@ impl PageRangeCache {
}
}
fn find_index_candidates(
&self,
file_id: FileId,
row_group_idx: usize,
range: &Range<u64>,
) -> Vec<PageFragmentKey> {
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx,
};
let index = self.index_shard(&group_key).read().unwrap();
index
.get(&group_key)
.map(|ranges| {
ranges
.range(..(range.end, 0))
.filter_map(|(_, fragment_key)| {
(fragment_key.end > range.start).then_some(*fragment_key)
})
.collect()
})
.unwrap_or_default()
}
fn insert_index_entry(&self, key: PageFragmentKey) {
let group_key = key.group_key();
loop {
let group = self
.index
.entry(group_key)
.or_insert_with(|| Arc::new(PageFragmentRangeMap::new(BTreeMap::new())))
.clone();
{
let mut ranges = group.write().unwrap();
ranges.insert((key.start, key.end), key);
}
if self
.index
.get(&group_key)
.is_some_and(|current| Arc::ptr_eq(current.value(), &group))
{
return;
}
}
let mut index = self.index_shard(&group_key).write().unwrap();
index
.entry(group_key)
.or_default()
.insert((key.start, key.end), key);
}
fn remove_stale_index_entries(&self, keys: Vec<PageFragmentKey>) {
@@ -1415,40 +1405,35 @@ impl PageRangeCache {
continue;
}
if remove_page_fragment_index_entry(&self.index, key) && self.cache.contains_key(&key) {
if self.remove_index_entry(key) && self.cache.contains_key(&key) {
self.insert_index_entry(key);
}
}
}
}
fn remove_page_fragment_index_entry(index: &PageFragmentIndex, key: PageFragmentKey) -> bool {
let group_key = key.group_key();
let Some(group) = index.get(&group_key).map(|group| Arc::clone(group.value())) else {
return false;
};
let (removed, is_empty) = {
let mut ranges = group.write().unwrap();
let removed = if ranges
.get(&(key.start, key.end))
.is_some_and(|current| current == &key)
{
ranges.remove(&(key.start, key.end));
true
} else {
false
fn remove_index_entry(&self, key: PageFragmentKey) -> bool {
let group_key = key.group_key();
let mut index = self.index_shard(&group_key).write().unwrap();
let Some(ranges) = index.get_mut(&group_key) else {
return false;
};
(removed, ranges.is_empty())
};
if is_empty {
index.remove_if(&group_key, |_, current| {
Arc::ptr_eq(current, &group) && current.read().unwrap().is_empty()
});
let removed = ranges
.get(&(key.start, key.end))
.is_some_and(|current| current == &key);
if removed {
ranges.remove(&(key.start, key.end));
}
if ranges.is_empty() {
index.remove(&group_key);
}
removed
}
removed
fn index_shard(&self, group_key: &PageFragmentGroupKey) -> &PageFragmentIndexShard {
&self.index[page_fragment_index_shard(group_key)]
}
}
/// Cache key for time series row selector result.
@@ -1744,53 +1729,9 @@ mod tests {
}
#[test]
fn test_page_cache_removes_empty_index_group_on_lookup_cleanup() {
fn test_page_cache_replaces_fragment() {
let cache = PageRangeCache::new(1000);
let file_id = FileId::random();
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx: 0,
};
let range1 = 0..10;
let range2 = 20..30;
let key1 = PageFragmentKey::new(file_id, 0, &range1);
let key2 = PageFragmentKey::new(file_id, 0, &range2);
cache.insert_ranges(
file_id,
0,
&[range1.clone(), range2.clone()],
&[Bytes::from(vec![1; 10]), Bytes::from(vec![2; 10])],
);
assert_eq!(
2,
cache.index.get(&group_key).unwrap().read().unwrap().len()
);
cache.cache.invalidate(&key1);
cache.cache.run_pending_tasks();
let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range1));
assert!(!lookup.is_fully_cached());
assert_eq!(
1,
cache.index.get(&group_key).unwrap().read().unwrap().len()
);
cache.cache.invalidate(&key2);
cache.cache.run_pending_tasks();
let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range2));
assert!(!lookup.is_fully_cached());
assert!(!cache.index.contains_key(&group_key));
}
#[test]
fn test_page_cache_replaced_fragment_keeps_index_entry() {
let cache = PageRangeCache::new(1000);
let file_id = FileId::random();
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx: 0,
};
let range = 0..10;
cache.insert_ranges(
@@ -1810,71 +1751,67 @@ mod tests {
let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
assert!(lookup.is_fully_cached());
assert_eq!(&vec![2; 10][..], &lookup.cached_parts[0][0].bytes[..]);
assert_eq!(
1,
cache.index.get(&group_key).unwrap().read().unwrap().len()
);
}
#[test]
fn test_page_cache_eviction_removes_index_entries_without_lookup() {
let file_id = FileId::random();
let first_range = 0..10;
let first_key = PageFragmentKey::new(file_id, 0, &first_range);
let page = Bytes::from(vec![1; 10]);
let cache = PageRangeCache::new(page_cache_weight(&first_key, &page) as u64);
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx: 0,
};
let ranges = (0..8)
.map(|idx| idx * 10..idx * 10 + 10)
.collect::<Vec<_>>();
let pages = (0..ranges.len())
.map(|idx| Bytes::from(vec![idx as u8; 10]))
.collect::<Vec<_>>();
cache.insert_ranges(file_id, 0, &ranges, &pages);
cache.cache.run_pending_tasks();
let indexed_keys = cache
.index
.get(&group_key)
.map(|group| group.read().unwrap().values().copied().collect::<Vec<_>>())
.unwrap_or_default();
assert!(indexed_keys.len() < ranges.len());
for key in indexed_keys {
assert!(cache.cache.contains_key(&key));
}
}
#[test]
fn test_page_cache_drop_releases_listener_index_cycle() {
fn test_page_cache_retains_disjoint_inserts_for_same_row_group() {
let cache = PageRangeCache::new(1000);
let index = Arc::downgrade(&cache.index);
let file_id = FileId::random();
let range = 0..10;
let range1 = 0..10;
let range2 = 20..30;
cache.insert_ranges(
file_id,
0,
std::slice::from_ref(&range),
std::slice::from_ref(&range1),
&[Bytes::from(vec![1; 10])],
);
cache.insert_ranges(
file_id,
0,
std::slice::from_ref(&range2),
&[Bytes::from(vec![2; 10])],
);
drop(cache);
assert!(index.upgrade().is_none());
let lookup = cache.lookup(file_id, 0, &[range1, range2]);
assert!(lookup.is_fully_cached());
assert_eq!(2, lookup.cached_range_count);
assert_eq!(&vec![1; 10][..], &lookup.cached_parts[0][0].bytes[..]);
assert_eq!(&vec![2; 10][..], &lookup.cached_parts[1][0].bytes[..]);
}
#[test]
fn test_page_cache_removes_index_entry_when_fragment_is_rejected() {
let cache = PageRangeCache::new(1);
fn test_page_cache_fragment_eviction() {
let file_id = FileId::random();
let range = 0..10;
let key = PageFragmentKey::new(file_id, 0, &range);
let page = Bytes::from(vec![1; 10]);
let cache = PageRangeCache::new(page_cache_weight(&key, &page) as u64);
cache.insert_ranges(
file_id,
0,
std::slice::from_ref(&range),
&[Bytes::from(vec![1; 10])],
);
assert!(
cache
.lookup(file_id, 0, std::slice::from_ref(&range))
.is_fully_cached()
);
cache.cache.invalidate(&key);
cache.cache.run_pending_tasks();
let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
assert!(!lookup.is_fully_cached());
assert_eq!(vec![0..10], lookup.missing_ranges);
}
#[test]
fn test_page_cache_rejects_oversized_fragment() {
let cache = PageRangeCache::new(1);
let file_id = FileId::random();
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx: 0,
};
let range = 0..10;
cache.insert_ranges(
@@ -1885,11 +1822,9 @@ mod tests {
);
cache.cache.run_pending_tasks();
assert!(!cache.index.contains_key(&group_key));
let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
assert!(!lookup.is_fully_cached());
assert_eq!(vec![0..10], lookup.missing_ranges);
assert!(!cache.index.contains_key(&group_key));
}
#[test]