feat: collect cache metrics for inverted and bloom index

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-11-21 17:16:43 +08:00
committed by shuiyisong
parent e00452c4db
commit fea2966dec
5 changed files with 78 additions and 21 deletions

View File

@@ -44,6 +44,10 @@ pub struct BloomFilterReadMetrics {
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.

View File

@@ -39,6 +39,10 @@ pub struct InvertedIndexReadMetrics {
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
/// InvertedIndexReader defines an asynchronous reader of inverted index data

View File

@@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metrics for index content.
const INDEX_CONTENT_TYPE: &str = "index_content";
/// Per-call statistics gathered while reading through the index cache.
///
/// All counters start at zero via [`Default`]. Results from several reads can
/// be folded together with [`IndexCacheMetrics::merge`].
#[derive(Debug, Default, Clone)]
pub struct IndexCacheMetrics {
    /// Count of page lookups that were found in the cache.
    pub cache_hit: usize,
    /// Count of page lookups that were not found in the cache.
    pub cache_miss: usize,
    /// Count of pages touched by the read.
    pub num_pages: usize,
    /// Sum of the byte lengths of the pages touched.
    pub page_bytes: u64,
}

impl IndexCacheMetrics {
    /// Accumulates every counter of `other` into `self`; `other` is left untouched.
    pub fn merge(&mut self, other: &Self) {
        // Destructure exhaustively so that a field added to the struct later
        // produces a compile error here instead of being silently left out.
        let Self {
            cache_hit,
            cache_miss,
            num_pages,
            page_bytes,
        } = other;

        self.cache_hit += *cache_hit;
        self.cache_miss += *cache_miss;
        self.num_pages += *num_pages;
        self.page_bytes += *page_bytes;
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
page_id: u64,
@@ -160,18 +183,20 @@ where
offset: u64,
size: u32,
load: F,
) -> Result<Vec<u8>, E>
) -> Result<(Vec<u8>, IndexCacheMetrics), E>
where
F: Fn(Vec<Range<u64>>) -> Fut,
Fut: Future<Output = Result<Vec<Bytes>, E>>,
E: std::error::Error,
{
let mut metrics = IndexCacheMetrics::default();
let page_keys =
PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
// Size is 0, return empty data.
if page_keys.is_empty() {
return Ok(Vec::new());
return Ok((Vec::new(), metrics));
}
metrics.num_pages = page_keys.len();
let mut data = Vec::with_capacity(page_keys.len());
data.resize(page_keys.len(), Bytes::new());
let mut cache_miss_range = vec![];
@@ -182,10 +207,13 @@ where
match self.get_page(key, *page_key) {
Some(page) => {
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_hit += 1;
metrics.page_bytes += page.len() as u64;
data[i] = page;
}
None => {
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_miss += 1;
let base_offset = page_key.page_id * self.page_size;
let pruned_size = if i == last_index {
prune_size(page_keys.iter(), file_size, self.page_size)
@@ -201,14 +229,18 @@ where
let pages = load(cache_miss_range).await?;
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
let page_key = page_keys[i];
metrics.page_bytes += page.len() as u64;
data[i] = page.clone();
self.put_page(key, page_key, page.clone());
}
}
let buffer = Buffer::from_iter(data.into_iter());
Ok(buffer
.slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec())
Ok((
buffer
.slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec(),
metrics,
))
}
fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {

View File

@@ -123,7 +123,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner;
let result = self
let (result, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
@@ -132,18 +132,19 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
size,
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.map(|b| b.into())?;
.await?;
if let Some(m) = metrics {
m.total_ranges += 1;
m.total_bytes += size as u64;
m.total_ranges += cache_metrics.num_pages;
m.total_bytes += cache_metrics.page_bytes;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
Ok(result.into())
}
async fn read_vec(
@@ -154,9 +155,10 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges {
let inner = &self.inner;
let page = self
let (page, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
@@ -167,12 +169,15 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
)
.await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
m.total_ranges += total_cache_metrics.num_pages;
m.total_bytes += total_cache_metrics.page_bytes;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
@@ -191,6 +196,9 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
.get_metadata((self.file_id, self.column_id, self.tag))
{
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok((*cached).clone())
} else {
let meta = self.inner.metadata(metrics).await?;

View File

@@ -93,7 +93,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner;
let result = self
let (result, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
@@ -105,8 +105,10 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
.await?;
if let Some(m) = metrics {
m.total_bytes += size as u64;
m.total_ranges += 1;
m.total_bytes += cache_metrics.page_bytes;
m.total_ranges += cache_metrics.num_pages;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
@@ -121,9 +123,10 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges {
let inner = &self.inner;
let page = self
let (page, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
@@ -134,12 +137,15 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
)
.await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
m.total_ranges += ranges.len();
m.total_bytes += total_cache_metrics.page_bytes;
m.total_ranges += total_cache_metrics.num_pages;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
@@ -152,6 +158,9 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok(cached)
} else {
let meta = self.inner.metadata(metrics).await?;
@@ -438,7 +447,7 @@ mod test {
let size = rng.random_range(0..file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size, None).await.unwrap();
let inner = &cached_reader.inner;
let read = cached_reader
let (read, _cache_metrics) = cached_reader
.cache
.get_or_load(
cached_reader.file_id,