diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index b7787e400d..32f62a5775 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -44,6 +44,10 @@ pub struct BloomFilterReadMetrics { pub total_ranges: usize, /// Elapsed time to fetch data. pub fetch_elapsed: Duration, + /// Number of cache hits. + pub cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, } /// Safely converts bytes to Vec using bytemuck for optimal performance. diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 9dcaf879a5..b15554fcb7 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -39,6 +39,10 @@ pub struct InvertedIndexReadMetrics { pub total_ranges: usize, /// Elapsed time to fetch data. pub fetch_elapsed: Duration, + /// Number of cache hits. + pub cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, } /// InvertedIndexReader defines an asynchronous reader of inverted index data diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 350ef34b2a..7393773a89 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata"; /// Metrics for index content. const INDEX_CONTENT_TYPE: &str = "index_content"; +/// Metrics collected from IndexCache operations. +#[derive(Debug, Default, Clone)] +pub struct IndexCacheMetrics { + /// Number of cache hits. + pub cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, + /// Number of pages accessed. + pub num_pages: usize, + /// Total bytes from pages. + pub page_bytes: u64, +} + +impl IndexCacheMetrics { + /// Merges another set of metrics into this one. + pub fn merge(&mut self, other: &Self) { + self.cache_hit += other.cache_hit; + self.cache_miss += other.cache_miss; + self.num_pages += other.num_pages; + self.page_bytes += other.page_bytes; + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct PageKey { page_id: u64, @@ -160,18 +183,20 @@ where offset: u64, size: u32, load: F, - ) -> Result, E> + ) -> Result<(Vec, IndexCacheMetrics), E> where F: Fn(Vec>) -> Fut, Fut: Future, E>>, E: std::error::Error, { + let mut metrics = IndexCacheMetrics::default(); let page_keys = PageKey::generate_page_keys(offset, size, self.page_size).collect::>(); // Size is 0, return empty data. if page_keys.is_empty() { - return Ok(Vec::new()); + return Ok((Vec::new(), metrics)); } + metrics.num_pages = page_keys.len(); let mut data = Vec::with_capacity(page_keys.len()); data.resize(page_keys.len(), Bytes::new()); let mut cache_miss_range = vec![]; @@ -182,10 +207,13 @@ where match self.get_page(key, *page_key) { Some(page) => { CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); + metrics.cache_hit += 1; + metrics.page_bytes += page.len() as u64; data[i] = page; } None => { CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); + metrics.cache_miss += 1; let base_offset = page_key.page_id * self.page_size; let pruned_size = if i == last_index { prune_size(page_keys.iter(), file_size, self.page_size) @@ -201,14 +229,18 @@ where let pages = load(cache_miss_range).await?; for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { let page_key = page_keys[i]; + metrics.page_bytes += page.len() as u64; data[i] = page.clone(); self.put_page(key, page_key, page.clone()); } } let buffer = Buffer::from_iter(data.into_iter()); - Ok(buffer - .slice(PageKey::calculate_range(offset, size, self.page_size)) - .to_vec()) + Ok(( + buffer + .slice(PageKey::calculate_range(offset, size, self.page_size)) + .to_vec(), + metrics, + )) } fn get_page(&self, key: K, page_key: PageKey) -> Option { diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index 4daf516567..0dbb6c403e 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -123,7 +123,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl ) -> Result { let start = metrics.as_ref().map(|_| Instant::now()); let inner = &self.inner; - let result = self + let (result, cache_metrics) = self .cache .get_or_load( (self.file_id, self.column_id, self.tag), @@ -132,18 +132,19 @@ impl BloomFilterReader for CachedBloomFilterIndexBl size, move |ranges| async move { inner.read_vec(&ranges, None).await }, ) - .await - .map(|b| b.into())?; + .await?; if let Some(m) = metrics { - m.total_ranges += 1; - m.total_bytes += size as u64; + m.total_ranges += cache_metrics.num_pages; + m.total_bytes += cache_metrics.page_bytes; + m.cache_hit += cache_metrics.cache_hit; + m.cache_miss += cache_metrics.cache_miss; if let Some(start) = start { m.fetch_elapsed += start.elapsed(); } } - Ok(result) + Ok(result.into()) } async fn read_vec( @@ -154,9 +155,10 @@ impl BloomFilterReader for CachedBloomFilterIndexBl let start = metrics.as_ref().map(|_| Instant::now()); let mut pages = Vec::with_capacity(ranges.len()); + let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default(); for range in ranges { let inner = &self.inner; - let page = self + let (page, cache_metrics) = self .cache .get_or_load( (self.file_id, self.column_id, self.tag), @@ -167,12 +169,15 @@ impl BloomFilterReader for CachedBloomFilterIndexBl ) .await?; + total_cache_metrics.merge(&cache_metrics); pages.push(Bytes::from(page)); } if let Some(m) = metrics { - m.total_ranges += ranges.len(); - m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + m.total_ranges += total_cache_metrics.num_pages; + m.total_bytes += total_cache_metrics.page_bytes; + m.cache_hit += total_cache_metrics.cache_hit; + m.cache_miss += total_cache_metrics.cache_miss; if let Some(start) = start { m.fetch_elapsed += start.elapsed(); } @@ -191,6 +196,9 @@ impl BloomFilterReader for CachedBloomFilterIndexBl .get_metadata((self.file_id, self.column_id, self.tag)) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + if let Some(m) = metrics { + m.cache_hit += 1; + } Ok((*cached).clone()) } else { let meta = self.inner.metadata(metrics).await?; diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 274cd44f77..79509d0796 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -93,7 +93,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead let start = metrics.as_ref().map(|_| Instant::now()); let inner = &self.inner; - let result = self + let (result, cache_metrics) = self .cache .get_or_load( self.file_id, @@ -105,8 +105,10 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead .await?; if let Some(m) = metrics { - m.total_bytes += size as u64; - m.total_ranges += 1; + m.total_bytes += cache_metrics.page_bytes; + m.total_ranges += cache_metrics.num_pages; + m.cache_hit += cache_metrics.cache_hit; + m.cache_miss += cache_metrics.cache_miss; m.fetch_elapsed += start.unwrap().elapsed(); } @@ -121,9 +123,10 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead let start = metrics.as_ref().map(|_| Instant::now()); let mut pages = Vec::with_capacity(ranges.len()); + let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default(); for range in ranges { let inner = &self.inner; - let page = self + let (page, cache_metrics) = self .cache .get_or_load( self.file_id, @@ -134,12 +137,15 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead ) .await?; + total_cache_metrics.merge(&cache_metrics); pages.push(Bytes::from(page)); } if let Some(m) = metrics { - m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); - m.total_ranges += ranges.len(); + m.total_bytes += total_cache_metrics.page_bytes; + m.total_ranges += total_cache_metrics.num_pages; + m.cache_hit += total_cache_metrics.cache_hit; + m.cache_miss += total_cache_metrics.cache_miss; m.fetch_elapsed += start.unwrap().elapsed(); } @@ -152,6 +158,9 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead ) -> Result> { if let Some(cached) = self.cache.get_metadata(self.file_id) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + if let Some(m) = metrics { + m.cache_hit += 1; + } Ok(cached) } else { let meta = self.inner.metadata(metrics).await?; @@ -438,7 +447,7 @@ mod test { let size = rng.random_range(0..file_size as u32 - offset as u32); let expected = cached_reader.range_read(offset, size, None).await.unwrap(); let inner = &cached_reader.inner; - let read = cached_reader + let (read, _cache_metrics) = cached_reader .cache .get_or_load( cached_reader.file_id,