diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 56e83eab15..b7787e400d 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -292,8 +292,10 @@ impl BloomFilterMetaReader { .context(IoSnafu)?; if let Some(m) = metrics { - m.total_ranges += 2; // suffix read + meta read - m.total_bytes += (self.file_size - meta_start) + length; + // suffix read + meta read + m.total_ranges += 2; + // Ignores the meta length size to simplify the calculation. + m.total_bytes += self.file_size.min(self.prefetch_size) + length; if let Some(start) = start { m.fetch_elapsed += start.elapsed(); } @@ -302,7 +304,8 @@ impl BloomFilterMetaReader { BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) } else { if let Some(m) = metrics { - m.total_ranges += 1; // suffix read only + // suffix read only + m.total_ranges += 1; m.total_bytes += self.file_size.min(self.prefetch_size); if let Some(start) = start { m.fetch_elapsed += start.elapsed(); diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index fd435fab4e..9dcaf879a5 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -75,7 +75,10 @@ pub trait InvertedIndexReader: Send + Sync { } /// Retrieves metadata of all inverted indices stored within the blob. - async fn metadata(&self) -> Result>; + async fn metadata<'a>( + &self, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result>; /// Retrieves the finite state transducer (FST) map from the given offset and size. async fn fst<'a>( diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index fd10ba9b3f..05f8f40047 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -95,14 +95,17 @@ impl InvertedIndexReader for InvertedIndexBlobReader { Ok(result) } - async fn metadata(&self) -> Result> { + async fn metadata<'a>( + &self, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { let metadata = self.source.metadata().await.context(CommonIoSnafu)?; let blob_size = metadata.content_length; Self::validate_blob_size(blob_size)?; let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size) .with_prefetch_size(DEFAULT_PREFETCH_SIZE); - footer_reader.metadata().await.map(Arc::new) + footer_reader.metadata(metrics).await.map(Arc::new) } } @@ -202,7 +205,7 @@ mod tests { let blob = create_inverted_index_blob(); let blob_reader = InvertedIndexBlobReader::new(blob); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); assert_eq!(metas.metas.len(), 2); let meta0 = metas.metas.get("tag0").unwrap(); @@ -229,7 +232,7 @@ mod tests { let blob = create_inverted_index_blob(); let blob_reader = InvertedIndexBlobReader::new(blob); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); let fst_map = blob_reader @@ -263,7 +266,7 @@ mod tests { let blob = create_inverted_index_blob(); let blob_reader = InvertedIndexBlobReader::new(blob); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); let bitmap = blob_reader @@ -277,7 +280,7 @@ mod tests { .unwrap(); assert_eq!(bitmap, mock_bitmap()); - let metas = blob_reader.metadata().await.unwrap(); + let metas = blob_reader.metadata(None).await.unwrap(); let meta = metas.metas.get("tag1").unwrap(); let bitmap = blob_reader diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 2609eb6cbb..3758b55e21 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Instant; + use common_base::range_read::RangeReader; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; use prost::Message; @@ -23,6 +25,7 @@ use crate::inverted_index::error::{ UnexpectedZeroSegmentRowCountSnafu, }; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; +use crate::inverted_index::format::reader::InvertedIndexReadMetrics; pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB @@ -54,12 +57,17 @@ impl InvertedIndexFooterReader { } impl InvertedIndexFooterReader { - pub async fn metadata(&mut self) -> Result { + pub async fn metadata<'a>( + &mut self, + mut metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result { ensure!( self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE, BlobSizeTooSmallSnafu ); + let start = metrics.as_ref().map(|_| Instant::now()); + let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); let suffix = self .source @@ -73,19 +81,36 @@ impl InvertedIndexFooterReader { let footer_size = FOOTER_PAYLOAD_SIZE_SIZE; // Did not fetch the entire file metadata in the initial read, need to make a second request. - if length > suffix_len as u64 - footer_size { + let result = if length > suffix_len as u64 - footer_size { let metadata_start = self.blob_size - length - footer_size; let meta = self .source .read(metadata_start..self.blob_size - footer_size) .await .context(CommonIoSnafu)?; + + if let Some(m) = metrics.as_deref_mut() { + m.total_bytes += self.blob_size.min(self.prefetch_size()) + length; + m.total_ranges += 2; + } + self.parse_payload(&meta, length) } else { + if let Some(m) = metrics.as_deref_mut() { + m.total_bytes += self.blob_size.min(self.prefetch_size()); + m.total_ranges += 1; + } + let metadata_start = self.blob_size - length - footer_size - footer_start; let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; self.parse_payload(meta, length) + }; + + if let Some(m) = metrics { + m.fetch_elapsed += start.unwrap().elapsed(); } + + result } fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { @@ -186,7 +211,7 @@ mod tests { reader = reader.with_prefetch_size(prefetch); } - let metas = reader.metadata().await.unwrap(); + let metas = reader.metadata(None).await.unwrap(); assert_eq!(metas.metas.len(), 1); let index_meta = &metas.metas.get("test").unwrap(); assert_eq!(index_meta.name, "test"); @@ -210,7 +235,7 @@ mod tests { reader = reader.with_prefetch_size(prefetch); } - let result = reader.metadata().await; + let result = reader.metadata(None).await; assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. })); } } @@ -233,7 +258,7 @@ mod tests { reader = reader.with_prefetch_size(prefetch); } - let result = reader.metadata().await; + let result = reader.metadata(None).await; assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. })); } } diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 32a4219179..58d8593591 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -122,7 +122,7 @@ mod tests { .unwrap(); let reader = InvertedIndexBlobReader::new(blob); - let metadata = reader.metadata().await.unwrap(); + let metadata = reader.metadata(None).await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 0); @@ -182,7 +182,7 @@ mod tests { .unwrap(); let reader = InvertedIndexBlobReader::new(blob); - let metadata = reader.metadata().await.unwrap(); + let metadata = reader.metadata(None).await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 2); diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index 0b5297b8b0..99a3a1ebb0 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -50,7 +50,7 @@ impl IndexApplier for PredicatesIndexApplier { metrics: Option<&'b mut InvertedIndexReadMetrics>, ) -> Result { let mut metrics = metrics; - let metadata = reader.metadata().await?; + let metadata = reader.metadata(metrics.as_deref_mut()).await?; let mut output = ApplyOutput { matched_segment_ids: Bitmap::new_bitvec(), total_row_count: metadata.total_row_count as _, @@ -225,7 +225,7 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0)]))); + .returning(|_| Ok(mock_metas([("tag-0", 0)]))); mock_reader.expect_fst_vec().returning(|_ranges, _metrics| { Ok(vec![ FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(), @@ -258,7 +258,7 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0)]))); + .returning(|_| Ok(mock_metas([("tag-0", 0)]))); mock_reader.expect_fst_vec().returning(|_range, _metrics| { Ok(vec![ FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(), @@ -285,7 +285,7 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)]))); + .returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)]))); mock_reader.expect_fst_vec().returning(|ranges, _metrics| { let mut output = vec![]; for range in ranges { @@ -339,7 +339,7 @@ mod tests { let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas([("tag-0", 0)]))); + .returning(|_| Ok(mock_metas([("tag-0", 0)]))); let output = applier .apply(SearchContext::default(), &mut mock_reader, None) @@ -351,7 +351,7 @@ mod tests { #[tokio::test] async fn test_index_applier_with_empty_index() { let mut mock_reader = MockInvertedIndexReader::new(); - mock_reader.expect_metadata().returning(move || { + mock_reader.expect_metadata().returning(move |_| { Ok(Arc::new(InvertedIndexMetas { total_row_count: 0, // No rows segment_row_count: 1, @@ -378,7 +378,7 @@ mod tests { let mut mock_reader = MockInvertedIndexReader::new(); mock_reader .expect_metadata() - .returning(|| Ok(mock_metas(vec![]))); + .returning(|_| Ok(mock_metas(vec![]))); let mut mock_fst_applier = MockFstApplier::new(); mock_fst_applier.expect_apply().never(); diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index e7c3019dfd..274cd44f77 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -146,12 +146,15 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead Ok(pages) } - async fn metadata(&self) -> Result> { + async fn metadata<'a>( + &self, + metrics: Option<&'a mut InvertedIndexReadMetrics>, + ) -> Result> { if let Some(cached) = self.cache.get_metadata(self.file_id) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(cached) } else { - let meta = self.inner.metadata().await?; + let meta = self.inner.metadata(metrics).await?; self.cache.put_metadata(self.file_id, meta.clone()); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) @@ -306,7 +309,7 @@ mod test { reader, Arc::new(InvertedIndexCache::new(8192, 8192, 50)), ); - let metadata = cached_reader.metadata().await.unwrap(); + let metadata = cached_reader.metadata(None).await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 2); diff --git a/src/mito2/src/engine/puffin_index.rs b/src/mito2/src/engine/puffin_index.rs index 10dc76bc69..925d547eb9 100644 --- a/src/mito2/src/engine/puffin_index.rs +++ b/src/mito2/src/engine/puffin_index.rs @@ -233,7 +233,7 @@ async fn collect_inverted_entries( InvertedIndexBlobReader::new(blob_reader), cache.clone(), ); - match reader.metadata().await { + match reader.metadata(None).await { Ok(metas) => metas, Err(err) => { warn!( @@ -247,7 +247,7 @@ async fn collect_inverted_entries( } } else { let reader = InvertedIndexBlobReader::new(blob_reader); - match reader.metadata().await { + match reader.metadata(None).await { Ok(metas) => metas, Err(err) => { warn!(