From 84e4e42ee77675bf1398d5d7e71b4f7123c00d1b Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Thu, 4 Dec 2025 21:40:18 +0800
Subject: [PATCH] feat: add more verbose metrics to scanners (#7336)

* feat: add inverted applier metrics
Signed-off-by: evenyag
* feat: add metrics to bloom applier
Signed-off-by: evenyag
* feat: add metrics to fulltext index applier
Signed-off-by: evenyag
* feat: implement BloomFilterReadMetrics for BloomFilterReader
Signed-off-by: evenyag
* feat: collect read metrics for inverted index
Signed-off-by: evenyag
* feat: add metrics for range_read and metadata
Signed-off-by: evenyag
* refactor: rename elapsed to fetch_elapsed
Signed-off-by: evenyag
* feat: collect metadata fetch metrics for inverted index
Signed-off-by: evenyag
* feat: collect cache metrics for inverted and bloom index
Signed-off-by: evenyag
* feat: collect read metrics in appliers
Signed-off-by: evenyag
* feat: collect fulltext dir metrics for applier
Signed-off-by: evenyag
* feat: collect parquet row group metrics
Signed-off-by: evenyag
* feat: add parquet metadata metrics
Signed-off-by: evenyag
* feat: add apply metrics
Signed-off-by: evenyag
* feat: collect more metrics for memory row group
Signed-off-by: evenyag
* feat: add fetch metrics to ReaderMetrics
Signed-off-by: evenyag
* feat: init verbose metrics
Signed-off-by: evenyag
* feat: debug print metrics in ScanMetricsSet
Signed-off-by: evenyag
* feat: implement debug for new metrics
Signed-off-by: evenyag
* chore: fix compiler errors
Signed-off-by: evenyag
* feat: update parquet fetch metrics
Signed-off-by: evenyag
* feat: collect the whole fetch time
Signed-off-by: evenyag
* feat: add file_scan_cost
Signed-off-by: evenyag
* chore: parquet fetch add cache_miss counter
Signed-off-by: evenyag
* feat: print index read metrics
Signed-off-by: evenyag
* chore: use actual bytes to increase counter
Signed-off-by: evenyag
* refactor: remove provided implementations for index reader traits
Signed-off-by: evenyag
* refactor: change get_parquet_meta_data() method to receive metrics
Signed-off-by: evenyag
* refactor: rename file_scan_cost to sst_scan_cost
Signed-off-by: evenyag
* chore: refine ParquetFetchMetrics
Signed-off-by: evenyag
* style: fix clippy
Signed-off-by: evenyag
* style: fmt code
Signed-off-by: evenyag
* refactor: remove useless inner method
Signed-off-by: evenyag
* refactor: collect page size actual needed
Signed-off-by: evenyag
* refactor: simplify InvertedIndexReadMetrics
Signed-off-by: evenyag
* refactor: simplify InvertedIndexApplyMetrics Debug
Signed-off-by: evenyag
* refactor: simplify BloomFilterReadMetrics Debug
Signed-off-by: evenyag
* refactor: simplify BloomFilterIndexApplyMetrics Debug
Signed-off-by: evenyag
* refactor: simplify FulltextIndexApplyMetrics implementation
Signed-off-by: evenyag
* refactor: simplify ParquetFetchMetrics Debug
Signed-off-by: evenyag
* refactor: simplify MetadataCacheMetrics Debug
Signed-off-by: evenyag
* feat: only print verbose metrics when they are not empty.
Signed-off-by: evenyag
* refactor: use mutex to protect ParquetFetchMetrics
Signed-off-by: evenyag
* style: fmt code
Signed-off-by: evenyag
* refactor: use duration for elapsed in ParquetFetchMetricsData
Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/index/src/bloom_filter/applier.rs | 18 +-
 src/index/src/bloom_filter/reader.rs | 192 ++++++++++++--
 src/index/src/fulltext_index/tests.rs | 2 +-
 src/index/src/inverted_index/format/reader.rs | 132 ++++++++--
 .../src/inverted_index/format/reader/blob.rs | 62 ++++-
 .../inverted_index/format/reader/footer.rs | 35 ++-
 .../src/inverted_index/format/writer/blob.rs | 48 +++-
 .../search/fst_values_mapper.rs | 65 +++--
 .../src/inverted_index/search/index_apply.rs | 5 +-
 .../search/index_apply/predicates_apply.rs | 99 +++----
 src/mito2/src/cache.rs | 61 +++--
 src/mito2/src/cache/index.rs | 42 ++-
 .../src/cache/index/bloom_filter_index.rs | 65 ++++-
 src/mito2/src/cache/index/inverted_index.rs | 113 ++++++--
 src/mito2/src/engine/puffin_index.rs | 8 +-
 src/mito2/src/read/scan_util.rs | 141 +++++++++-
 .../src/sst/index/bloom_filter/applier.rs | 103 +++++++-
 .../src/sst/index/bloom_filter/creator.rs | 10 +-
 .../src/sst/index/fulltext_index/applier.rs | 191 ++++++++++++--
 .../src/sst/index/fulltext_index/creator.rs | 9 +-
 .../src/sst/index/inverted_index/applier.rs | 119 ++++++++-
 .../src/sst/index/inverted_index/creator.rs | 2 +-
 src/mito2/src/sst/index/puffin_manager.rs | 2 +-
 src/mito2/src/sst/parquet/file_range.rs | 19 +-
 src/mito2/src/sst/parquet/reader.rs | 180 +++++++++++--
 src/mito2/src/sst/parquet/row_group.rs | 249 +++++++++++++++++-
 src/puffin/src/puffin_manager.rs | 14 +-
 .../fs_puffin_manager/reader.rs | 8 +-
 src/puffin/src/puffin_manager/stager.rs | 7 +-
 .../puffin_manager/stager/bounded_stager.rs | 37 ++-
 src/puffin/src/puffin_manager/tests.rs | 2 +-
 31 files changed, 1716 insertions(+), 324 deletions(-)

diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index 18332d4815..db219c9e61 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -21,7 +21,7 @@ use itertools::Itertools; use crate::Bytes; use crate::bloom_filter::error::Result; -use crate::bloom_filter::reader::BloomFilterReader; +use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader}; /// `InListPredicate` contains a list of acceptable values. A value needs to match at least /// one of the elements (logical OR semantic) for the predicate to be satisfied.
@@ -38,7 +38,7 @@ pub struct BloomFilterApplier { impl BloomFilterApplier { pub async fn new(reader: Box) -> Result { - let meta = reader.metadata().await?; + let meta = reader.metadata(None).await?; Ok(Self { reader, meta }) } @@ -50,6 +50,7 @@ impl BloomFilterApplier { &mut self, predicates: &[InListPredicate], search_ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, ) -> Result>> { if predicates.is_empty() { // If no predicates, return empty result @@ -57,7 +58,7 @@ impl BloomFilterApplier { } let segments = self.row_ranges_to_segments(search_ranges); - let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?; + let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?; let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates); Ok(intersect_ranges(search_ranges, &matching_row_ranges)) } @@ -95,6 +96,7 @@ impl BloomFilterApplier { async fn load_bloom_filters( &mut self, segments: &[usize], + metrics: Option<&mut BloomFilterReadMetrics>, ) -> Result<(Vec<(u64, usize)>, Vec)> { let segment_locations = segments .iter() @@ -108,7 +110,10 @@ impl BloomFilterApplier { .map(|i| self.meta.bloom_filter_locs[i as usize]) .collect::>(); - let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?; + let bloom_filters = self + .reader + .bloom_filter_vec(&bloom_filter_locs, metrics) + .await?; Ok((segment_locations, bloom_filters)) } @@ -422,7 +427,10 @@ mod tests { ]; for (predicates, search_range, expected) in cases { - let result = applier.search(&predicates, &[search_range]).await.unwrap(); + let result = applier + .search(&predicates, &[search_range], None) + .await + .unwrap(); assert_eq!( result, expected, "Expected {:?}, got {:?}", diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 466024f0d7..a9e08694e7 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::ops::{Range, Rem}; +use std::time::{Duration, Instant}; use async_trait::async_trait; use bytemuck::try_cast_slice; @@ -34,6 +35,72 @@ const BLOOM_META_LEN_SIZE: u64 = 4; /// Default prefetch size of bloom filter meta. pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB +/// Metrics for bloom filter read operations. +#[derive(Default, Clone)] +pub struct BloomFilterReadMetrics { + /// Total byte size to read. + pub total_bytes: u64, + /// Total number of ranges to read. + pub total_ranges: usize, + /// Elapsed time to fetch data. + pub fetch_elapsed: Duration, + /// Number of cache hits. + pub cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, +} + +impl std::fmt::Debug for BloomFilterReadMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + total_bytes, + total_ranges, + fetch_elapsed, + cache_hit, + cache_miss, + } = self; + + // If both total_bytes and cache_hit are 0, we didn't read anything. 
+ if *total_bytes == 0 && *cache_hit == 0 { + return write!(f, "{{}}"); + } + write!(f, "{{")?; + + if *total_bytes > 0 { + write!(f, "\"total_bytes\":{}", total_bytes)?; + } + if *cache_hit > 0 { + if *total_bytes > 0 { + write!(f, ", ")?; + } + write!(f, "\"cache_hit\":{}", cache_hit)?; + } + + if *total_ranges > 0 { + write!(f, ", \"total_ranges\":{}", total_ranges)?; + } + if !fetch_elapsed.is_zero() { + write!(f, ", \"fetch_elapsed\":\"{:?}\"", fetch_elapsed)?; + } + if *cache_miss > 0 { + write!(f, ", \"cache_miss\":{}", cache_miss)?; + } + + write!(f, "}}") + } +} + +impl BloomFilterReadMetrics { + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.total_bytes += other.total_bytes; + self.total_ranges += other.total_ranges; + self.fetch_elapsed += other.fetch_elapsed; + self.cache_hit += other.cache_hit; + self.cache_miss += other.cache_miss; + } +} + /// Safely converts bytes to Vec using bytemuck for optimal performance. /// Faster than chunking and converting each piece individually. /// @@ -79,25 +146,33 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec { #[async_trait] pub trait BloomFilterReader: Sync { /// Reads range of bytes from the file. - async fn range_read(&self, offset: u64, size: u32) -> Result; + async fn range_read( + &self, + offset: u64, + size: u32, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result; /// Reads bunch of ranges from the file. - async fn read_vec(&self, ranges: &[Range]) -> Result> { - let mut results = Vec::with_capacity(ranges.len()); - for range in ranges { - let size = (range.end - range.start) as u32; - let data = self.range_read(range.start, size).await?; - results.push(data); - } - Ok(results) - } + async fn read_vec( + &self, + ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result>; /// Reads the meta information of the bloom filter. - async fn metadata(&self) -> Result; + async fn metadata( + &self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result; /// Reads a bloom filter with the given location. 
- async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result { - let bytes = self.range_read(loc.offset, loc.size as _).await?; + async fn bloom_filter( + &self, + loc: &BloomFilterLoc, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { + let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?; let vec = bytes_to_u64_vec(&bytes); let bm = BloomFilter::from_vec(vec) .seed(&SEED) @@ -105,12 +180,16 @@ pub trait BloomFilterReader: Sync { Ok(bm) } - async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result> { + async fn bloom_filter_vec( + &self, + locs: &[BloomFilterLoc], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { let ranges = locs .iter() .map(|l| l.offset..l.offset + l.size) .collect::>(); - let bss = self.read_vec(&ranges).await?; + let bss = self.read_vec(&ranges, metrics).await?; let mut result = Vec::with_capacity(bss.len()); for (bs, loc) in bss.into_iter().zip(locs.iter()) { @@ -140,24 +219,59 @@ impl BloomFilterReaderImpl { #[async_trait] impl BloomFilterReader for BloomFilterReaderImpl { - async fn range_read(&self, offset: u64, size: u32) -> Result { - self.reader + async fn range_read( + &self, + offset: u64, + size: u32, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { + let start = metrics.as_ref().map(|_| Instant::now()); + let result = self + .reader .read(offset..offset + size as u64) .await - .context(IoSnafu) + .context(IoSnafu)?; + + if let Some(m) = metrics { + m.total_ranges += 1; + m.total_bytes += size as u64; + if let Some(start) = start { + m.fetch_elapsed += start.elapsed(); + } + } + + Ok(result) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { - self.reader.read_vec(ranges).await.context(IoSnafu) + async fn read_vec( + &self, + ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let result = self.reader.read_vec(ranges).await.context(IoSnafu)?; + + if let Some(m) = metrics { + m.total_ranges += ranges.len(); + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + if let Some(start) = start { + m.fetch_elapsed += start.elapsed(); + } + } + + Ok(result) } - async fn metadata(&self) -> Result { + async fn metadata( + &self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { let metadata = self.reader.metadata().await.context(IoSnafu)?; let file_size = metadata.content_length; let mut meta_reader = BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE)); - meta_reader.metadata().await + meta_reader.metadata(metrics).await } } @@ -183,7 +297,10 @@ impl BloomFilterMetaReader { /// /// It will first prefetch some bytes from the end of the file, /// then parse the metadata from the prefetch bytes. - pub async fn metadata(&mut self) -> Result { + pub async fn metadata( + &mut self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { ensure!( self.file_size >= BLOOM_META_LEN_SIZE, FileSizeTooSmallSnafu { @@ -191,6 +308,7 @@ impl BloomFilterMetaReader { } ); + let start = metrics.as_ref().map(|_| Instant::now()); let meta_start = self.file_size.saturating_sub(self.prefetch_size); let suffix = self .reader @@ -208,8 +326,28 @@ impl BloomFilterMetaReader { .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE) .await .context(IoSnafu)?; + + if let Some(m) = metrics { + // suffix read + meta read + m.total_ranges += 2; + // Ignores the meta length size to simplify the calculation. 
+ m.total_bytes += self.file_size.min(self.prefetch_size) + length; + if let Some(start) = start { + m.fetch_elapsed += start.elapsed(); + } + } + BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) } else { + if let Some(m) = metrics { + // suffix read only + m.total_ranges += 1; + m.total_bytes += self.file_size.min(self.prefetch_size); + if let Some(start) = start { + m.fetch_elapsed += start.elapsed(); + } + } + let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start; let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize]; BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) @@ -290,7 +428,7 @@ mod tests { for prefetch in [0u64, file_size / 2, file_size, file_size + 10] { let mut reader = BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch)); - let meta = reader.metadata().await.unwrap(); + let meta = reader.metadata(None).await.unwrap(); assert_eq!(meta.rows_per_segment, 2); assert_eq!(meta.segment_count, 2); @@ -312,11 +450,11 @@ mod tests { let bytes = mock_bloom_filter_bytes().await; let reader = BloomFilterReaderImpl::new(bytes); - let meta = reader.metadata().await.unwrap(); + let meta = reader.metadata(None).await.unwrap(); assert_eq!(meta.bloom_filter_locs.len(), 2); let bf = reader - .bloom_filter(&meta.bloom_filter_locs[0]) + .bloom_filter(&meta.bloom_filter_locs[0], None) .await .unwrap(); assert!(bf.contains(&b"a")); @@ -325,7 +463,7 @@ mod tests { assert!(bf.contains(&b"d")); let bf = reader - .bloom_filter(&meta.bloom_filter_locs[1]) + .bloom_filter(&meta.bloom_filter_locs[1], None) .await .unwrap(); assert!(bf.contains(&b"e")); diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs index abdf20e22d..2198ea67b3 100644 --- a/src/index/src/fulltext_index/tests.rs +++ b/src/index/src/fulltext_index/tests.rs @@ -74,7 +74,7 @@ async fn test_search( writer.finish().await.unwrap(); let reader = puffin_manager.reader(&file_name).await.unwrap(); - let index_dir = reader.dir(&blob_key).await.unwrap(); + let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap(); let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap(); for (query, expected) in query_expected { let results = searcher.search(query).await.unwrap(); diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 40fa22130a..ff67284e51 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; @@ -29,37 +30,115 @@ pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader; mod blob; mod footer; +/// Metrics for inverted index read operations. +#[derive(Default, Clone)] +pub struct InvertedIndexReadMetrics { + /// Total byte size to read. + pub total_bytes: u64, + /// Total number of ranges to read. + pub total_ranges: usize, + /// Elapsed time to fetch data. + pub fetch_elapsed: Duration, + /// Number of cache hits. + pub cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, +} + +impl std::fmt::Debug for InvertedIndexReadMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + total_bytes, + total_ranges, + fetch_elapsed, + cache_hit, + cache_miss, + } = self; + + // If both total_bytes and cache_hit are 0, we didn't read anything. 
+ if *total_bytes == 0 && *cache_hit == 0 { + return write!(f, "{{}}"); + } + write!(f, "{{")?; + + if *total_bytes > 0 { + write!(f, "\"total_bytes\":{}", total_bytes)?; + } + if *cache_hit > 0 { + if *total_bytes > 0 { + write!(f, ", ")?; + } + write!(f, "\"cache_hit\":{}", cache_hit)?; + } + + if *total_ranges > 0 { + write!(f, ", \"total_ranges\":{}", total_ranges)?; + } + if !fetch_elapsed.is_zero() { + write!(f, ", \"fetch_elapsed\":\"{:?}\"", fetch_elapsed)?; + } + if *cache_miss > 0 { + write!(f, ", \"cache_miss\":{}", cache_miss)?; + } + + write!(f, "}}") + } +} + +impl InvertedIndexReadMetrics { + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.total_bytes += other.total_bytes; + self.total_ranges += other.total_ranges; + self.fetch_elapsed += other.fetch_elapsed; + self.cache_hit += other.cache_hit; + self.cache_miss += other.cache_miss; + } +} + /// InvertedIndexReader defines an asynchronous reader of inverted index data #[mockall::automock] #[async_trait] pub trait InvertedIndexReader: Send + Sync { /// Seeks to given offset and reads data with exact size as provided. - async fn range_read(&self, offset: u64, size: u32) -> Result>; + async fn range_read<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result>; /// Reads the bytes in the given ranges. - async fn read_vec(&self, ranges: &[Range]) -> Result> { - let mut result = Vec::with_capacity(ranges.len()); - for range in ranges { - let data = self - .range_read(range.start, (range.end - range.start) as u32) - .await?; - result.push(Bytes::from(data)); - } - Ok(result) - } + async fn read_vec<'a>( + &self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result>; /// Retrieves metadata of all inverted indices stored within the blob. - async fn metadata(&self) -> Result>; + async fn metadata<'a>( + &self, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result>; /// Retrieves the finite state transducer (FST) map from the given offset and size. - async fn fst(&self, offset: u64, size: u32) -> Result { - let fst_data = self.range_read(offset, size).await?; + async fn fst<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result { + let fst_data = self.range_read(offset, size, metrics).await?; FstMap::new(fst_data).context(DecodeFstSnafu) } /// Retrieves the multiple finite state transducer (FST) maps from the given ranges. - async fn fst_vec(&mut self, ranges: &[Range]) -> Result> { - self.read_vec(ranges) + async fn fst_vec<'a>( + &mut self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + self.read_vec(ranges, metrics) .await? .into_iter() .map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu)) @@ -67,19 +146,28 @@ pub trait InvertedIndexReader: Send + Sync { } /// Retrieves the bitmap from the given offset and size. 
- async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result { - self.range_read(offset, size).await.and_then(|bytes| { - Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) - }) + async fn bitmap<'a>( + &self, + offset: u64, + size: u32, + bitmap_type: BitmapType, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result { + self.range_read(offset, size, metrics) + .await + .and_then(|bytes| { + Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) + }) } /// Retrieves the multiple bitmaps from the given ranges. - async fn bitmap_deque( + async fn bitmap_deque<'a>( &mut self, ranges: &[(Range, BitmapType)], + metrics: Option<&'a mut InvertedIndexReadMetrics>, ) -> Result> { let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip(); - let bytes = self.read_vec(&ranges).await?; + let bytes = self.read_vec(&ranges, metrics).await?; bytes .into_iter() .zip(types) diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index f48791e8f4..05f8f40047 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -14,6 +14,7 @@ use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; @@ -23,10 +24,10 @@ use snafu::{ResultExt, ensure}; use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; use crate::inverted_index::format::MIN_BLOB_SIZE; -use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::reader::footer::{ DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader, }; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; /// Inverted index blob reader, implements [`InvertedIndexReader`] pub struct InvertedIndexBlobReader { @@ -53,27 +54,58 @@ impl InvertedIndexBlobReader { #[async_trait] impl InvertedIndexReader for InvertedIndexBlobReader { - async fn range_read(&self, offset: u64, size: u32) -> Result> { + async fn range_read<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let buf = self .source .read(offset..offset + size as u64) .await .context(CommonIoSnafu)?; + + if let Some(m) = metrics { + m.total_bytes += size as u64; + m.total_ranges += 1; + m.fetch_elapsed += start.unwrap().elapsed(); + } + Ok(buf.into()) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { - self.source.read_vec(ranges).await.context(CommonIoSnafu) + async fn read_vec<'a>( + &self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + + let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?; + + if let Some(m) = metrics { + m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::(); + m.total_ranges += ranges.len(); + m.fetch_elapsed += start.unwrap().elapsed(); + } + + Ok(result) } - async fn metadata(&self) -> Result> { + async fn metadata<'a>( + &self, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { let metadata = self.source.metadata().await.context(CommonIoSnafu)?; let blob_size = metadata.content_length; Self::validate_blob_size(blob_size)?; let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size) .with_prefetch_size(DEFAULT_PREFETCH_SIZE); - 
footer_reader.metadata().await.map(Arc::new) + footer_reader.metadata(metrics).await.map(Arc::new) } } @@ -173,7 +205,7 @@ mod tests { let blob = create_inverted_index_blob(); let blob_reader = InvertedIndexBlobReader::new(blob); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); assert_eq!(metas.metas.len(), 2); let meta0 = metas.metas.get("tag0").unwrap(); @@ -200,13 +232,14 @@ mod tests { let blob = create_inverted_index_blob(); let blob_reader = InvertedIndexBlobReader::new(blob); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); let fst_map = blob_reader .fst( meta.base_offset + meta.relative_fst_offset as u64, meta.fst_size, + None, ) .await .unwrap(); @@ -219,6 +252,7 @@ mod tests { .fst( meta.base_offset + meta.relative_fst_offset as u64, meta.fst_size, + None, ) .await .unwrap(); @@ -232,30 +266,30 @@ mod tests { let blob = create_inverted_index_blob(); let blob_reader = InvertedIndexBlobReader::new(blob); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); let bitmap = blob_reader - .bitmap(meta.base_offset, 26, BitmapType::Roaring) + .bitmap(meta.base_offset, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); let bitmap = blob_reader - .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) + .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); let meta = metas.metas.get("tag1").unwrap(); let bitmap = blob_reader - .bitmap(meta.base_offset, 26, BitmapType::Roaring) + .bitmap(meta.base_offset, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); let bitmap = blob_reader - .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) + .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None) .await .unwrap(); assert_eq!(bitmap, mock_bitmap()); diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 2609eb6cbb..866021c6e6 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::time::Instant; + use common_base::range_read::RangeReader; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; use prost::Message; @@ -23,6 +25,7 @@ use crate::inverted_index::error::{ UnexpectedZeroSegmentRowCountSnafu, }; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; +use crate::inverted_index::format::reader::InvertedIndexReadMetrics; pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB @@ -54,12 +57,17 @@ impl InvertedIndexFooterReader { } impl InvertedIndexFooterReader { - pub async fn metadata(&mut self) -> Result { + pub async fn metadata( + &mut self, + mut metrics: Option<&mut InvertedIndexReadMetrics>, + ) -> Result { ensure!( self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE, BlobSizeTooSmallSnafu ); + let start = metrics.as_ref().map(|_| Instant::now()); + let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); let suffix = self .source @@ -73,19 +81,36 @@ impl InvertedIndexFooterReader { let footer_size = FOOTER_PAYLOAD_SIZE_SIZE; // Did not fetch the entire file metadata in the initial read, need to make a second request. - if length > suffix_len as u64 - footer_size { + let result = if length > suffix_len as u64 - footer_size { let metadata_start = self.blob_size - length - footer_size; let meta = self .source .read(metadata_start..self.blob_size - footer_size) .await .context(CommonIoSnafu)?; + + if let Some(m) = metrics.as_deref_mut() { + m.total_bytes += self.blob_size.min(self.prefetch_size()) + length; + m.total_ranges += 2; + } + self.parse_payload(&meta, length) } else { + if let Some(m) = metrics.as_deref_mut() { + m.total_bytes += self.blob_size.min(self.prefetch_size()); + m.total_ranges += 1; + } + let metadata_start = self.blob_size - length - footer_size - footer_start; let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; self.parse_payload(meta, length) + }; + + if let Some(m) = metrics { + m.fetch_elapsed += start.unwrap().elapsed(); } + + result } fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { @@ -186,7 +211,7 @@ mod tests { reader = reader.with_prefetch_size(prefetch); } - let metas = reader.metadata().await.unwrap(); + let metas = reader.metadata(None).await.unwrap(); assert_eq!(metas.metas.len(), 1); let index_meta = &metas.metas.get("test").unwrap(); assert_eq!(index_meta.name, "test"); @@ -210,7 +235,7 @@ mod tests { reader = reader.with_prefetch_size(prefetch); } - let result = reader.metadata().await; + let result = reader.metadata(None).await; assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. })); } } @@ -233,7 +258,7 @@ mod tests { reader = reader.with_prefetch_size(prefetch); } - let result = reader.metadata().await; + let result = reader.metadata(None).await; assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. 
})); } } diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 5991284869..58d8593591 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -122,7 +122,7 @@ mod tests { .unwrap(); let reader = InvertedIndexBlobReader::new(blob); - let metadata = reader.metadata().await.unwrap(); + let metadata = reader.metadata(None).await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 0); @@ -182,7 +182,7 @@ mod tests { .unwrap(); let reader = InvertedIndexBlobReader::new(blob); - let metadata = reader.metadata().await.unwrap(); + let metadata = reader.metadata(None).await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 2); @@ -198,13 +198,19 @@ mod tests { .fst( tag0.base_offset + tag0.relative_fst_offset as u64, tag0.fst_size, + None, ) .await .unwrap(); assert_eq!(fst0.len(), 3); let [offset, size] = unpack(fst0.get(b"a").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -213,7 +219,12 @@ mod tests { ); let [offset, size] = unpack(fst0.get(b"b").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -222,7 +233,12 @@ mod tests { ); let [offset, size] = unpack(fst0.get(b"c").unwrap()); let bitmap = reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -241,13 +257,19 @@ mod tests { .fst( tag1.base_offset + tag1.relative_fst_offset as u64, tag1.fst_size, + None, ) .await .unwrap(); assert_eq!(fst1.len(), 3); let [offset, size] = unpack(fst1.get(b"x").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -256,7 +278,12 @@ mod tests { ); let [offset, size] = unpack(fst1.get(b"y").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -265,7 +292,12 @@ mod tests { ); let [offset, size] = unpack(fst1.get(b"z").unwrap()); let bitmap = reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( diff --git a/src/index/src/inverted_index/search/fst_values_mapper.rs b/src/index/src/inverted_index/search/fst_values_mapper.rs index f9c15c40d8..38df713c8d 100644 --- a/src/index/src/inverted_index/search/fst_values_mapper.rs +++ b/src/index/src/inverted_index/search/fst_values_mapper.rs @@ -16,7 +16,7 @@ use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta}; use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; -use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, 
InvertedIndexReader}; /// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their /// corresponding bitmaps within an inverted index. @@ -35,7 +35,8 @@ impl<'a> ParallelFstValuesMapper<'a> { pub async fn map_values_vec( &mut self, - value_and_meta_vec: &[(Vec, &'a InvertedIndexMeta)], + value_and_meta_vec: &[(Vec, &InvertedIndexMeta)], + metrics: Option<&mut InvertedIndexReadMetrics>, ) -> Result> { let groups = value_and_meta_vec .iter() @@ -64,7 +65,7 @@ impl<'a> ParallelFstValuesMapper<'a> { } common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges); - let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges).await?; + let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges, metrics).await?; let mut output = Vec::with_capacity(groups.len()); for counter in groups { @@ -95,23 +96,25 @@ mod tests { #[tokio::test] async fn test_map_values_vec() { let mut mock_reader = MockInvertedIndexReader::new(); - mock_reader.expect_bitmap_deque().returning(|ranges| { - let mut output = VecDeque::new(); - for (range, bitmap_type) in ranges { - let offset = range.start; - let size = range.end - range.start; - match (offset, size, bitmap_type) { - (1, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + mock_reader + .expect_bitmap_deque() + .returning(|ranges, _metrics| { + let mut output = VecDeque::new(); + for (range, bitmap_type) in ranges { + let offset = range.start; + let size = range.end - range.start; + match (offset, size, bitmap_type) { + (1, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + } + (2, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type)) + } + _ => unreachable!(), } - (2, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type)) - } - _ => unreachable!(), } - } - Ok(output) - }); + Ok(output) + }); let meta = InvertedIndexMeta { bitmap_type: BitmapType::Roaring.into(), @@ -120,13 +123,13 @@ mod tests { let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader); let result = values_mapper - .map_values_vec(&[(vec![], &meta)]) + .map_values_vec(&[(vec![], &meta)], None) .await .unwrap(); assert_eq!(result[0].count_ones(), 0); let result = values_mapper - .map_values_vec(&[(vec![value(1, 1)], &meta)]) + .map_values_vec(&[(vec![value(1, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -135,7 +138,7 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(2, 1)], &meta)]) + .map_values_vec(&[(vec![value(2, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -144,7 +147,7 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)]) + .map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -153,7 +156,7 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)]) + .map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)], None) .await .unwrap(); assert_eq!( @@ -162,7 +165,10 @@ mod tests { ); let result = values_mapper - .map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)]) + .map_values_vec( + &[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)], + None, + ) .await .unwrap(); assert_eq!( @@ -174,10 +180,13 @@ mod tests { Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) ); let result = values_mapper - .map_values_vec(&[ - (vec![value(2, 1), 
value(1, 1)], &meta), - (vec![value(1, 1)], &meta), - ]) + .map_values_vec( + &[ + (vec![value(2, 1), value(1, 1)], &meta), + (vec![value(1, 1)], &meta), + ], + None, + ) .await .unwrap(); assert_eq!( diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index a80f102e02..02a1f96450 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -19,7 +19,7 @@ pub use predicates_apply::PredicatesIndexApplier; use crate::bitmap::Bitmap; use crate::inverted_index::error::Result; -use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; /// The output of an apply operation. #[derive(Clone, Debug, PartialEq)] @@ -44,10 +44,11 @@ pub trait IndexApplier: Send + Sync { /// Applies the predefined predicates to the data read by the given index reader, returning /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). #[allow(unused_parens)] - async fn apply<'a>( + async fn apply<'a, 'b>( &self, context: SearchContext, reader: &mut (dyn InvertedIndexReader + 'a), + metrics: Option<&'b mut InvertedIndexReadMetrics>, ) -> Result; /// Returns the memory usage of the applier. diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index ae22e79c74..441a4b4304 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -19,7 +19,7 @@ use greptime_proto::v1::index::InvertedIndexMetas; use crate::bitmap::Bitmap; use crate::inverted_index::error::{IndexNotFoundSnafu, Result}; -use crate::inverted_index::format::reader::InvertedIndexReader; +use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; use crate::inverted_index::search::fst_apply::{ FstApplier, IntersectionFstApplier, KeysFstApplier, }; @@ -43,12 +43,14 @@ pub struct PredicatesIndexApplier { impl IndexApplier for PredicatesIndexApplier { /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual /// bitmaps obtained for each index to result in a final set of indices. 
- async fn apply<'a>( + async fn apply<'a, 'b>( &self, context: SearchContext, reader: &mut (dyn InvertedIndexReader + 'a), + metrics: Option<&'b mut InvertedIndexReadMetrics>, ) -> Result { - let metadata = reader.metadata().await?; + let mut metrics = metrics; + let metadata = reader.metadata(metrics.as_deref_mut()).await?; let mut output = ApplyOutput { matched_segment_ids: Bitmap::new_bitvec(), total_row_count: metadata.total_row_count as _, @@ -84,7 +86,7 @@ impl IndexApplier for PredicatesIndexApplier { return Ok(output); } - let fsts = reader.fst_vec(&fst_ranges).await?; + let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?; let value_and_meta_vec = fsts .into_iter() .zip(appliers) @@ -92,7 +94,7 @@ impl IndexApplier for PredicatesIndexApplier { .collect::>(); let mut mapper = ParallelFstValuesMapper::new(reader); - let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?; + let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?; let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty for bm in bm_vec { @@ -221,26 +223,28 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0)]))); - mock_reader.expect_fst_vec().returning(|_ranges| { + .returning(|_| Ok(mock_metas([("tag-0", 0)]))); + mock_reader.expect_fst_vec().returning(|_ranges, _metrics| { Ok(vec![ FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(), ]) }); - mock_reader.expect_bitmap_deque().returning(|arg| { - assert_eq!(arg.len(), 1); - let range = &arg[0].0; - let bitmap_type = arg[0].1; - assert_eq!(*range, 2..3); - assert_eq!(bitmap_type, BitmapType::Roaring); - Ok(VecDeque::from([Bitmap::from_lsb0_bytes( - &[0b10101010], - bitmap_type, - )])) - }); + mock_reader + .expect_bitmap_deque() + .returning(|arg, _metrics| { + assert_eq!(arg.len(), 1); + let range = &arg[0].0; + let bitmap_type = arg[0].1; + assert_eq!(*range, 2..3); + assert_eq!(bitmap_type, BitmapType::Roaring); + Ok(VecDeque::from([Bitmap::from_lsb0_bytes( + &[0b10101010], + bitmap_type, + )])) + }); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!( @@ -252,14 +256,14 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0)]))); - mock_reader.expect_fst_vec().returning(|_range| { + .returning(|_| Ok(mock_metas([("tag-0", 0)]))); + mock_reader.expect_fst_vec().returning(|_range, _metrics| { Ok(vec![ FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(), ]) }); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!(output.matched_segment_ids.count_ones(), 0); @@ -279,8 +283,8 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)]))); - mock_reader.expect_fst_vec().returning(|ranges| { + .returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)]))); + mock_reader.expect_fst_vec().returning(|ranges, _metrics| { let mut output = vec![]; for range in ranges { match range.start { @@ -293,27 +297,29 @@ mod tests { } Ok(output) }); - mock_reader.expect_bitmap_deque().returning(|ranges| { - let mut output = VecDeque::new(); - for (range, bitmap_type) in ranges { - let offset = 
range.start; - let size = range.end - range.start; - match (offset, size, bitmap_type) { - (1, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + mock_reader + .expect_bitmap_deque() + .returning(|ranges, _metrics| { + let mut output = VecDeque::new(); + for (range, bitmap_type) in ranges { + let offset = range.start; + let size = range.end - range.start; + match (offset, size, bitmap_type) { + (1, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) + } + (2, 1, BitmapType::Roaring) => { + output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type)) + } + _ => unreachable!(), } - (2, 1, BitmapType::Roaring) => { - output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type)) - } - _ => unreachable!(), } - } - Ok(output) - }); + Ok(output) + }); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!( @@ -331,10 +337,10 @@ mod tests { let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0)]))); + .returning(|_| Ok(mock_metas([("tag-0", 0)]))); let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan @@ -343,7 +349,7 @@ mod tests { #[tokio::test] async fn test_index_applier_with_empty_index() { let mut mock_reader = MockInvertedIndexReader::new(); - mock_reader.expect_metadata().returning(move || { + mock_reader.expect_metadata().returning(move |_| { Ok(Arc::new(InvertedIndexMetas { total_row_count: 0, // No rows segment_row_count: 1, @@ -359,7 +365,7 @@ mod tests { }; let output = applier - .apply(SearchContext::default(), &mut mock_reader) + .apply(SearchContext::default(), &mut mock_reader, None) .await .unwrap(); assert!(output.matched_segment_ids.is_empty()); @@ -370,7 +376,7 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas(vec![]))); + .returning(|_| Ok(mock_metas(vec![]))); let mut mock_fst_applier = MockFstApplier::new(); mock_fst_applier.expect_apply().never(); @@ -385,6 +391,7 @@ mod tests { index_not_found_strategy: IndexNotFoundStrategy::ThrowError, }, &mut mock_reader, + None, ) .await; assert!(matches!(result, Err(Error::IndexNotFound { .. }))); @@ -395,6 +402,7 @@ mod tests { index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }, &mut mock_reader, + None, ) .await .unwrap(); @@ -406,6 +414,7 @@ mod tests { index_not_found_strategy: IndexNotFoundStrategy::Ignore, }, &mut mock_reader, + None, ) .await .unwrap(); diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index e82903ccac..4e7e842da9 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -45,6 +45,7 @@ use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; use crate::sst::file::RegionFileId; +use crate::sst::parquet::reader::MetadataCacheMetrics; /// Metrics type key for sst meta. const SST_META_TYPE: &str = "sst_meta"; @@ -75,19 +76,24 @@ pub enum CacheStrategy { } impl CacheStrategy { - /// Calls [CacheManager::get_parquet_meta_data()]. 
- pub async fn get_parquet_meta_data( + /// Gets parquet metadata with cache metrics tracking. + /// Returns the metadata and updates the provided metrics. + pub(crate) async fn get_parquet_meta_data( &self, file_id: RegionFileId, + metrics: &mut MetadataCacheMetrics, ) -> Option> { match self { CacheStrategy::EnableAll(cache_manager) => { - cache_manager.get_parquet_meta_data(file_id).await + cache_manager.get_parquet_meta_data(file_id, metrics).await } CacheStrategy::Compaction(cache_manager) => { - cache_manager.get_parquet_meta_data(file_id).await + cache_manager.get_parquet_meta_data(file_id, metrics).await + } + CacheStrategy::Disabled => { + metrics.cache_miss += 1; + None } - CacheStrategy::Disabled => None, } } @@ -292,16 +298,17 @@ impl CacheManager { CacheManagerBuilder::default() } - /// Gets cached [ParquetMetaData] from in-memory cache first. - /// If not found, tries to get it from write cache and fill the in-memory cache. - pub async fn get_parquet_meta_data( + /// Gets cached [ParquetMetaData] with metrics tracking. + /// Tries in-memory cache first, then file cache, updating metrics accordingly. + pub(crate) async fn get_parquet_meta_data( &self, file_id: RegionFileId, + metrics: &mut MetadataCacheMetrics, ) -> Option> { // Try to get metadata from sst meta cache - let metadata = self.get_parquet_meta_data_from_mem_cache(file_id); - if metadata.is_some() { - return metadata; + if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) { + metrics.mem_cache_hit += 1; + return Some(metadata); } // Try to get metadata from write cache @@ -309,11 +316,13 @@ impl CacheManager { if let Some(write_cache) = &self.write_cache && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await { + metrics.file_cache_hit += 1; let metadata = Arc::new(metadata); // Put metadata into sst meta cache self.put_parquet_meta_data(file_id, metadata.clone()); return Some(metadata); }; + metrics.cache_miss += 1; None } @@ -826,8 +835,14 @@ mod tests { let region_id = RegionId::new(1, 1); let file_id = RegionFileId::new(region_id, FileId::random()); let metadata = parquet_meta(); + let mut metrics = MetadataCacheMetrics::default(); cache.put_parquet_meta_data(file_id, metadata); - assert!(cache.get_parquet_meta_data(file_id).await.is_none()); + assert!( + cache + .get_parquet_meta_data(file_id, &mut metrics) + .await + .is_none() + ); let value = Value::Int64(10); let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10])); @@ -849,14 +864,30 @@ mod tests { #[tokio::test] async fn test_parquet_meta_cache() { let cache = CacheManager::builder().sst_meta_cache_size(2000).build(); + let mut metrics = MetadataCacheMetrics::default(); let region_id = RegionId::new(1, 1); let file_id = RegionFileId::new(region_id, FileId::random()); - assert!(cache.get_parquet_meta_data(file_id).await.is_none()); + assert!( + cache + .get_parquet_meta_data(file_id, &mut metrics) + .await + .is_none() + ); let metadata = parquet_meta(); cache.put_parquet_meta_data(file_id, metadata); - assert!(cache.get_parquet_meta_data(file_id).await.is_some()); + assert!( + cache + .get_parquet_meta_data(file_id, &mut metrics) + .await + .is_some() + ); cache.remove_parquet_meta_data(file_id); - assert!(cache.get_parquet_meta_data(file_id).await.is_none()); + assert!( + cache + .get_parquet_meta_data(file_id, &mut metrics) + .await + .is_none() + ); } #[test] diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 350ef34b2a..7393773a89 100644 --- 
a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata"; /// Metrics for index content. const INDEX_CONTENT_TYPE: &str = "index_content"; +/// Metrics collected from IndexCache operations. +#[derive(Debug, Default, Clone)] +pub struct IndexCacheMetrics { + /// Number of cache hits. + pub cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, + /// Number of pages accessed. + pub num_pages: usize, + /// Total bytes from pages. + pub page_bytes: u64, +} + +impl IndexCacheMetrics { + /// Merges another set of metrics into this one. + pub fn merge(&mut self, other: &Self) { + self.cache_hit += other.cache_hit; + self.cache_miss += other.cache_miss; + self.num_pages += other.num_pages; + self.page_bytes += other.page_bytes; + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct PageKey { page_id: u64, @@ -160,18 +183,20 @@ where offset: u64, size: u32, load: F, - ) -> Result, E> + ) -> Result<(Vec, IndexCacheMetrics), E> where F: Fn(Vec>) -> Fut, Fut: Future, E>>, E: std::error::Error, { + let mut metrics = IndexCacheMetrics::default(); let page_keys = PageKey::generate_page_keys(offset, size, self.page_size).collect::>(); // Size is 0, return empty data. if page_keys.is_empty() { - return Ok(Vec::new()); + return Ok((Vec::new(), metrics)); } + metrics.num_pages = page_keys.len(); let mut data = Vec::with_capacity(page_keys.len()); data.resize(page_keys.len(), Bytes::new()); let mut cache_miss_range = vec![]; @@ -182,10 +207,13 @@ where match self.get_page(key, *page_key) { Some(page) => { CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); + metrics.cache_hit += 1; + metrics.page_bytes += page.len() as u64; data[i] = page; } None => { CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); + metrics.cache_miss += 1; let base_offset = page_key.page_id * self.page_size; let pruned_size = if i == last_index { prune_size(page_keys.iter(), file_size, self.page_size) @@ -201,14 +229,18 @@ where let pages = load(cache_miss_range).await?; for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { let page_key = page_keys[i]; + metrics.page_bytes += page.len() as u64; data[i] = page.clone(); self.put_page(key, page_key, page.clone()); } } let buffer = Buffer::from_iter(data.into_iter()); - Ok(buffer - .slice(PageKey::calculate_range(offset, size, self.page_size)) - .to_vec()) + Ok(( + buffer + .slice(PageKey::calculate_range(offset, size, self.page_size)) + .to_vec(), + metrics, + )) } fn get_page(&self, key: K, page_key: PageKey) -> Option { diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index b4e7804b93..0dbb6c403e 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -14,12 +14,13 @@ use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use api::v1::index::{BloomFilterLoc, BloomFilterMeta}; use async_trait::async_trait; use bytes::Bytes; use index::bloom_filter::error::Result; -use index::bloom_filter::reader::BloomFilterReader; +use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader}; use store_api::storage::{ColumnId, FileId}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; @@ -114,51 +115,93 @@ impl CachedBloomFilterIndexBlobReader { #[async_trait] impl BloomFilterReader for CachedBloomFilterIndexBlobReader { - async fn range_read(&self, offset: u64, size: u32) -> Result { + async fn 
range_read( + &self, + offset: u64, + size: u32, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { + let start = metrics.as_ref().map(|_| Instant::now()); let inner = &self.inner; - self.cache + let (result, cache_metrics) = self + .cache .get_or_load( (self.file_id, self.column_id, self.tag), self.blob_size, offset, size, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) - .await - .map(|b| b.into()) + .await?; + + if let Some(m) = metrics { + m.total_ranges += cache_metrics.num_pages; + m.total_bytes += cache_metrics.page_bytes; + m.cache_hit += cache_metrics.cache_hit; + m.cache_miss += cache_metrics.cache_miss; + if let Some(start) = start { + m.fetch_elapsed += start.elapsed(); + } + } + + Ok(result.into()) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { + async fn read_vec( + &self, + ranges: &[Range], + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let mut pages = Vec::with_capacity(ranges.len()); + let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default(); for range in ranges { let inner = &self.inner; - let page = self + let (page, cache_metrics) = self .cache .get_or_load( (self.file_id, self.column_id, self.tag), self.blob_size, range.start, (range.end - range.start) as u32, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await?; + total_cache_metrics.merge(&cache_metrics); pages.push(Bytes::from(page)); } + if let Some(m) = metrics { + m.total_ranges += total_cache_metrics.num_pages; + m.total_bytes += total_cache_metrics.page_bytes; + m.cache_hit += total_cache_metrics.cache_hit; + m.cache_miss += total_cache_metrics.cache_miss; + if let Some(start) = start { + m.fetch_elapsed += start.elapsed(); + } + } + Ok(pages) } /// Reads the meta information of the bloom filter. 
- async fn metadata(&self) -> Result { + async fn metadata( + &self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { if let Some(cached) = self .cache .get_metadata((self.file_id, self.column_id, self.tag)) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + if let Some(m) = metrics { + m.cache_hit += 1; + } Ok((*cached).clone()) } else { - let meta = self.inner.metadata().await?; + let meta = self.inner.metadata(metrics).await?; self.cache.put_metadata( (self.file_id, self.column_id, self.tag), Arc::new(meta.clone()), diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 06a7a3f6d4..79509d0796 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -14,12 +14,13 @@ use core::ops::Range; use std::sync::Arc; +use std::time::Instant; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; use bytes::Bytes; use index::inverted_index::error::Result; -use index::inverted_index::format::reader::InvertedIndexReader; +use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader}; use prost::Message; use store_api::storage::FileId; @@ -83,46 +84,86 @@ impl CachedInvertedIndexBlobReader { #[async_trait] impl InvertedIndexReader for CachedInvertedIndexBlobReader { - async fn range_read(&self, offset: u64, size: u32) -> Result> { + async fn range_read<'a>( + &self, + offset: u64, + size: u32, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let inner = &self.inner; - self.cache + let (result, cache_metrics) = self + .cache .get_or_load( self.file_id, self.blob_size, offset, size, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) - .await + .await?; + + if let Some(m) = metrics { + m.total_bytes += cache_metrics.page_bytes; + m.total_ranges += cache_metrics.num_pages; + m.cache_hit += cache_metrics.cache_hit; + m.cache_miss += cache_metrics.cache_miss; + m.fetch_elapsed += start.unwrap().elapsed(); + } + + Ok(result) } - async fn read_vec(&self, ranges: &[Range]) -> Result> { + async fn read_vec<'a>( + &self, + ranges: &[Range], + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { + let start = metrics.as_ref().map(|_| Instant::now()); + let mut pages = Vec::with_capacity(ranges.len()); + let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default(); for range in ranges { let inner = &self.inner; - let page = self + let (page, cache_metrics) = self .cache .get_or_load( self.file_id, self.blob_size, range.start, (range.end - range.start) as u32, - move |ranges| async move { inner.read_vec(&ranges).await }, + move |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await?; + total_cache_metrics.merge(&cache_metrics); pages.push(Bytes::from(page)); } + if let Some(m) = metrics { + m.total_bytes += total_cache_metrics.page_bytes; + m.total_ranges += total_cache_metrics.num_pages; + m.cache_hit += total_cache_metrics.cache_hit; + m.cache_miss += total_cache_metrics.cache_miss; + m.fetch_elapsed += start.unwrap().elapsed(); + } + Ok(pages) } - async fn metadata(&self) -> Result> { + async fn metadata<'a>( + &self, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { if let Some(cached) = self.cache.get_metadata(self.file_id) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + if let Some(m) = metrics 
{ + m.cache_hit += 1; + } Ok(cached) } else { - let meta = self.inner.metadata().await?; + let meta = self.inner.metadata(metrics).await?; self.cache.put_metadata(self.file_id, meta.clone()); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) @@ -277,7 +318,7 @@ mod test { reader, Arc::new(InvertedIndexCache::new(8192, 8192, 50)), ); - let metadata = cached_reader.metadata().await.unwrap(); + let metadata = cached_reader.metadata(None).await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 2); @@ -292,13 +333,19 @@ mod test { .fst( tag0.base_offset + tag0.relative_fst_offset as u64, tag0.fst_size, + None, ) .await .unwrap(); assert_eq!(fst0.len(), 3); let [offset, size] = unpack(fst0.get(b"a").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -307,7 +354,12 @@ mod test { ); let [offset, size] = unpack(fst0.get(b"b").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -316,7 +368,12 @@ mod test { ); let [offset, size] = unpack(fst0.get(b"c").unwrap()); let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag0.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -335,13 +392,19 @@ mod test { .fst( tag1.base_offset + tag1.relative_fst_offset as u64, tag1.fst_size, + None, ) .await .unwrap(); assert_eq!(fst1.len(), 3); let [offset, size] = unpack(fst1.get(b"x").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -350,7 +413,12 @@ mod test { ); let [offset, size] = unpack(fst1.get(b"y").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -359,7 +427,12 @@ mod test { ); let [offset, size] = unpack(fst1.get(b"z").unwrap()); let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) + .bitmap( + tag1.base_offset + offset as u64, + size, + BitmapType::Roaring, + None, + ) .await .unwrap(); assert_eq!( @@ -372,16 +445,16 @@ mod test { for _ in 0..FUZZ_REPEAT_TIMES { let offset = rng.random_range(0..file_size); let size = rng.random_range(0..file_size as u32 - offset as u32); - let expected = cached_reader.range_read(offset, size).await.unwrap(); + let expected = cached_reader.range_read(offset, size, None).await.unwrap(); let inner = &cached_reader.inner; - let read = cached_reader + let (read, _cache_metrics) = cached_reader .cache .get_or_load( cached_reader.file_id, file_size, offset, size, - |ranges| async move { inner.read_vec(&ranges).await }, + |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await .unwrap(); diff --git a/src/mito2/src/engine/puffin_index.rs b/src/mito2/src/engine/puffin_index.rs index 05529db59b..925d547eb9 100644 --- a/src/mito2/src/engine/puffin_index.rs +++ b/src/mito2/src/engine/puffin_index.rs @@ -233,7 
+233,7 @@ async fn collect_inverted_entries( InvertedIndexBlobReader::new(blob_reader), cache.clone(), ); - match reader.metadata().await { + match reader.metadata(None).await { Ok(metas) => metas, Err(err) => { warn!( @@ -247,7 +247,7 @@ async fn collect_inverted_entries( } } else { let reader = InvertedIndexBlobReader::new(blob_reader); - match reader.metadata().await { + match reader.metadata(None).await { Ok(metas) => metas, Err(err) => { warn!( @@ -318,10 +318,10 @@ async fn try_read_bloom_meta( bloom_reader, cache.clone(), ) - .metadata() + .metadata(None) .await } - _ => bloom_reader.metadata().await, + _ => bloom_reader.metadata(None).await, }; match result { diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index de8875c4f6..5d4769703f 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -41,10 +41,14 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex}; use crate::read::scan_region::StreamContext; use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; +use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics; +use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics; +use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::flat_format::time_index_column_index; -use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; +use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics}; +use crate::sst::parquet::row_group::ParquetFetchMetrics; /// Verbose scan metrics for a partition. #[derive(Default)] @@ -81,6 +85,8 @@ pub(crate) struct ScanMetricsSet { // SST related metrics: /// Duration to build file ranges. build_parts_cost: Duration, + /// Duration to scan SST files. + sst_scan_cost: Duration, /// Number of row groups before filtering. rg_total: usize, /// Number of row groups filtered by fulltext index. @@ -126,6 +132,18 @@ pub(crate) struct ScanMetricsSet { /// The stream reached EOF stream_eof: bool, + + // Optional verbose metrics: + /// Inverted index apply metrics. + inverted_index_apply_metrics: Option, + /// Bloom filter index apply metrics. + bloom_filter_apply_metrics: Option, + /// Fulltext index apply metrics. + fulltext_index_apply_metrics: Option, + /// Parquet fetch metrics. + fetch_metrics: Option, + /// Metadata cache metrics. 
+ metadata_cache_metrics: Option, } impl fmt::Debug for ScanMetricsSet { @@ -141,6 +159,7 @@ impl fmt::Debug for ScanMetricsSet { num_mem_ranges, num_file_ranges, build_parts_cost, + sst_scan_cost, rg_total, rg_fulltext_filtered, rg_inverted_filtered, @@ -166,6 +185,11 @@ impl fmt::Debug for ScanMetricsSet { mem_rows, mem_batches, mem_series, + inverted_index_apply_metrics, + bloom_filter_apply_metrics, + fulltext_index_apply_metrics, + fetch_metrics, + metadata_cache_metrics, } = self; // Write core metrics @@ -181,6 +205,7 @@ impl fmt::Debug for ScanMetricsSet { \"num_mem_ranges\":{num_mem_ranges}, \ \"num_file_ranges\":{num_file_ranges}, \ \"build_parts_cost\":\"{build_parts_cost:?}\", \ + \"sst_scan_cost\":\"{sst_scan_cost:?}\", \ \"rg_total\":{rg_total}, \ \"rows_before_filter\":{rows_before_filter}, \ \"num_sst_record_batches\":{num_sst_record_batches}, \ @@ -255,6 +280,33 @@ impl fmt::Debug for ScanMetricsSet { write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?; } + // Write optional verbose metrics if they are not empty + if let Some(metrics) = inverted_index_apply_metrics + && !metrics.is_empty() + { + write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = bloom_filter_apply_metrics + && !metrics.is_empty() + { + write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = fulltext_index_apply_metrics + && !metrics.is_empty() + { + write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = fetch_metrics + && !metrics.is_empty() + { + write!(f, ", \"fetch_metrics\":{:?}", metrics)?; + } + if let Some(metrics) = metadata_cache_metrics + && !metrics.is_empty() + { + write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?; + } + write!(f, ", \"stream_eof\":{stream_eof}}}") } } @@ -304,14 +356,20 @@ impl ScanMetricsSet { rows_inverted_filtered, rows_bloom_filtered, rows_precise_filtered, + inverted_index_apply_metrics, + bloom_filter_apply_metrics, + fulltext_index_apply_metrics, }, num_record_batches, num_batches, num_rows, - scan_cost: _, + scan_cost, + metadata_cache_metrics, + fetch_metrics, } = other; self.build_parts_cost += *build_cost; + self.sst_scan_cost += *scan_cost; self.rg_total += *rg_total; self.rg_fulltext_filtered += *rg_fulltext_filtered; @@ -328,6 +386,31 @@ impl ScanMetricsSet { self.num_sst_record_batches += *num_record_batches; self.num_sst_batches += *num_batches; self.num_sst_rows += *num_rows; + + // Merge optional verbose metrics + if let Some(metrics) = inverted_index_apply_metrics { + self.inverted_index_apply_metrics + .get_or_insert_with(InvertedIndexApplyMetrics::default) + .merge_from(metrics); + } + if let Some(metrics) = bloom_filter_apply_metrics { + self.bloom_filter_apply_metrics + .get_or_insert_with(BloomFilterIndexApplyMetrics::default) + .merge_from(metrics); + } + if let Some(metrics) = fulltext_index_apply_metrics { + self.fulltext_index_apply_metrics + .get_or_insert_with(FulltextIndexApplyMetrics::default) + .merge_from(metrics); + } + if let Some(metrics) = fetch_metrics { + self.fetch_metrics + .get_or_insert_with(ParquetFetchMetrics::default) + .merge_from(metrics); + } + self.metadata_cache_metrics + .get_or_insert_with(MetadataCacheMetrics::default) + .merge_from(metadata_cache_metrics); } /// Sets distributor metrics. @@ -615,6 +698,11 @@ impl PartitionMetrics { let mut metrics_set = self.0.metrics.lock().unwrap(); metrics_set.set_distributor_metrics(metrics); } + + /// Returns whether verbose explain is enabled. 
+ pub(crate) fn explain_verbose(&self) -> bool { + self.0.explain_verbose + } } impl fmt::Debug for PartitionMetrics { @@ -768,6 +856,21 @@ fn can_split_series(num_rows: u64, num_series: u64) -> bool { num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD } +/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized +/// based on the `explain_verbose` flag. +fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics { + if explain_verbose { + ReaderFilterMetrics { + inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()), + bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()), + fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()), + ..Default::default() + } + } else { + ReaderFilterMetrics::default() + } +} + /// Scans file ranges at `index`. pub(crate) async fn scan_file_ranges( stream_ctx: Arc, @@ -776,7 +879,10 @@ pub(crate) async fn scan_file_ranges( read_type: &'static str, range_builder: Arc, ) -> Result>> { - let mut reader_metrics = ReaderMetrics::default(); + let mut reader_metrics = ReaderMetrics { + filter_metrics: new_filter_metrics(part_metrics.explain_verbose()), + ..Default::default() + }; let ranges = range_builder .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics) .await?; @@ -799,7 +905,10 @@ pub(crate) async fn scan_flat_file_ranges( read_type: &'static str, range_builder: Arc, ) -> Result>> { - let mut reader_metrics = ReaderMetrics::default(); + let mut reader_metrics = ReaderMetrics { + filter_metrics: new_filter_metrics(part_metrics.explain_verbose()), + ..Default::default() + }; let ranges = range_builder .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics) .await?; @@ -822,10 +931,18 @@ pub fn build_file_range_scan_stream( ranges: SmallVec<[FileRange; 2]>, ) -> impl Stream> { try_stream! { - let reader_metrics = &mut ReaderMetrics::default(); + let fetch_metrics = if part_metrics.explain_verbose() { + Some(Arc::new(ParquetFetchMetrics::default())) + } else { + None + }; + let reader_metrics = &mut ReaderMetrics { + fetch_metrics: fetch_metrics.clone(), + ..Default::default() + }; for range in ranges { let build_reader_start = Instant::now(); - let reader = range.reader(stream_ctx.input.series_row_selector).await?; + let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?; let build_cost = build_reader_start.elapsed(); part_metrics.inc_build_reader_cost(build_cost); let compat_batch = range.compat_batch(); @@ -857,10 +974,18 @@ pub fn build_flat_file_range_scan_stream( ranges: SmallVec<[FileRange; 2]>, ) -> impl Stream> { try_stream! 
{ - let reader_metrics = &mut ReaderMetrics::default(); + let fetch_metrics = if part_metrics.explain_verbose() { + Some(Arc::new(ParquetFetchMetrics::default())) + } else { + None + }; + let reader_metrics = &mut ReaderMetrics { + fetch_metrics: fetch_metrics.clone(), + ..Default::default() + }; for range in ranges { let build_reader_start = Instant::now(); - let mut reader = range.flat_reader().await?; + let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?; let build_cost = build_reader_start.elapsed(); part_metrics.inc_build_reader_cost(build_cost); diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 3fa387c8dc..ec4fb6125d 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -17,11 +17,14 @@ mod builder; use std::collections::BTreeMap; use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; -use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; +use index::bloom_filter::reader::{ + BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl, +}; use index::target::IndexTarget; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; @@ -47,6 +50,62 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +/// Metrics for tracking bloom filter index apply operations. +#[derive(Default, Clone)] +pub struct BloomFilterIndexApplyMetrics { + /// Total time spent applying the index. + pub apply_elapsed: std::time::Duration, + /// Number of blob cache misses. + pub blob_cache_miss: usize, + /// Total size of blobs read (in bytes). + pub blob_read_bytes: u64, + /// Metrics for bloom filter read operations. + pub read_metrics: BloomFilterReadMetrics, +} + +impl std::fmt::Debug for BloomFilterIndexApplyMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + apply_elapsed, + blob_cache_miss, + blob_read_bytes, + read_metrics, + } = self; + + if self.is_empty() { + return write!(f, "{{}}"); + } + write!(f, "{{")?; + + write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?; + + if *blob_cache_miss > 0 { + write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?; + } + if *blob_read_bytes > 0 { + write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?; + } + write!(f, ", \"read_metrics\":{:?}", read_metrics)?; + + write!(f, "}}") + } +} + +impl BloomFilterIndexApplyMetrics { + /// Returns true if the metrics are empty (contain no meaningful data). + pub fn is_empty(&self) -> bool { + self.apply_elapsed.is_zero() + } + + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.apply_elapsed += other.apply_elapsed; + self.blob_cache_miss += other.blob_cache_miss; + self.blob_read_bytes += other.blob_read_bytes; + self.read_metrics.merge_from(&other.read_metrics); + } +} + pub(crate) type BloomFilterIndexApplierRef = Arc; /// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file. @@ -133,15 +192,20 @@ impl BloomFilterIndexApplier { /// /// Row group id existing in the returned result means that the row group is searched. 
/// Empty ranges means that the row group is searched but no rows are found. + /// + /// # Arguments + /// * `file_id` - The region file ID to apply predicates to + /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads + /// * `row_groups` - Iterator of row group lengths and whether to search in the row group + /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply( &self, file_id: RegionFileId, file_size_hint: Option, row_groups: impl Iterator, + mut metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> Result>)>> { - let _timer = INDEX_APPLY_ELAPSED - .with_label_values(&[TYPE_BLOOM_FILTER_INDEX]) - .start_timer(); + let apply_start = Instant::now(); // Calculates row groups' ranges based on start of the file. let mut input = Vec::with_capacity(row_groups.size_hint().0); @@ -163,7 +227,7 @@ impl BloomFilterIndexApplier { for (column_id, predicates) in self.predicates.iter() { let blob = match self - .blob_reader(file_id, *column_id, file_size_hint) + .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut()) .await? { Some(blob) => blob, @@ -173,6 +237,9 @@ impl BloomFilterIndexApplier { // Create appropriate reader based on whether we have caching enabled if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache { let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; + if let Some(m) = &mut metrics { + m.blob_read_bytes += blob_size; + } let reader = CachedBloomFilterIndexBlobReader::new( file_id.file_id(), *column_id, @@ -181,12 +248,12 @@ impl BloomFilterIndexApplier { BloomFilterReaderImpl::new(blob), bloom_filter_cache.clone(), ); - self.apply_predicates(reader, predicates, &mut output) + self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut()) .await .context(ApplyBloomFilterIndexSnafu)?; } else { let reader = BloomFilterReaderImpl::new(blob); - self.apply_predicates(reader, predicates, &mut output) + self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut()) .await .context(ApplyBloomFilterIndexSnafu)?; } @@ -201,6 +268,16 @@ impl BloomFilterIndexApplier { } } + // Record elapsed time to histogram and collect metrics if requested + let elapsed = apply_start.elapsed(); + INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_BLOOM_FILTER_INDEX]) + .observe(elapsed.as_secs_f64()); + + if let Some(m) = metrics { + m.apply_elapsed += elapsed; + } + Ok(output) } @@ -212,6 +289,7 @@ impl BloomFilterIndexApplier { file_id: RegionFileId, column_id: ColumnId, file_size_hint: Option, + metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> Result> { let reader = match self .cached_blob_reader(file_id, column_id, file_size_hint) @@ -219,6 +297,9 @@ impl BloomFilterIndexApplier { { Ok(Some(puffin_reader)) => puffin_reader, other => { + if let Some(m) = metrics { + m.blob_cache_miss += 1; + } if let Err(err) = other { // Blob not found means no index for this column if is_blob_not_found(&err) { @@ -320,6 +401,7 @@ impl BloomFilterIndexApplier { reader: R, predicates: &[InListPredicate], output: &mut [(usize, Vec>)], + mut metrics: Option<&mut BloomFilterIndexApplyMetrics>, ) -> std::result::Result<(), index::bloom_filter::error::Error> { let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; @@ -329,7 +411,10 @@ impl BloomFilterIndexApplier { continue; } - *row_group_output = applier.search(predicates, row_group_output).await?; + let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics); + *row_group_output = applier 
+ .search(predicates, row_group_output, read_metrics) + .await?; } Ok(()) @@ -393,7 +478,7 @@ mod tests { let applier = builder.build(&exprs).unwrap().unwrap(); applier - .apply(file_id, None, row_groups.into_iter()) + .apply(file_id, None, row_groups.into_iter(), None) .await .unwrap() .into_iter() diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 0d16a21d7c..83ea788204 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -637,17 +637,17 @@ pub(crate) mod tests { .unwrap(); let reader = blob_guard.reader().await.unwrap(); let bloom_filter = BloomFilterReaderImpl::new(reader); - let metadata = bloom_filter.metadata().await.unwrap(); + let metadata = bloom_filter.metadata(None).await.unwrap(); assert_eq!(metadata.segment_count, 10); for i in 0..5 { let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; - let bf = bloom_filter.bloom_filter(loc).await.unwrap(); + let bf = bloom_filter.bloom_filter(loc, None).await.unwrap(); assert!(bf.contains(b"tag1")); } for i in 5..10 { let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; - let bf = bloom_filter.bloom_filter(loc).await.unwrap(); + let bf = bloom_filter.bloom_filter(loc, None).await.unwrap(); assert!(bf.contains(b"tag2")); } } @@ -662,13 +662,13 @@ pub(crate) mod tests { .unwrap(); let reader = blob_guard.reader().await.unwrap(); let bloom_filter = BloomFilterReaderImpl::new(reader); - let metadata = bloom_filter.metadata().await.unwrap(); + let metadata = bloom_filter.metadata(None).await.unwrap(); assert_eq!(metadata.segment_count, 5); for i in 0u64..20 { let idx = i as usize / 4; let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize]; - let bf = bloom_filter.bloom_filter(loc).await.unwrap(); + let bf = bloom_filter.bloom_filter(loc, None).await.unwrap(); let mut buf = vec![]; IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf) .unwrap(); diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 6b68fc348d..86949bf039 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -16,11 +16,12 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::iter; use std::ops::Range; use std::sync::Arc; +use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; -use index::bloom_filter::reader::BloomFilterReaderImpl; +use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl}; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer}; use index::fulltext_index::{Analyzer, Config}; @@ -53,6 +54,95 @@ use crate::sst::index::puffin_manager::{ pub mod builder; +/// Metrics for tracking fulltext index apply operations. +#[derive(Default, Clone)] +pub struct FulltextIndexApplyMetrics { + /// Total time spent applying the index. + pub apply_elapsed: std::time::Duration, + /// Number of blob cache misses. + pub blob_cache_miss: usize, + /// Number of directory cache hits. + pub dir_cache_hit: usize, + /// Number of directory cache misses. + pub dir_cache_miss: usize, + /// Elapsed time to initialize directory data. 
+ pub dir_init_elapsed: std::time::Duration, + /// Metrics for bloom filter reads. + pub bloom_filter_read_metrics: BloomFilterReadMetrics, +} + +impl std::fmt::Debug for FulltextIndexApplyMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + apply_elapsed, + blob_cache_miss, + dir_cache_hit, + dir_cache_miss, + dir_init_elapsed, + bloom_filter_read_metrics, + } = self; + + if self.is_empty() { + return write!(f, "{{}}"); + } + write!(f, "{{")?; + + write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?; + + if *blob_cache_miss > 0 { + write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?; + } + if *dir_cache_hit > 0 { + write!(f, ", \"dir_cache_hit\":{}", dir_cache_hit)?; + } + if *dir_cache_miss > 0 { + write!(f, ", \"dir_cache_miss\":{}", dir_cache_miss)?; + } + if !dir_init_elapsed.is_zero() { + write!(f, ", \"dir_init_elapsed\":\"{:?}\"", dir_init_elapsed)?; + } + write!( + f, + ", \"bloom_filter_read_metrics\":{:?}", + bloom_filter_read_metrics + )?; + + write!(f, "}}") + } +} + +impl FulltextIndexApplyMetrics { + /// Returns true if the metrics are empty (contain no meaningful data). + pub fn is_empty(&self) -> bool { + self.apply_elapsed.is_zero() + } + + /// Collects metrics from a directory read operation. + pub fn collect_dir_metrics( + &mut self, + elapsed: std::time::Duration, + dir_metrics: puffin::puffin_manager::DirMetrics, + ) { + self.dir_init_elapsed += elapsed; + if dir_metrics.cache_hit { + self.dir_cache_hit += 1; + } else { + self.dir_cache_miss += 1; + } + } + + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.apply_elapsed += other.apply_elapsed; + self.blob_cache_miss += other.blob_cache_miss; + self.dir_cache_hit += other.dir_cache_hit; + self.dir_cache_miss += other.dir_cache_miss; + self.dir_init_elapsed += other.dir_init_elapsed; + self.bloom_filter_read_metrics + .merge_from(&other.bloom_filter_read_metrics); + } +} + /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files pub struct FulltextIndexApplier { /// Requests to be applied. @@ -124,14 +214,18 @@ impl FulltextIndexApplier { impl FulltextIndexApplier { /// Applies fine-grained fulltext index to the specified SST file. /// Returns the row ids that match the queries. + /// + /// # Arguments + /// * `file_id` - The region file ID to apply predicates to + /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads + /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply_fine( &self, file_id: RegionFileId, file_size_hint: Option, + mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>> { - let timer = INDEX_APPLY_ELAPSED - .with_label_values(&[TYPE_FULLTEXT_INDEX]) - .start_timer(); + let apply_start = Instant::now(); let mut row_ids: Option> = None; for (column_id, request) in self.requests.iter() { @@ -140,7 +234,13 @@ impl FulltextIndexApplier { } let Some(result) = self - .apply_fine_one_column(file_size_hint, file_id, *column_id, request) + .apply_fine_one_column( + file_size_hint, + file_id, + *column_id, + request, + metrics.as_deref_mut(), + ) .await? 
else { continue; @@ -159,9 +259,16 @@ impl FulltextIndexApplier { } } - if row_ids.is_none() { - timer.stop_and_discard(); + // Record elapsed time to histogram and collect metrics if requested + let elapsed = apply_start.elapsed(); + INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_FULLTEXT_INDEX]) + .observe(elapsed.as_secs_f64()); + + if let Some(m) = metrics { + m.apply_elapsed += elapsed; } + Ok(row_ids) } @@ -171,6 +278,7 @@ impl FulltextIndexApplier { file_id: RegionFileId, column_id: ColumnId, request: &FulltextRequest, + metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>> { let blob_key = format!( "{INDEX_BLOB_TYPE_TANTIVY}-{}", @@ -178,7 +286,7 @@ impl FulltextIndexApplier { ); let dir = self .index_source - .dir(file_id, &blob_key, file_size_hint) + .dir(file_id, &blob_key, file_size_hint, metrics) .await?; let dir = match &dir { @@ -240,15 +348,20 @@ impl FulltextIndexApplier { /// /// Row group id existing in the returned result means that the row group is searched. /// Empty ranges means that the row group is searched but no rows are found. + /// + /// # Arguments + /// * `file_id` - The region file ID to apply predicates to + /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads + /// * `row_groups` - Iterator of row group lengths and whether to search in the row group + /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply_coarse( &self, file_id: RegionFileId, file_size_hint: Option, row_groups: impl Iterator, + mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>)>>> { - let timer = INDEX_APPLY_ELAPSED - .with_label_values(&[TYPE_FULLTEXT_INDEX]) - .start_timer(); + let apply_start = Instant::now(); let (input, mut output) = Self::init_coarse_output(row_groups); let mut applied = false; @@ -266,16 +379,27 @@ impl FulltextIndexApplier { *column_id, &request.terms, &mut output, + metrics.as_deref_mut(), ) .await?; } if !applied { - timer.stop_and_discard(); return Ok(None); } Self::adjust_coarse_output(input, &mut output); + + // Record elapsed time to histogram and collect metrics if requested + let elapsed = apply_start.elapsed(); + INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_FULLTEXT_INDEX]) + .observe(elapsed.as_secs_f64()); + + if let Some(m) = metrics { + m.apply_elapsed += elapsed; + } + Ok(Some(output)) } @@ -286,6 +410,7 @@ impl FulltextIndexApplier { column_id: ColumnId, terms: &[FulltextTerm], output: &mut [(usize, Vec>)], + mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result { let blob_key = format!( "{INDEX_BLOB_TYPE_BLOOM}-{}", @@ -293,7 +418,7 @@ impl FulltextIndexApplier { ); let Some(reader) = self .index_source - .blob(file_id, &blob_key, file_size_hint) + .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut()) .await? 
else { return Ok(false); @@ -336,7 +461,13 @@ impl FulltextIndexApplier { } *row_group_output = applier - .search(&predicates, row_group_output) + .search( + &predicates, + row_group_output, + metrics + .as_deref_mut() + .map(|m| &mut m.bloom_filter_read_metrics), + ) .await .context(ApplyBloomFilterIndexSnafu)?; } @@ -483,8 +614,15 @@ impl IndexSource { file_id: RegionFileId, key: &str, file_size_hint: Option, + metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>> { let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; + + // Track cache miss if fallbacked to remote + if fallbacked && let Some(m) = metrics { + m.blob_cache_miss += 1; + } + let res = reader.blob(key).await; match res { Ok(blob) => Ok(Some(blob)), @@ -514,11 +652,25 @@ impl IndexSource { file_id: RegionFileId, key: &str, file_size_hint: Option, + mut metrics: Option<&mut FulltextIndexApplyMetrics>, ) -> Result>> { let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; + + // Track cache miss if fallbacked to remote + if fallbacked && let Some(m) = &mut metrics { + m.blob_cache_miss += 1; + } + + let start = metrics.as_ref().map(|_| Instant::now()); let res = reader.dir(key).await; match res { - Ok(dir) => Ok(Some(dir)), + Ok((dir, dir_metrics)) => { + if let Some(m) = metrics { + // Safety: start is Some when metrics is Some + m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics); + } + Ok(Some(dir)) + } Err(err) if err.is_blob_not_found() => Ok(None), Err(err) => { if fallbacked { @@ -526,9 +678,16 @@ impl IndexSource { } else { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file."); let reader = self.build_remote(file_id, file_size_hint).await?; + let start = metrics.as_ref().map(|_| Instant::now()); let res = reader.dir(key).await; match res { - Ok(dir) => Ok(Some(dir)), + Ok((dir, dir_metrics)) => { + if let Some(m) = metrics { + // Safety: start is Some when metrics is Some + m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics); + } + Ok(Some(dir)) + } Err(err) if err.is_blob_not_found() => Ok(None), Err(err) => Err(err).context(PuffinReadBlobSnafu), } diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 2efa154ec4..32ad178d3b 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -723,15 +723,16 @@ mod tests { let backend = backend.clone(); async move { match backend { - FulltextBackend::Tantivy => { - applier.apply_fine(region_file_id, None).await.unwrap() - } + FulltextBackend::Tantivy => applier + .apply_fine(region_file_id, None, None) + .await + .unwrap(), FulltextBackend::Bloom => { let coarse_mask = coarse_mask.unwrap_or_default(); let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i])); // row group id == row id let resp = applier - .apply_coarse(region_file_id, None, row_groups) + .apply_coarse(region_file_id, None, row_groups, None) .await .unwrap(); resp.map(|r| { diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 350880cc9f..f22b886131 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -16,10 +16,11 @@ pub mod builder; use std::collections::BTreeMap; use std::sync::Arc; +use std::time::Instant; use common_base::range_read::RangeReader; use common_telemetry::warn; -use 
index::inverted_index::format::reader::InvertedIndexBlobReader; +use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics}; use index::inverted_index::search::index_apply::{ ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; @@ -44,6 +45,67 @@ use crate::sst::index::TYPE_INVERTED_INDEX; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +/// Metrics for tracking inverted index apply operations. +#[derive(Default, Clone)] +pub struct InvertedIndexApplyMetrics { + /// Total time spent applying the index. + pub apply_elapsed: std::time::Duration, + /// Number of blob cache misses (0 or 1). + pub blob_cache_miss: usize, + /// Total size of blobs read (in bytes). + pub blob_read_bytes: u64, + /// Metrics for inverted index reads. + pub inverted_index_read_metrics: InvertedIndexReadMetrics, +} + +impl std::fmt::Debug for InvertedIndexApplyMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + apply_elapsed, + blob_cache_miss, + blob_read_bytes, + inverted_index_read_metrics, + } = self; + + if self.is_empty() { + return write!(f, "{{}}"); + } + write!(f, "{{")?; + + write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?; + + if *blob_cache_miss > 0 { + write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?; + } + if *blob_read_bytes > 0 { + write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?; + } + write!( + f, + ", \"inverted_index_read_metrics\":{:?}", + inverted_index_read_metrics + )?; + + write!(f, "}}") + } +} + +impl InvertedIndexApplyMetrics { + /// Returns true if the metrics are empty (contain no meaningful data). + pub fn is_empty(&self) -> bool { + self.apply_elapsed.is_zero() + } + + /// Merges another metrics into this one. + pub fn merge_from(&mut self, other: &Self) { + self.apply_elapsed += other.apply_elapsed; + self.blob_cache_miss += other.blob_cache_miss; + self.blob_read_bytes += other.blob_read_bytes; + self.inverted_index_read_metrics + .merge_from(&other.inverted_index_read_metrics); + } +} + /// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. pub(crate) struct InvertedIndexApplier { @@ -124,24 +186,30 @@ impl InvertedIndexApplier { self } - /// Applies predicates to the provided SST file id and returns the relevant row group ids + /// Applies predicates to the provided SST file id and returns the relevant row group ids. + /// + /// # Arguments + /// * `file_id` - The region file ID to apply predicates to + /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads + /// * `metrics` - Optional mutable reference to collect metrics on demand pub async fn apply( &self, file_id: RegionFileId, file_size_hint: Option, + mut metrics: Option<&mut InvertedIndexApplyMetrics>, ) -> Result { - let _timer = INDEX_APPLY_ELAPSED - .with_label_values(&[TYPE_INVERTED_INDEX]) - .start_timer(); + let start = Instant::now(); let context = SearchContext { // Encountering a non-existing column indicates that it doesn't match predicates. index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, }; + let mut cache_miss = 0; let blob = match self.cached_blob_reader(file_id, file_size_hint).await { Ok(Some(puffin_reader)) => puffin_reader, other => { + cache_miss += 1; if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. 
Fallback to remote index file.") } @@ -149,8 +217,9 @@ impl InvertedIndexApplier { } }; - if let Some(index_cache) = &self.inverted_index_cache { - let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; + let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; + + let result = if let Some(index_cache) = &self.inverted_index_cache { let mut index_reader = CachedInvertedIndexBlobReader::new( file_id.file_id(), blob_size, @@ -158,16 +227,42 @@ impl InvertedIndexApplier { index_cache.clone(), ); self.index_applier - .apply(context, &mut index_reader) + .apply( + context, + &mut index_reader, + metrics + .as_deref_mut() + .map(|m| &mut m.inverted_index_read_metrics), + ) .await .context(ApplyInvertedIndexSnafu) } else { let mut index_reader = InvertedIndexBlobReader::new(blob); self.index_applier - .apply(context, &mut index_reader) + .apply( + context, + &mut index_reader, + metrics + .as_deref_mut() + .map(|m| &mut m.inverted_index_read_metrics), + ) .await .context(ApplyInvertedIndexSnafu) + }; + + // Record elapsed time to histogram and collect metrics if requested + let elapsed = start.elapsed(); + INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_INVERTED_INDEX]) + .observe(elapsed.as_secs_f64()); + + if let Some(metrics) = metrics { + metrics.apply_elapsed = elapsed; + metrics.blob_cache_miss = cache_miss; + metrics.blob_read_bytes = blob_size; } + + result } /// Creates a blob reader from the cached index file. @@ -281,7 +376,7 @@ mod tests { let mut mock_index_applier = MockIndexApplier::new(); mock_index_applier.expect_memory_usage().returning(|| 100); - mock_index_applier.expect_apply().returning(|_, _| { + mock_index_applier.expect_apply().returning(|_, _, _| { Ok(ApplyOutput { matched_segment_ids: Bitmap::new_bitvec(), total_row_count: 100, @@ -297,7 +392,7 @@ mod tests { puffin_manager_factory, Default::default(), ); - let output = sst_index_applier.apply(file_id, None).await.unwrap(); + let output = sst_index_applier.apply(file_id, None, None).await.unwrap(); assert_eq!( output, ApplyOutput { @@ -345,7 +440,7 @@ mod tests { puffin_manager_factory, Default::default(), ); - let res = sst_index_applier.apply(file_id, None).await; + let res = sst_index_applier.apply(file_id, None, None).await; assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index f31cfaf1dc..a784d01c21 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -615,7 +615,7 @@ mod tests { .unwrap(); Box::pin(async move { applier - .apply(sst_file_id, None) + .apply(sst_file_id, None, None) .await .unwrap() .matched_segment_ids diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 3f8d3f8819..0a9d25e1fe 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -245,7 +245,7 @@ mod tests { let bs = blob_reader.read(0..meta.content_length).await.unwrap(); assert_eq!(&*bs, raw_data); - let dir_guard = reader.dir(dir_key).await.unwrap(); + let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap(); let file = dir_guard.path().join("hello"); let data = tokio::fs::read(file).await.unwrap(); assert_eq!(data, raw_data); diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 689a8de599..46cd53e6ea 100644 --- 
a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -45,6 +45,7 @@ use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{ FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, }; +use crate::sst::parquet::row_group::ParquetFetchMetrics; /// Checks if a row group contains delete operations by examining the min value of op_type column. /// @@ -117,11 +118,16 @@ impl FileRange { pub(crate) async fn reader( &self, selector: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, ) -> Result { let parquet_reader = self .context .reader_builder - .build(self.row_group_idx, self.row_selection.clone()) + .build( + self.row_group_idx, + self.row_selection.clone(), + fetch_metrics, + ) .await?; let use_last_row_reader = if selector @@ -168,11 +174,18 @@ impl FileRange { } /// Creates a flat reader that returns RecordBatch. - pub(crate) async fn flat_reader(&self) -> Result { + pub(crate) async fn flat_reader( + &self, + fetch_metrics: Option<&ParquetFetchMetrics>, + ) -> Result { let parquet_reader = self .context .reader_builder - .build(self.row_group_idx, self.row_selection.clone()) + .build( + self.row_group_idx, + self.row_selection.clone(), + fetch_metrics, + ) .await?; // Compute skip_fields once for this row group diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 2c77145e5b..769bb0fd2b 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -52,15 +52,21 @@ use crate::metrics::{ use crate::read::prune::{PruneReader, Source}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; -use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; -use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; -use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; +use crate::sst::index::bloom_filter::applier::{ + BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics, +}; +use crate::sst::index::fulltext_index::applier::{ + FulltextIndexApplierRef, FulltextIndexApplyMetrics, +}; +use crate::sst::index::inverted_index::applier::{ + InvertedIndexApplierRef, InvertedIndexApplyMetrics, +}; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete, }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; -use crate::sst::parquet::row_group::InMemoryRowGroup; +use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; @@ -253,7 +259,9 @@ impl ParquetReaderBuilder { let file_size = self.file_handle.meta_ref().file_size; // Loads parquet metadata of the file. - let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; + let parquet_meta = self + .read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics) + .await?; // Decodes region metadata. let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); // Gets the metadata stored in the SST. 
@@ -378,25 +386,34 @@ impl ParquetReaderBuilder { &self, file_path: &str, file_size: u64, + cache_metrics: &mut MetadataCacheMetrics, ) -> Result> { + let start = Instant::now(); let _t = READ_STAGE_ELAPSED .with_label_values(&["read_parquet_metadata"]) .start_timer(); let file_id = self.file_handle.file_id(); - // Tries to get from global cache. - if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await { + // Tries to get from cache with metrics tracking. + if let Some(metadata) = self + .cache_strategy + .get_parquet_meta_data(file_id, cache_metrics) + .await + { + cache_metrics.metadata_load_cost += start.elapsed(); return Ok(metadata); } // Cache miss, load metadata directly. let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size); let metadata = metadata_loader.load().await?; + let metadata = Arc::new(metadata); // Cache the metadata. self.cache_strategy .put_parquet_meta_data(file_id, metadata.clone()); + cache_metrics.metadata_load_cost += start.elapsed(); Ok(metadata) } @@ -527,7 +544,11 @@ impl ParquetReaderBuilder { // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_res = index_applier - .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) + .apply_fine( + self.file_handle.file_id(), + Some(file_size_hint), + metrics.fulltext_index_apply_metrics.as_mut(), + ) .await; let selection = match apply_res { Ok(Some(res)) => { @@ -595,13 +616,17 @@ impl ParquetReaderBuilder { // Slow path: apply the index from the file. let file_size_hint = self.file_handle.meta_ref().index_file_size(); let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint)) + .apply( + self.file_handle.file_id(), + Some(file_size_hint), + metrics.inverted_index_apply_metrics.as_mut(), + ) .await; let selection = match apply_res { - Ok(output) => RowGroupSelection::from_inverted_index_apply_output( + Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output( row_group_size, num_row_groups, - output, + apply_output, ), Err(err) => { handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); @@ -670,7 +695,12 @@ impl ParquetReaderBuilder { ) }); let apply_res = index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint), rgs) + .apply( + self.file_handle.file_id(), + Some(file_size_hint), + rgs, + metrics.bloom_filter_apply_metrics.as_mut(), + ) .await; let mut selection = match apply_res { Ok(apply_output) => { @@ -748,7 +778,12 @@ impl ParquetReaderBuilder { ) }); let apply_res = index_applier - .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs) + .apply_coarse( + self.file_handle.file_id(), + Some(file_size_hint), + rgs, + metrics.fulltext_index_apply_metrics.as_mut(), + ) .await; let mut selection = match apply_res { Ok(Some(apply_output)) => { @@ -892,7 +927,7 @@ fn all_required_row_groups_searched( } /// Metrics of filtering rows groups and rows. -#[derive(Debug, Default, Clone, Copy)] +#[derive(Debug, Default, Clone)] pub(crate) struct ReaderFilterMetrics { /// Number of row groups before filtering. pub(crate) rg_total: usize, @@ -915,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics { pub(crate) rows_bloom_filtered: usize, /// Number of rows filtered by precise filter. pub(crate) rows_precise_filtered: usize, + + /// Optional metrics for inverted index applier. + pub(crate) inverted_index_apply_metrics: Option, + /// Optional metrics for bloom filter index applier. 
+ pub(crate) bloom_filter_apply_metrics: Option, + /// Optional metrics for fulltext index applier. + pub(crate) fulltext_index_apply_metrics: Option, } impl ReaderFilterMetrics { @@ -931,6 +973,23 @@ impl ReaderFilterMetrics { self.rows_inverted_filtered += other.rows_inverted_filtered; self.rows_bloom_filtered += other.rows_bloom_filtered; self.rows_precise_filtered += other.rows_precise_filtered; + + // Merge optional applier metrics + if let Some(other_metrics) = &other.inverted_index_apply_metrics { + self.inverted_index_apply_metrics + .get_or_insert_with(Default::default) + .merge_from(other_metrics); + } + if let Some(other_metrics) = &other.bloom_filter_apply_metrics { + self.bloom_filter_apply_metrics + .get_or_insert_with(Default::default) + .merge_from(other_metrics); + } + if let Some(other_metrics) = &other.fulltext_index_apply_metrics { + self.fulltext_index_apply_metrics + .get_or_insert_with(Default::default) + .merge_from(other_metrics); + } } /// Reports metrics. @@ -987,6 +1046,64 @@ impl ReaderFilterMetrics { } } +/// Metrics for parquet metadata cache operations. +#[derive(Default, Clone, Copy)] +pub(crate) struct MetadataCacheMetrics { + /// Number of memory cache hits for parquet metadata. + pub(crate) mem_cache_hit: usize, + /// Number of file cache hits for parquet metadata. + pub(crate) file_cache_hit: usize, + /// Number of cache misses for parquet metadata. + pub(crate) cache_miss: usize, + /// Duration to load parquet metadata. + pub(crate) metadata_load_cost: Duration, +} + +impl std::fmt::Debug for MetadataCacheMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + mem_cache_hit, + file_cache_hit, + cache_miss, + metadata_load_cost, + } = self; + + if self.is_empty() { + return write!(f, "{{}}"); + } + write!(f, "{{")?; + + write!(f, "\"metadata_load_cost\":\"{:?}\"", metadata_load_cost)?; + + if *mem_cache_hit > 0 { + write!(f, ", \"mem_cache_hit\":{}", mem_cache_hit)?; + } + if *file_cache_hit > 0 { + write!(f, ", \"file_cache_hit\":{}", file_cache_hit)?; + } + if *cache_miss > 0 { + write!(f, ", \"cache_miss\":{}", cache_miss)?; + } + + write!(f, "}}") + } +} + +impl MetadataCacheMetrics { + /// Returns true if the metrics are empty (contain no meaningful data). + pub(crate) fn is_empty(&self) -> bool { + self.metadata_load_cost.is_zero() + } + + /// Adds `other` metrics to this metrics. + pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) { + self.mem_cache_hit += other.mem_cache_hit; + self.file_cache_hit += other.file_cache_hit; + self.cache_miss += other.cache_miss; + self.metadata_load_cost += other.metadata_load_cost; + } +} + /// Parquet reader metrics. #[derive(Debug, Default, Clone)] pub struct ReaderMetrics { @@ -1002,6 +1119,10 @@ pub struct ReaderMetrics { pub(crate) num_batches: usize, /// Number of rows read. pub(crate) num_rows: usize, + /// Metrics for parquet metadata cache. + pub(crate) metadata_cache_metrics: MetadataCacheMetrics, + /// Optional metrics for page/row group fetch operations. 
+ pub(crate) fetch_metrics: Option>, } impl ReaderMetrics { @@ -1013,6 +1134,15 @@ impl ReaderMetrics { self.num_record_batches += other.num_record_batches; self.num_batches += other.num_batches; self.num_rows += other.num_rows; + self.metadata_cache_metrics + .merge_from(&other.metadata_cache_metrics); + if let Some(other_fetch) = &other.fetch_metrics { + if let Some(self_fetch) = &self.fetch_metrics { + self_fetch.merge_from(other_fetch); + } else { + self.fetch_metrics = Some(other_fetch.clone()); + } + } } /// Reports total rows. @@ -1067,7 +1197,10 @@ impl RowGroupReaderBuilder { &self, row_group_idx: usize, row_selection: Option, + fetch_metrics: Option<&ParquetFetchMetrics>, ) -> Result { + let fetch_start = Instant::now(); + let mut row_group = InMemoryRowGroup::create( self.file_handle.region_id(), self.file_handle.file_id().file_id(), @@ -1079,12 +1212,17 @@ impl RowGroupReaderBuilder { ); // Fetches data into memory. row_group - .fetch(&self.projection, row_selection.as_ref()) + .fetch(&self.projection, row_selection.as_ref(), fetch_metrics) .await .context(ReadParquetSnafu { path: &self.file_path, })?; + // Record total fetch elapsed time. + if let Some(metrics) = fetch_metrics { + metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed(); + } + // Builds the parquet reader. // Now the row selection is None. ParquetRecordBatchReader::try_new_with_row_groups( @@ -1228,6 +1366,8 @@ pub struct ParquetReader { selection: RowGroupSelection, /// Reader of current row group. reader_state: ReaderState, + /// Metrics for tracking row group fetch operations. + fetch_metrics: ParquetFetchMetrics, } #[async_trait] @@ -1247,7 +1387,11 @@ impl BatchReader for ParquetReader { let parquet_reader = self .context .reader_builder() - .build(row_group_idx, Some(row_selection)) + .build( + row_group_idx, + Some(row_selection), + Some(&self.fetch_metrics), + ) .await?; // Resets the parquet reader. @@ -1303,11 +1447,12 @@ impl ParquetReader { context: FileRangeContextRef, mut selection: RowGroupSelection, ) -> Result { + let fetch_metrics = ParquetFetchMetrics::default(); // No more items in current row group, reads next row group. let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() { let parquet_reader = context .reader_builder() - .build(row_group_idx, Some(row_selection)) + .build(row_group_idx, Some(row_selection), Some(&fetch_metrics)) .await?; // Compute skip_fields once for this row group let skip_fields = context.should_skip_fields(row_group_idx); @@ -1324,6 +1469,7 @@ impl ParquetReader { context, selection, reader_state, + fetch_metrics, }) } diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index d10526057d..b8baf7960f 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -35,6 +35,175 @@ use crate::cache::{CacheStrategy, PageKey, PageValue}; use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges}; +/// Inner data for ParquetFetchMetrics. +#[derive(Default, Debug, Clone)] +pub struct ParquetFetchMetricsData { + /// Number of page cache hits. + pub page_cache_hit: usize, + /// Number of write cache hits. + pub write_cache_hit: usize, + /// Number of cache misses. + pub cache_miss: usize, + /// Number of pages to fetch from mem cache. + pub pages_to_fetch_mem: usize, + /// Total size in bytes of pages to fetch from mem cache. 
+ pub page_size_to_fetch_mem: u64, + /// Number of pages to fetch from write cache. + pub pages_to_fetch_write_cache: usize, + /// Total size in bytes of pages to fetch from write cache. + pub page_size_to_fetch_write_cache: u64, + /// Number of pages to fetch from store. + pub pages_to_fetch_store: usize, + /// Total size in bytes of pages to fetch from store. + pub page_size_to_fetch_store: u64, + /// Total size in bytes of pages actually returned. + pub page_size_needed: u64, + /// Elapsed time fetching from write cache. + pub write_cache_fetch_elapsed: std::time::Duration, + /// Elapsed time fetching from object store. + pub store_fetch_elapsed: std::time::Duration, + /// Total elapsed time for fetching row groups. + pub total_fetch_elapsed: std::time::Duration, +} + +impl ParquetFetchMetricsData { + /// Returns true if the metrics are empty (contain no meaningful data). + fn is_empty(&self) -> bool { + self.total_fetch_elapsed.is_zero() + } +} + +/// Metrics for tracking page/row group fetch operations. +#[derive(Default)] +pub struct ParquetFetchMetrics { + pub data: std::sync::Mutex, +} + +impl std::fmt::Debug for ParquetFetchMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let data = self.data.lock().unwrap(); + if data.is_empty() { + return write!(f, "{{}}"); + } + + let ParquetFetchMetricsData { + page_cache_hit, + write_cache_hit, + cache_miss, + pages_to_fetch_mem, + page_size_to_fetch_mem, + pages_to_fetch_write_cache, + page_size_to_fetch_write_cache, + pages_to_fetch_store, + page_size_to_fetch_store, + page_size_needed, + write_cache_fetch_elapsed, + store_fetch_elapsed, + total_fetch_elapsed, + } = *data; + + write!(f, "{{")?; + + write!(f, "\"total_fetch_elapsed\":\"{:?}\"", total_fetch_elapsed)?; + + if page_cache_hit > 0 { + write!(f, ", \"page_cache_hit\":{}", page_cache_hit)?; + } + if write_cache_hit > 0 { + write!(f, ", \"write_cache_hit\":{}", write_cache_hit)?; + } + if cache_miss > 0 { + write!(f, ", \"cache_miss\":{}", cache_miss)?; + } + if pages_to_fetch_mem > 0 { + write!(f, ", \"pages_to_fetch_mem\":{}", pages_to_fetch_mem)?; + } + if page_size_to_fetch_mem > 0 { + write!(f, ", \"page_size_to_fetch_mem\":{}", page_size_to_fetch_mem)?; + } + if pages_to_fetch_write_cache > 0 { + write!( + f, + ", \"pages_to_fetch_write_cache\":{}", + pages_to_fetch_write_cache + )?; + } + if page_size_to_fetch_write_cache > 0 { + write!( + f, + ", \"page_size_to_fetch_write_cache\":{}", + page_size_to_fetch_write_cache + )?; + } + if pages_to_fetch_store > 0 { + write!(f, ", \"pages_to_fetch_store\":{}", pages_to_fetch_store)?; + } + if page_size_to_fetch_store > 0 { + write!( + f, + ", \"page_size_to_fetch_store\":{}", + page_size_to_fetch_store + )?; + } + if page_size_needed > 0 { + write!(f, ", \"page_size_needed\":{}", page_size_needed)?; + } + if !write_cache_fetch_elapsed.is_zero() { + write!( + f, + ", \"write_cache_fetch_elapsed\":\"{:?}\"", + write_cache_fetch_elapsed + )?; + } + if !store_fetch_elapsed.is_zero() { + write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", store_fetch_elapsed)?; + } + + write!(f, "}}") + } +} + +impl ParquetFetchMetrics { + /// Returns true if the metrics are empty (contain no meaningful data). + pub fn is_empty(&self) -> bool { + self.data.lock().unwrap().is_empty() + } + + /// Merges metrics from another [ParquetFetchMetrics]. 
+ pub fn merge_from(&self, other: &ParquetFetchMetrics) { + let ParquetFetchMetricsData { + page_cache_hit, + write_cache_hit, + cache_miss, + pages_to_fetch_mem, + page_size_to_fetch_mem, + pages_to_fetch_write_cache, + page_size_to_fetch_write_cache, + pages_to_fetch_store, + page_size_to_fetch_store, + page_size_needed, + write_cache_fetch_elapsed, + store_fetch_elapsed, + total_fetch_elapsed, + } = *other.data.lock().unwrap(); + + let mut data = self.data.lock().unwrap(); + data.page_cache_hit += page_cache_hit; + data.write_cache_hit += write_cache_hit; + data.cache_miss += cache_miss; + data.pages_to_fetch_mem += pages_to_fetch_mem; + data.page_size_to_fetch_mem += page_size_to_fetch_mem; + data.pages_to_fetch_write_cache += pages_to_fetch_write_cache; + data.page_size_to_fetch_write_cache += page_size_to_fetch_write_cache; + data.pages_to_fetch_store += pages_to_fetch_store; + data.page_size_to_fetch_store += page_size_to_fetch_store; + data.page_size_needed += page_size_needed; + data.write_cache_fetch_elapsed += write_cache_fetch_elapsed; + data.store_fetch_elapsed += store_fetch_elapsed; + data.total_fetch_elapsed += total_fetch_elapsed; + } +} + pub(crate) struct RowGroupBase<'a> { metadata: &'a RowGroupMetaData, pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>, @@ -244,13 +413,14 @@ impl<'a> InMemoryRowGroup<'a> { &mut self, projection: &ProjectionMask, selection: Option<&RowSelection>, + metrics: Option<&ParquetFetchMetrics>, ) -> Result<()> { if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) { let (fetch_ranges, page_start_offsets) = self.base .calc_sparse_read_ranges(projection, offset_index, selection); - let chunk_data = self.fetch_bytes(&fetch_ranges).await?; + let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?; // Assign sparse chunk data to base. self.base .assign_sparse_chunk(projection, chunk_data, page_start_offsets); @@ -268,7 +438,7 @@ impl<'a> InMemoryRowGroup<'a> { } // Fetch data with ranges - let chunk_data = self.fetch_bytes(&fetch_ranges).await?; + let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?; // Assigns fetched data to base. self.base.assign_dense_chunk(projection, chunk_data); @@ -279,31 +449,74 @@ impl<'a> InMemoryRowGroup<'a> { /// Try to fetch data from the memory cache or the WriteCache, /// if not in WriteCache, fetch data from object store directly. - async fn fetch_bytes(&self, ranges: &[Range]) -> Result> { + async fn fetch_bytes( + &self, + ranges: &[Range], + metrics: Option<&ParquetFetchMetrics>, + ) -> Result> { // Now fetch page timer includes the whole time to read pages. let _timer = READ_STAGE_FETCH_PAGES.start_timer(); + let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec()); if let Some(pages) = self.cache_strategy.get_pages(&page_key) { + if let Some(metrics) = metrics { + let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.page_cache_hit += 1; + metrics_data.pages_to_fetch_mem += ranges.len(); + metrics_data.page_size_to_fetch_mem += total_size; + metrics_data.page_size_needed += total_size; + } return Ok(pages.compressed.clone()); } + // Calculate total range size for metrics. 
+ let (total_range_size, unaligned_size) = compute_total_range_size(ranges); + let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet); - let pages = match self.fetch_ranges_from_write_cache(key, ranges).await { - Some(data) => data, + let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now()); + let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await; + let pages = match write_cache_result { + Some(data) => { + if let Some(metrics) = metrics { + let elapsed = fetch_write_cache_start + .map(|start| start.elapsed()) + .unwrap_or_default(); + let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.write_cache_fetch_elapsed += elapsed; + metrics_data.write_cache_hit += 1; + metrics_data.pages_to_fetch_write_cache += ranges.len(); + metrics_data.page_size_to_fetch_write_cache += unaligned_size; + metrics_data.page_size_needed += range_size_needed; + } + data + } None => { // Fetch data from object store. let _timer = READ_STAGE_ELAPSED .with_label_values(&["cache_miss_read"]) .start_timer(); - fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges) + let start = metrics.map(|_| std::time::Instant::now()); + let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges) .await - .map_err(|e| ParquetError::External(Box::new(e)))? + .map_err(|e| ParquetError::External(Box::new(e)))?; + if let Some(metrics) = metrics { + let elapsed = start.map(|start| start.elapsed()).unwrap_or_default(); + let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.store_fetch_elapsed += elapsed; + metrics_data.cache_miss += 1; + metrics_data.pages_to_fetch_store += ranges.len(); + metrics_data.page_size_to_fetch_store += unaligned_size; + metrics_data.page_size_needed += range_size_needed; + } + data } }; // Put pages back to the cache. - let total_range_size = compute_total_range_size(ranges); let page_value = PageValue::new(pages.clone(), total_range_size); self.cache_strategy .put_pages(page_key, Arc::new(page_value)); @@ -326,17 +539,21 @@ impl<'a> InMemoryRowGroup<'a> { } /// Computes the max possible buffer size to read the given `ranges`. 
+/// Returns (aligned_size, unaligned_size) where:
+/// - aligned_size: total size aligned to pooled buffer size
+/// - unaligned_size: actual total size without alignment
 // See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
-fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
+fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
 if ranges.is_empty() {
- return 0;
+ return (0, 0);
 }
 let gap = MERGE_GAP as u64;
 let mut sorted_ranges = ranges.to_vec();
 sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
- let mut total_size = 0;
+ let mut total_size_aligned = 0;
+ let mut total_size_unaligned = 0;
 let mut cur = sorted_ranges[0].clone();
 for range in sorted_ranges.into_iter().skip(1) {
@@ -345,15 +562,19 @@ fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
 cur.end = cur.end.max(range.end);
 } else {
 // No overlap and the gap is too large, add current range to total and start a new one
- total_size += align_to_pooled_buf_size(cur.end - cur.start);
+ let range_size = cur.end - cur.start;
+ total_size_aligned += align_to_pooled_buf_size(range_size);
+ total_size_unaligned += range_size;
 cur = range;
 }
 }
 // Add the last range
- total_size += align_to_pooled_buf_size(cur.end - cur.start);
+ let range_size = cur.end - cur.start;
+ total_size_aligned += align_to_pooled_buf_size(range_size);
+ total_size_unaligned += range_size;

- total_size
+ (total_size_aligned, total_size_unaligned)
 }

 /// Aligns the given size to the multiple of the pooled buffer size.
diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs
index 9f287128c1..060bd4237b 100644
--- a/src/puffin/src/puffin_manager.rs
+++ b/src/puffin/src/puffin_manager.rs
@@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec};
 use crate::error::Result;
 use crate::file_metadata::FileMetadata;
+/// Metrics returned by `PuffinReader::dir` operations.
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct DirMetrics {
+ /// Whether this was a cache hit (true) or cache miss (false).
+ pub cache_hit: bool,
+ /// Size of the directory in bytes.
+ pub dir_size: u64,
+}
+
 /// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`.
 #[async_trait]
 pub trait PuffinManager {
@@ -106,9 +115,10 @@ pub trait PuffinReader {
 /// Reads a directory from the Puffin file.
 ///
- /// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
+ /// The returned tuple contains `GuardWithMetadata` and `DirMetrics`.
+ /// The `GuardWithMetadata` is used to access the directory data and its metadata.
 /// Users should hold the `GuardWithMetadata` until they are done with the directory data.
- async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
+ async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)>;
 }

 /// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
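As a usage sketch of the new `dir` contract above: a caller now receives the guard together with `DirMetrics` and can fold the metrics into its own counters before using the directory. The `DirReadStats` type, its field names, and the `read_dir_with_stats` helper below are illustrative assumptions and are not part of this patch; only `PuffinReader::dir`, `DirMetrics::cache_hit`, `DirMetrics::dir_size`, and `GuardWithMetadata::metadata` come from the code above.

use crate::error::Result;
use crate::puffin_manager::PuffinReader;

/// Hypothetical accumulator for directory reads (names are illustrative only).
#[derive(Debug, Default)]
struct DirReadStats {
    dir_cache_hit: usize,
    dir_cache_miss: usize,
    dir_size_total: u64,
}

/// Reads a directory through the puffin reader and records the returned metrics.
async fn read_dir_with_stats(
    reader: &impl PuffinReader,
    key: &str,
    stats: &mut DirReadStats,
) -> Result<()> {
    // `dir` now returns the guard together with `DirMetrics`.
    let (dir, metrics) = reader.dir(key).await?;
    if metrics.cache_hit {
        stats.dir_cache_hit += 1;
    } else {
        stats.dir_cache_miss += 1;
    }
    stats.dir_size_total += metrics.dir_size;

    // Hold the guard while accessing the directory data and its metadata.
    let _blob_metadata = dir.metadata();
    Ok(())
}

The fulltext index applier changes elsewhere in this patch consume `DirMetrics` in a similar way.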
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 8339d32c95..c660d1e19a 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager}; -use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader}; +use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader}; /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. pub struct FsPuffinReader @@ -130,10 +130,10 @@ where Ok(GuardWithMetadata::new(blob, blob_metadata)) } - async fn dir(&self, key: &str) -> Result> { + async fn dir(&self, key: &str) -> Result<(GuardWithMetadata, DirMetrics)> { let mut file = self.puffin_reader().await?; let blob_metadata = self.get_blob_metadata(key, &mut file).await?; - let dir = self + let (dir, metrics) = self .stager .get_dir( &self.handle, @@ -153,7 +153,7 @@ where ) .await?; - Ok(GuardWithMetadata::new(dir, blob_metadata)) + Ok((GuardWithMetadata::new(dir, blob_metadata), metrics)) } } diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs index 708053bb27..512e94f4e8 100644 --- a/src/puffin/src/puffin_manager/stager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -23,7 +23,7 @@ use futures::AsyncWrite; use futures::future::BoxFuture; use crate::error::Result; -use crate::puffin_manager::{BlobGuard, DirGuard}; +use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics}; pub type BoxWriter = Box; @@ -72,14 +72,15 @@ pub trait Stager: Send + Sync { /// Retrieves a directory, initializing it if necessary using the provided `init_fn`. /// - /// The returned `DirGuard` is used to access the directory in the filesystem. + /// The returned tuple contains the `DirGuard` and `DirMetrics`. + /// The `DirGuard` is used to access the directory in the filesystem. /// The caller is responsible for holding the `DirGuard` until they are done with the directory. async fn get_dir<'a>( &self, handle: &Self::FileHandle, key: &str, init_fn: Box, - ) -> Result; + ) -> Result<(Self::Dir, DirMetrics)>; /// Stores a directory in the staging area. 
async fn put_dir( diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index 380cce7930..dfb9285452 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -41,7 +41,7 @@ use crate::error::{ use crate::puffin_manager::stager::{ BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier, }; -use crate::puffin_manager::{BlobGuard, DirGuard}; +use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics}; const DELETE_QUEUE_SIZE: usize = 10240; const TMP_EXTENSION: &str = "tmp"; @@ -203,7 +203,7 @@ impl Stager for BoundedStager { handle: &Self::FileHandle, key: &str, init_fn: Box, - ) -> Result { + ) -> Result<(Self::Dir, DirMetrics)> { let handle_str = handle.to_string(); let cache_key = Self::encode_cache_key(&handle_str, key); @@ -242,15 +242,22 @@ impl Stager for BoundedStager { .await .context(CacheGetSnafu)?; + let dir_size = v.size(); if let Some(notifier) = self.notifier.as_ref() { if miss { - notifier.on_cache_miss(v.size()); + notifier.on_cache_miss(dir_size); } else { - notifier.on_cache_hit(v.size()); + notifier.on_cache_hit(dir_size); } } + + let metrics = DirMetrics { + cache_hit: !miss, + dir_size, + }; + match v { - CacheValue::Dir(guard) => Ok(guard), + CacheValue::Dir(guard) => Ok((guard, metrics)), _ => unreachable!(), } } @@ -882,7 +889,7 @@ mod tests { let puffin_file_name = "test_get_dir".to_string(); let key = "key"; - let dir_path = stager + let (dir_path, metrics) = stager .get_dir( &puffin_file_name, key, @@ -901,6 +908,9 @@ mod tests { .await .unwrap(); + assert!(!metrics.cache_hit); + assert!(metrics.dir_size > 0); + for (rel_path, content) in &files_in_dir { let file_path = dir_path.path().join(rel_path); let mut file = tokio::fs::File::open(&file_path).await.unwrap(); @@ -974,7 +984,7 @@ mod tests { ]; let dir_key = "dir_key"; - let guard = stager + let (guard, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1016,7 +1026,7 @@ mod tests { let buf = reader.read(0..m.content_length).await.unwrap(); assert_eq!(&*buf, b"hello world"); - let dir_path = stager + let (dir_path, metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1024,6 +1034,9 @@ mod tests { ) .await .unwrap(); + + assert!(metrics.cache_hit); + assert!(metrics.dir_size > 0); for (rel_path, content) in &files_in_dir { let file_path = dir_path.path().join(rel_path); let mut file = tokio::fs::File::open(&file_path).await.unwrap(); @@ -1151,7 +1164,7 @@ mod tests { ]; // First time to get the directory - let guard_0 = stager + let (guard_0, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1198,7 +1211,7 @@ mod tests { ); // Second time to get the directory - let guard_1 = stager + let (guard_1, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1237,7 +1250,7 @@ mod tests { // Third time to get the directory and all guards are dropped drop(guard_0); drop(guard_1); - let guard_2 = stager + let (guard_2, _metrics) = stager .get_dir( &puffin_file_name, dir_key, @@ -1390,7 +1403,7 @@ mod tests { ]; let dir_key = "dir_key"; - let guard = stager + let (guard, _metrics) = stager .get_dir( &puffin_file_name, dir_key, diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 715668e40e..f1ee9fabd7 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -356,7 +356,7 @@ async fn check_dir( stager: &BoundedStager, puffin_reader: 
&impl PuffinReader, ) { - let res_dir = puffin_reader.dir(key).await.unwrap(); + let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap(); let metadata = res_dir.metadata(); assert_eq!( metadata.properties,