From f363d73f72f0151003899207e594bda155a8ca6f Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 21 Nov 2025 15:25:54 +0800 Subject: [PATCH] feat: add metrics for range_read and metadata Signed-off-by: evenyag --- src/index/src/bloom_filter/applier.rs | 2 +- src/index/src/bloom_filter/reader.rs | 83 +++++++++++++++---- .../src/cache/index/bloom_filter_index.rs | 30 +++++-- src/mito2/src/engine/puffin_index.rs | 4 +- .../src/sst/index/bloom_filter/creator.rs | 10 +-- 5 files changed, 101 insertions(+), 28 deletions(-) diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index ec1ee26113..db219c9e61 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -38,7 +38,7 @@ pub struct BloomFilterApplier { impl BloomFilterApplier { pub async fn new(reader: Box) -> Result { - let meta = reader.metadata().await?; + let meta = reader.metadata(None).await?; Ok(Self { reader, meta }) } diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 9df1ad6bad..9b139c1260 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -91,7 +91,12 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec { #[async_trait] pub trait BloomFilterReader: Sync { /// Reads range of bytes from the file. - async fn range_read(&self, offset: u64, size: u32) -> Result; + async fn range_read( + &self, + offset: u64, + size: u32, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result; /// Reads bunch of ranges from the file. async fn read_vec( @@ -104,7 +109,7 @@ pub trait BloomFilterReader: Sync { let mut results = Vec::with_capacity(ranges.len()); for range in ranges { let size = (range.end - range.start) as u32; - let data = self.range_read(range.start, size).await?; + let data = self.range_read(range.start, size, None).await?; results.push(data); } @@ -120,11 +125,18 @@ pub trait BloomFilterReader: Sync { } /// Reads the meta information of the bloom filter. - async fn metadata(&self) -> Result; + async fn metadata( + &self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result; /// Reads a bloom filter with the given location. - async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result { - let bytes = self.range_read(loc.offset, loc.size as _).await?; + async fn bloom_filter( + &self, + loc: &BloomFilterLoc, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { + let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?; let vec = bytes_to_u64_vec(&bytes); let bm = BloomFilter::from_vec(vec) .seed(&SEED) @@ -171,11 +183,28 @@ impl BloomFilterReaderImpl { #[async_trait] impl BloomFilterReader for BloomFilterReaderImpl { - async fn range_read(&self, offset: u64, size: u32) -> Result { - self.reader + async fn range_read( + &self, + offset: u64, + size: u32, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { + let start = metrics.as_ref().map(|_| Instant::now()); + let result = self + .reader .read(offset..offset + size as u64) .await - .context(IoSnafu) + .context(IoSnafu)?; + + if let Some(m) = metrics { + m.total_ranges += 1; + m.total_bytes += size as u64; + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + + Ok(result) } async fn read_vec( @@ -197,13 +226,16 @@ impl BloomFilterReader for BloomFilterReaderImpl { Ok(result) } - async fn metadata(&self) -> Result { + async fn metadata( + &self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { let metadata = self.reader.metadata().await.context(IoSnafu)?; let file_size = metadata.content_length; let mut meta_reader = BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE)); - meta_reader.metadata().await + meta_reader.metadata(metrics).await } } @@ -229,7 +261,10 @@ impl BloomFilterMetaReader { /// /// It will first prefetch some bytes from the end of the file, /// then parse the metadata from the prefetch bytes. - pub async fn metadata(&mut self) -> Result { + pub async fn metadata( + &mut self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { ensure!( self.file_size >= BLOOM_META_LEN_SIZE, FileSizeTooSmallSnafu { @@ -237,6 +272,7 @@ impl BloomFilterMetaReader { } ); + let start = metrics.as_ref().map(|_| Instant::now()); let meta_start = self.file_size.saturating_sub(self.prefetch_size); let suffix = self .reader @@ -254,8 +290,25 @@ impl BloomFilterMetaReader { .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE) .await .context(IoSnafu)?; + + if let Some(m) = metrics { + m.total_ranges += 2; // suffix read + meta read + m.total_bytes += (self.file_size - meta_start) + length; + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) } else { + if let Some(m) = metrics { + m.total_ranges += 1; // suffix read only + m.total_bytes += self.file_size.min(self.prefetch_size); + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start; let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize]; BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) @@ -336,7 +389,7 @@ mod tests { for prefetch in [0u64, file_size / 2, file_size, file_size + 10] { let mut reader = BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch)); - let meta = reader.metadata().await.unwrap(); + let meta = reader.metadata(None).await.unwrap(); assert_eq!(meta.rows_per_segment, 2); assert_eq!(meta.segment_count, 2); @@ -358,11 +411,11 @@ mod tests { let bytes = mock_bloom_filter_bytes().await; let reader = BloomFilterReaderImpl::new(bytes); - let meta = reader.metadata().await.unwrap(); + let meta = reader.metadata(None).await.unwrap(); assert_eq!(meta.bloom_filter_locs.len(), 2); let bf = reader - .bloom_filter(&meta.bloom_filter_locs[0]) + .bloom_filter(&meta.bloom_filter_locs[0], None) .await .unwrap(); assert!(bf.contains(&b"a")); @@ -371,7 +424,7 @@ mod tests { assert!(bf.contains(&b"d")); let bf = reader - .bloom_filter(&meta.bloom_filter_locs[1]) + .bloom_filter(&meta.bloom_filter_locs[1], None) .await .unwrap(); assert!(bf.contains(&b"e")); diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index e25e324ab2..93ae9af6c1 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -115,9 +115,16 @@ impl CachedBloomFilterIndexBlobReader { #[async_trait] impl BloomFilterReader for CachedBloomFilterIndexBlobReader { - async fn range_read(&self, offset: u64, size: u32) -> Result { + async fn range_read( + &self, + offset: u64, + size: u32, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { + let start = metrics.as_ref().map(|_| Instant::now()); let inner = &self.inner; - self.cache + let result = self + .cache .get_or_load( (self.file_id, self.column_id, self.tag), self.blob_size, @@ -126,7 +133,17 @@ impl BloomFilterReader for CachedBloomFilterIndexBl move |ranges| async move { inner.read_vec(&ranges, None).await }, ) .await - .map(|b| b.into()) + .map(|b| b.into())?; + + if let Some(m) = metrics { + m.total_ranges += 1; + m.total_bytes += size as u64; + if let Some(start) = start { + m.elapsed += start.elapsed(); + } + } + + Ok(result) } async fn read_vec( @@ -165,7 +182,10 @@ impl BloomFilterReader for CachedBloomFilterIndexBl } /// Reads the meta information of the bloom filter. - async fn metadata(&self) -> Result { + async fn metadata( + &self, + metrics: Option<&mut BloomFilterReadMetrics>, + ) -> Result { if let Some(cached) = self .cache .get_metadata((self.file_id, self.column_id, self.tag)) @@ -173,7 +193,7 @@ impl BloomFilterReader for CachedBloomFilterIndexBl CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok((*cached).clone()) } else { - let meta = self.inner.metadata().await?; + let meta = self.inner.metadata(metrics).await?; self.cache.put_metadata( (self.file_id, self.column_id, self.tag), Arc::new(meta.clone()), diff --git a/src/mito2/src/engine/puffin_index.rs b/src/mito2/src/engine/puffin_index.rs index 05529db59b..10dc76bc69 100644 --- a/src/mito2/src/engine/puffin_index.rs +++ b/src/mito2/src/engine/puffin_index.rs @@ -318,10 +318,10 @@ async fn try_read_bloom_meta( bloom_reader, cache.clone(), ) - .metadata() + .metadata(None) .await } - _ => bloom_reader.metadata().await, + _ => bloom_reader.metadata(None).await, }; match result { diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 0d16a21d7c..83ea788204 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -637,17 +637,17 @@ pub(crate) mod tests { .unwrap(); let reader = blob_guard.reader().await.unwrap(); let bloom_filter = BloomFilterReaderImpl::new(reader); - let metadata = bloom_filter.metadata().await.unwrap(); + let metadata = bloom_filter.metadata(None).await.unwrap(); assert_eq!(metadata.segment_count, 10); for i in 0..5 { let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; - let bf = bloom_filter.bloom_filter(loc).await.unwrap(); + let bf = bloom_filter.bloom_filter(loc, None).await.unwrap(); assert!(bf.contains(b"tag1")); } for i in 5..10 { let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; - let bf = bloom_filter.bloom_filter(loc).await.unwrap(); + let bf = bloom_filter.bloom_filter(loc, None).await.unwrap(); assert!(bf.contains(b"tag2")); } } @@ -662,13 +662,13 @@ pub(crate) mod tests { .unwrap(); let reader = blob_guard.reader().await.unwrap(); let bloom_filter = BloomFilterReaderImpl::new(reader); - let metadata = bloom_filter.metadata().await.unwrap(); + let metadata = bloom_filter.metadata(None).await.unwrap(); assert_eq!(metadata.segment_count, 5); for i in 0u64..20 { let idx = i as usize / 4; let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize]; - let bf = bloom_filter.bloom_filter(loc).await.unwrap(); + let bf = bloom_filter.bloom_filter(loc, None).await.unwrap(); let mut buf = vec![]; IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf) .unwrap();