Compare commits

...

24 Commits

Author SHA1 Message Date
shuiyisong
59ddfa84ec fix: check and clippy
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-11-26 18:35:21 +08:00
evenyag
dd043eadc4 feat: add file_scan_cost
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
7e6af2c7ee feat: collect the whole fetch time
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
87d3b17f4d feat: update parquet fetch metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
5acac3d403 chore: fix compiler errors
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
f9c66ba0de feat: implement debug for new metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
37847a8df6 feat: debug print metrics in ScanMetricsSet
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
6e06ac9e5c feat: init verbose metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
09effc8128 feat: add fetch metrics to ReaderMetrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
c14728e3ae feat: collect more metrics for memory row group
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
cce4d56e00 feat: add apply metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
69cf13b33a feat: add parquet metadata metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
c83a282b39 feat: collect parquet row group metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
5329efcdba feat: collect fulltext dir metrics for applier
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
50b5c90d53 feat: collect read metrics in appliers
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
fea2966dec feat: collect cache metrics for inverted and bloom index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
e00452c4db feat: collect metadata fetch metrics for inverted index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
7a31b2a8ea refactor: rename elapsed to fetch_elapsed
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
f363d73f72 feat: add metrics for range_read and metadata
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
7a6befcad3 feat: collect read metrics for inverted index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
d6c75ec55f feat: implement BloomFilterReadMetrics for BloomFilterReader
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
5b8f1d819f feat: add metrics to fulltext index applier
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
b68286e8af feat: add metrics to bloom applier
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
4519607bc6 feat: add inverted applier metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
31 changed files with 1664 additions and 288 deletions

View File

@@ -21,7 +21,7 @@ use itertools::Itertools;
use crate::Bytes; use crate::Bytes;
use crate::bloom_filter::error::Result; use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader; use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least /// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied. /// one of the elements (logical OR semantic) for the predicate to be satisfied.
@@ -38,7 +38,7 @@ pub struct BloomFilterApplier {
impl BloomFilterApplier { impl BloomFilterApplier {
pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> { pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
let meta = reader.metadata().await?; let meta = reader.metadata(None).await?;
Ok(Self { reader, meta }) Ok(Self { reader, meta })
} }
@@ -50,6 +50,7 @@ impl BloomFilterApplier {
&mut self, &mut self,
predicates: &[InListPredicate], predicates: &[InListPredicate],
search_ranges: &[Range<usize>], search_ranges: &[Range<usize>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Range<usize>>> { ) -> Result<Vec<Range<usize>>> {
if predicates.is_empty() { if predicates.is_empty() {
// If no predicates, return empty result // If no predicates, return empty result
@@ -57,7 +58,7 @@ impl BloomFilterApplier {
} }
let segments = self.row_ranges_to_segments(search_ranges); let segments = self.row_ranges_to_segments(search_ranges);
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?; let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates); let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
Ok(intersect_ranges(search_ranges, &matching_row_ranges)) Ok(intersect_ranges(search_ranges, &matching_row_ranges))
} }
@@ -95,6 +96,7 @@ impl BloomFilterApplier {
async fn load_bloom_filters( async fn load_bloom_filters(
&mut self, &mut self,
segments: &[usize], segments: &[usize],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> { ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
let segment_locations = segments let segment_locations = segments
.iter() .iter()
@@ -108,7 +110,10 @@ impl BloomFilterApplier {
.map(|i| self.meta.bloom_filter_locs[i as usize]) .map(|i| self.meta.bloom_filter_locs[i as usize])
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?; let bloom_filters = self
.reader
.bloom_filter_vec(&bloom_filter_locs, metrics)
.await?;
Ok((segment_locations, bloom_filters)) Ok((segment_locations, bloom_filters))
} }
@@ -422,7 +427,10 @@ mod tests {
]; ];
for (predicates, search_range, expected) in cases { for (predicates, search_range, expected) in cases {
let result = applier.search(&predicates, &[search_range]).await.unwrap(); let result = applier
.search(&predicates, &[search_range], None)
.await
.unwrap();
assert_eq!( assert_eq!(
result, expected, result, expected,
"Expected {:?}, got {:?}", "Expected {:?}, got {:?}",

View File

@@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
use std::ops::{Range, Rem}; use std::ops::{Range, Rem};
use std::time::{Duration, Instant};
use async_trait::async_trait; use async_trait::async_trait;
use bytemuck::try_cast_slice; use bytemuck::try_cast_slice;
@@ -34,6 +35,32 @@ const BLOOM_META_LEN_SIZE: u64 = 4;
/// Default prefetch size of bloom filter meta. /// Default prefetch size of bloom filter meta.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
/// Metrics for bloom filter read operations.
#[derive(Debug, Default, Clone)]
pub struct BloomFilterReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,
/// Total number of ranges to read.
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
impl BloomFilterReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance. /// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
/// Faster than chunking and converting each piece individually. /// Faster than chunking and converting each piece individually.
/// ///
@@ -79,25 +106,52 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
#[async_trait] #[async_trait]
pub trait BloomFilterReader: Sync { pub trait BloomFilterReader: Sync {
/// Reads range of bytes from the file. /// Reads range of bytes from the file.
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>; async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes>;
/// Reads bunch of ranges from the file. /// Reads bunch of ranges from the file.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut results = Vec::with_capacity(ranges.len()); let mut results = Vec::with_capacity(ranges.len());
for range in ranges { for range in ranges {
let size = (range.end - range.start) as u32; let size = (range.end - range.start) as u32;
let data = self.range_read(range.start, size).await?; let data = self.range_read(range.start, size, None).await?;
results.push(data); results.push(data);
} }
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(results) Ok(results)
} }
/// Reads the meta information of the bloom filter. /// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta>; async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta>;
/// Reads a bloom filter with the given location. /// Reads a bloom filter with the given location.
async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> { async fn bloom_filter(
let bytes = self.range_read(loc.offset, loc.size as _).await?; &self,
loc: &BloomFilterLoc,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilter> {
let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
let vec = bytes_to_u64_vec(&bytes); let vec = bytes_to_u64_vec(&bytes);
let bm = BloomFilter::from_vec(vec) let bm = BloomFilter::from_vec(vec)
.seed(&SEED) .seed(&SEED)
@@ -105,12 +159,16 @@ pub trait BloomFilterReader: Sync {
Ok(bm) Ok(bm)
} }
async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> { async fn bloom_filter_vec(
&self,
locs: &[BloomFilterLoc],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<BloomFilter>> {
let ranges = locs let ranges = locs
.iter() .iter()
.map(|l| l.offset..l.offset + l.size) .map(|l| l.offset..l.offset + l.size)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let bss = self.read_vec(&ranges).await?; let bss = self.read_vec(&ranges, metrics).await?;
let mut result = Vec::with_capacity(bss.len()); let mut result = Vec::with_capacity(bss.len());
for (bs, loc) in bss.into_iter().zip(locs.iter()) { for (bs, loc) in bss.into_iter().zip(locs.iter()) {
@@ -140,24 +198,59 @@ impl<R: RangeReader> BloomFilterReaderImpl<R> {
#[async_trait] #[async_trait]
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> { impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> { async fn range_read(
self.reader &self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self
.reader
.read(offset..offset + size as u64) .read(offset..offset + size as u64)
.await .await
.context(IoSnafu) .context(IoSnafu)?;
if let Some(m) = metrics {
m.total_ranges += 1;
m.total_bytes += size as u64;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
} }
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn read_vec(
self.reader.read_vec(ranges).await.context(IoSnafu) &self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self.reader.read_vec(ranges).await.context(IoSnafu)?;
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
} }
async fn metadata(&self) -> Result<BloomFilterMeta> { async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
let metadata = self.reader.metadata().await.context(IoSnafu)?; let metadata = self.reader.metadata().await.context(IoSnafu)?;
let file_size = metadata.content_length; let file_size = metadata.content_length;
let mut meta_reader = let mut meta_reader =
BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE)); BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
meta_reader.metadata().await meta_reader.metadata(metrics).await
} }
} }
@@ -183,7 +276,10 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
/// ///
/// It will first prefetch some bytes from the end of the file, /// It will first prefetch some bytes from the end of the file,
/// then parse the metadata from the prefetch bytes. /// then parse the metadata from the prefetch bytes.
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> { pub async fn metadata(
&mut self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
ensure!( ensure!(
self.file_size >= BLOOM_META_LEN_SIZE, self.file_size >= BLOOM_META_LEN_SIZE,
FileSizeTooSmallSnafu { FileSizeTooSmallSnafu {
@@ -191,6 +287,7 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
} }
); );
let start = metrics.as_ref().map(|_| Instant::now());
let meta_start = self.file_size.saturating_sub(self.prefetch_size); let meta_start = self.file_size.saturating_sub(self.prefetch_size);
let suffix = self let suffix = self
.reader .reader
@@ -208,8 +305,28 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE) .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
.await .await
.context(IoSnafu)?; .context(IoSnafu)?;
if let Some(m) = metrics {
// suffix read + meta read
m.total_ranges += 2;
// Ignores the meta length size to simplify the calculation.
m.total_bytes += self.file_size.min(self.prefetch_size) + length;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
} else { } else {
if let Some(m) = metrics {
// suffix read only
m.total_ranges += 1;
m.total_bytes += self.file_size.min(self.prefetch_size);
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start; let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize]; let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu) BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
@@ -290,7 +407,7 @@ mod tests {
for prefetch in [0u64, file_size / 2, file_size, file_size + 10] { for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
let mut reader = let mut reader =
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch)); BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
let meta = reader.metadata().await.unwrap(); let meta = reader.metadata(None).await.unwrap();
assert_eq!(meta.rows_per_segment, 2); assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.segment_count, 2); assert_eq!(meta.segment_count, 2);
@@ -312,11 +429,11 @@ mod tests {
let bytes = mock_bloom_filter_bytes().await; let bytes = mock_bloom_filter_bytes().await;
let reader = BloomFilterReaderImpl::new(bytes); let reader = BloomFilterReaderImpl::new(bytes);
let meta = reader.metadata().await.unwrap(); let meta = reader.metadata(None).await.unwrap();
assert_eq!(meta.bloom_filter_locs.len(), 2); assert_eq!(meta.bloom_filter_locs.len(), 2);
let bf = reader let bf = reader
.bloom_filter(&meta.bloom_filter_locs[0]) .bloom_filter(&meta.bloom_filter_locs[0], None)
.await .await
.unwrap(); .unwrap();
assert!(bf.contains(&b"a")); assert!(bf.contains(&b"a"));
@@ -325,7 +442,7 @@ mod tests {
assert!(bf.contains(&b"d")); assert!(bf.contains(&b"d"));
let bf = reader let bf = reader
.bloom_filter(&meta.bloom_filter_locs[1]) .bloom_filter(&meta.bloom_filter_locs[1], None)
.await .await
.unwrap(); .unwrap();
assert!(bf.contains(&b"e")); assert!(bf.contains(&b"e"));

View File

@@ -74,7 +74,7 @@ async fn test_search(
writer.finish().await.unwrap(); writer.finish().await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap(); let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap(); let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap(); let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
for (query, expected) in query_expected { for (query, expected) in query_expected {
let results = searcher.search(query).await.unwrap(); let results = searcher.search(query).await.unwrap();

View File

@@ -15,6 +15,7 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@@ -29,19 +30,59 @@ pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
mod blob; mod blob;
mod footer; mod footer;
/// Metrics for inverted index read operations.
#[derive(Debug, Default, Clone)]
pub struct InvertedIndexReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,
/// Total number of ranges to read.
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
impl InvertedIndexReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// InvertedIndexReader defines an asynchronous reader of inverted index data /// InvertedIndexReader defines an asynchronous reader of inverted index data
#[mockall::automock] #[mockall::automock]
#[async_trait] #[async_trait]
pub trait InvertedIndexReader: Send + Sync { pub trait InvertedIndexReader: Send + Sync {
/// Seeks to given offset and reads data with exact size as provided. /// Seeks to given offset and reads data with exact size as provided.
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>>; async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>>;
/// Reads the bytes in the given ranges. /// Reads the bytes in the given ranges.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let mut metrics = metrics;
let mut result = Vec::with_capacity(ranges.len()); let mut result = Vec::with_capacity(ranges.len());
for range in ranges { for range in ranges {
let data = self let data = self
.range_read(range.start, (range.end - range.start) as u32) .range_read(
range.start,
(range.end - range.start) as u32,
metrics.as_deref_mut(),
)
.await?; .await?;
result.push(Bytes::from(data)); result.push(Bytes::from(data));
} }
@@ -49,17 +90,29 @@ pub trait InvertedIndexReader: Send + Sync {
} }
/// Retrieves metadata of all inverted indices stored within the blob. /// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>>; async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>>;
/// Retrieves the finite state transducer (FST) map from the given offset and size. /// Retrieves the finite state transducer (FST) map from the given offset and size.
async fn fst(&self, offset: u64, size: u32) -> Result<FstMap> { async fn fst<'a>(
let fst_data = self.range_read(offset, size).await?; &self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<FstMap> {
let fst_data = self.range_read(offset, size, metrics).await?;
FstMap::new(fst_data).context(DecodeFstSnafu) FstMap::new(fst_data).context(DecodeFstSnafu)
} }
/// Retrieves the multiple finite state transducer (FST) maps from the given ranges. /// Retrieves the multiple finite state transducer (FST) maps from the given ranges.
async fn fst_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<FstMap>> { async fn fst_vec<'a>(
self.read_vec(ranges) &mut self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<FstMap>> {
self.read_vec(ranges, metrics)
.await? .await?
.into_iter() .into_iter()
.map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu)) .map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu))
@@ -67,19 +120,28 @@ pub trait InvertedIndexReader: Send + Sync {
} }
/// Retrieves the bitmap from the given offset and size. /// Retrieves the bitmap from the given offset and size.
async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result<Bitmap> { async fn bitmap<'a>(
self.range_read(offset, size).await.and_then(|bytes| { &self,
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu) offset: u64,
}) size: u32,
bitmap_type: BitmapType,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Bitmap> {
self.range_read(offset, size, metrics)
.await
.and_then(|bytes| {
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
})
} }
/// Retrieves the multiple bitmaps from the given ranges. /// Retrieves the multiple bitmaps from the given ranges.
async fn bitmap_deque( async fn bitmap_deque<'a>(
&mut self, &mut self,
ranges: &[(Range<u64>, BitmapType)], ranges: &[(Range<u64>, BitmapType)],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<VecDeque<Bitmap>> { ) -> Result<VecDeque<Bitmap>> {
let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip(); let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip();
let bytes = self.read_vec(&ranges).await?; let bytes = self.read_vec(&ranges, metrics).await?;
bytes bytes
.into_iter() .into_iter()
.zip(types) .zip(types)

View File

@@ -14,6 +14,7 @@
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
@@ -23,10 +24,10 @@ use snafu::{ResultExt, ensure};
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu}; use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::MIN_BLOB_SIZE; use crate::inverted_index::format::MIN_BLOB_SIZE;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::footer::{ use crate::inverted_index::format::reader::footer::{
DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader, DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader,
}; };
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// Inverted index blob reader, implements [`InvertedIndexReader`] /// Inverted index blob reader, implements [`InvertedIndexReader`]
pub struct InvertedIndexBlobReader<R> { pub struct InvertedIndexBlobReader<R> {
@@ -53,27 +54,58 @@ impl<R> InvertedIndexBlobReader<R> {
#[async_trait] #[async_trait]
impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> { impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> { async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>> {
let start = metrics.as_ref().map(|_| Instant::now());
let buf = self let buf = self
.source .source
.read(offset..offset + size as u64) .read(offset..offset + size as u64)
.await .await
.context(CommonIoSnafu)?; .context(CommonIoSnafu)?;
if let Some(m) = metrics {
m.total_bytes += size as u64;
m.total_ranges += 1;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(buf.into()) Ok(buf.into())
} }
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn read_vec<'a>(
self.source.read_vec(ranges).await.context(CommonIoSnafu) &self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
if let Some(m) = metrics {
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
m.total_ranges += ranges.len();
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(result)
} }
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> { async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
let metadata = self.source.metadata().await.context(CommonIoSnafu)?; let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
let blob_size = metadata.content_length; let blob_size = metadata.content_length;
Self::validate_blob_size(blob_size)?; Self::validate_blob_size(blob_size)?;
let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size) let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
.with_prefetch_size(DEFAULT_PREFETCH_SIZE); .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
footer_reader.metadata().await.map(Arc::new) footer_reader.metadata(metrics).await.map(Arc::new)
} }
} }
@@ -173,7 +205,7 @@ mod tests {
let blob = create_inverted_index_blob(); let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob); let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap(); let metas = blob_reader.metadata(None).await.unwrap();
assert_eq!(metas.metas.len(), 2); assert_eq!(metas.metas.len(), 2);
let meta0 = metas.metas.get("tag0").unwrap(); let meta0 = metas.metas.get("tag0").unwrap();
@@ -200,13 +232,14 @@ mod tests {
let blob = create_inverted_index_blob(); let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob); let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap(); let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag0").unwrap(); let meta = metas.metas.get("tag0").unwrap();
let fst_map = blob_reader let fst_map = blob_reader
.fst( .fst(
meta.base_offset + meta.relative_fst_offset as u64, meta.base_offset + meta.relative_fst_offset as u64,
meta.fst_size, meta.fst_size,
None,
) )
.await .await
.unwrap(); .unwrap();
@@ -219,6 +252,7 @@ mod tests {
.fst( .fst(
meta.base_offset + meta.relative_fst_offset as u64, meta.base_offset + meta.relative_fst_offset as u64,
meta.fst_size, meta.fst_size,
None,
) )
.await .await
.unwrap(); .unwrap();
@@ -232,30 +266,30 @@ mod tests {
let blob = create_inverted_index_blob(); let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob); let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap(); let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag0").unwrap(); let meta = metas.metas.get("tag0").unwrap();
let bitmap = blob_reader let bitmap = blob_reader
.bitmap(meta.base_offset, 26, BitmapType::Roaring) .bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(bitmap, mock_bitmap()); assert_eq!(bitmap, mock_bitmap());
let bitmap = blob_reader let bitmap = blob_reader
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(bitmap, mock_bitmap()); assert_eq!(bitmap, mock_bitmap());
let metas = blob_reader.metadata().await.unwrap(); let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag1").unwrap(); let meta = metas.metas.get("tag1").unwrap();
let bitmap = blob_reader let bitmap = blob_reader
.bitmap(meta.base_offset, 26, BitmapType::Roaring) .bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(bitmap, mock_bitmap()); assert_eq!(bitmap, mock_bitmap());
let bitmap = blob_reader let bitmap = blob_reader
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring) .bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(bitmap, mock_bitmap()); assert_eq!(bitmap, mock_bitmap());

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::time::Instant;
use common_base::range_read::RangeReader; use common_base::range_read::RangeReader;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use prost::Message; use prost::Message;
@@ -23,6 +25,7 @@ use crate::inverted_index::error::{
UnexpectedZeroSegmentRowCountSnafu, UnexpectedZeroSegmentRowCountSnafu,
}; };
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
@@ -54,12 +57,17 @@ impl<R> InvertedIndexFooterReader<R> {
} }
impl<R: RangeReader> InvertedIndexFooterReader<R> { impl<R: RangeReader> InvertedIndexFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> { pub async fn metadata(
&mut self,
mut metrics: Option<&mut InvertedIndexReadMetrics>,
) -> Result<InvertedIndexMetas> {
ensure!( ensure!(
self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE, self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
BlobSizeTooSmallSnafu BlobSizeTooSmallSnafu
); );
let start = metrics.as_ref().map(|_| Instant::now());
let footer_start = self.blob_size.saturating_sub(self.prefetch_size()); let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
let suffix = self let suffix = self
.source .source
@@ -73,19 +81,36 @@ impl<R: RangeReader> InvertedIndexFooterReader<R> {
let footer_size = FOOTER_PAYLOAD_SIZE_SIZE; let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
// Did not fetch the entire file metadata in the initial read, need to make a second request. // Did not fetch the entire file metadata in the initial read, need to make a second request.
if length > suffix_len as u64 - footer_size { let result = if length > suffix_len as u64 - footer_size {
let metadata_start = self.blob_size - length - footer_size; let metadata_start = self.blob_size - length - footer_size;
let meta = self let meta = self
.source .source
.read(metadata_start..self.blob_size - footer_size) .read(metadata_start..self.blob_size - footer_size)
.await .await
.context(CommonIoSnafu)?; .context(CommonIoSnafu)?;
if let Some(m) = metrics.as_deref_mut() {
m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
m.total_ranges += 2;
}
self.parse_payload(&meta, length) self.parse_payload(&meta, length)
} else { } else {
if let Some(m) = metrics.as_deref_mut() {
m.total_bytes += self.blob_size.min(self.prefetch_size());
m.total_ranges += 1;
}
let metadata_start = self.blob_size - length - footer_size - footer_start; let metadata_start = self.blob_size - length - footer_size - footer_start;
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
self.parse_payload(meta, length) self.parse_payload(meta, length)
};
if let Some(m) = metrics {
m.fetch_elapsed += start.unwrap().elapsed();
} }
result
} }
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
@@ -186,7 +211,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch); reader = reader.with_prefetch_size(prefetch);
} }
let metas = reader.metadata().await.unwrap(); let metas = reader.metadata(None).await.unwrap();
assert_eq!(metas.metas.len(), 1); assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap(); let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test"); assert_eq!(index_meta.name, "test");
@@ -210,7 +235,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch); reader = reader.with_prefetch_size(prefetch);
} }
let result = reader.metadata().await; let result = reader.metadata(None).await;
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. })); assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
} }
} }
@@ -233,7 +258,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch); reader = reader.with_prefetch_size(prefetch);
} }
let result = reader.metadata().await; let result = reader.metadata(None).await;
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. })); assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
} }
} }

View File

@@ -122,7 +122,7 @@ mod tests {
.unwrap(); .unwrap();
let reader = InvertedIndexBlobReader::new(blob); let reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap(); let metadata = reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 0); assert_eq!(metadata.metas.len(), 0);
@@ -182,7 +182,7 @@ mod tests {
.unwrap(); .unwrap();
let reader = InvertedIndexBlobReader::new(blob); let reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap(); let metadata = reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2); assert_eq!(metadata.metas.len(), 2);
@@ -198,13 +198,19 @@ mod tests {
.fst( .fst(
tag0.base_offset + tag0.relative_fst_offset as u64, tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size, tag0.fst_size,
None,
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(fst0.len(), 3); assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap()); let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = reader let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -213,7 +219,12 @@ mod tests {
); );
let [offset, size] = unpack(fst0.get(b"b").unwrap()); let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = reader let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -222,7 +233,12 @@ mod tests {
); );
let [offset, size] = unpack(fst0.get(b"c").unwrap()); let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = reader let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -241,13 +257,19 @@ mod tests {
.fst( .fst(
tag1.base_offset + tag1.relative_fst_offset as u64, tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size, tag1.fst_size,
None,
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(fst1.len(), 3); assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap()); let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = reader let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -256,7 +278,12 @@ mod tests {
); );
let [offset, size] = unpack(fst1.get(b"y").unwrap()); let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = reader let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -265,7 +292,12 @@ mod tests {
); );
let [offset, size] = unpack(fst1.get(b"z").unwrap()); let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = reader let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(

View File

@@ -16,7 +16,7 @@ use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
use crate::bitmap::Bitmap; use crate::bitmap::Bitmap;
use crate::inverted_index::error::Result; use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their /// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their
/// corresponding bitmaps within an inverted index. /// corresponding bitmaps within an inverted index.
@@ -35,7 +35,8 @@ impl<'a> ParallelFstValuesMapper<'a> {
pub async fn map_values_vec( pub async fn map_values_vec(
&mut self, &mut self,
value_and_meta_vec: &[(Vec<u64>, &'a InvertedIndexMeta)], value_and_meta_vec: &[(Vec<u64>, &InvertedIndexMeta)],
metrics: Option<&mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bitmap>> { ) -> Result<Vec<Bitmap>> {
let groups = value_and_meta_vec let groups = value_and_meta_vec
.iter() .iter()
@@ -64,7 +65,7 @@ impl<'a> ParallelFstValuesMapper<'a> {
} }
common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges); common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges);
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges).await?; let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges, metrics).await?;
let mut output = Vec::with_capacity(groups.len()); let mut output = Vec::with_capacity(groups.len());
for counter in groups { for counter in groups {
@@ -95,23 +96,25 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_map_values_vec() { async fn test_map_values_vec() {
let mut mock_reader = MockInvertedIndexReader::new(); let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_bitmap_deque().returning(|ranges| { mock_reader
let mut output = VecDeque::new(); .expect_bitmap_deque()
for (range, bitmap_type) in ranges { .returning(|ranges, _metrics| {
let offset = range.start; let mut output = VecDeque::new();
let size = range.end - range.start; for (range, bitmap_type) in ranges {
match (offset, size, bitmap_type) { let offset = range.start;
(1, 1, BitmapType::Roaring) => { let size = range.end - range.start;
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
}
_ => unreachable!(),
} }
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
}
_ => unreachable!(),
} }
} Ok(output)
Ok(output) });
});
let meta = InvertedIndexMeta { let meta = InvertedIndexMeta {
bitmap_type: BitmapType::Roaring.into(), bitmap_type: BitmapType::Roaring.into(),
@@ -120,13 +123,13 @@ mod tests {
let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader); let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader);
let result = values_mapper let result = values_mapper
.map_values_vec(&[(vec![], &meta)]) .map_values_vec(&[(vec![], &meta)], None)
.await .await
.unwrap(); .unwrap();
assert_eq!(result[0].count_ones(), 0); assert_eq!(result[0].count_ones(), 0);
let result = values_mapper let result = values_mapper
.map_values_vec(&[(vec![value(1, 1)], &meta)]) .map_values_vec(&[(vec![value(1, 1)], &meta)], None)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -135,7 +138,7 @@ mod tests {
); );
let result = values_mapper let result = values_mapper
.map_values_vec(&[(vec![value(2, 1)], &meta)]) .map_values_vec(&[(vec![value(2, 1)], &meta)], None)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -144,7 +147,7 @@ mod tests {
); );
let result = values_mapper let result = values_mapper
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)]) .map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)], None)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -153,7 +156,7 @@ mod tests {
); );
let result = values_mapper let result = values_mapper
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)]) .map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)], None)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -162,7 +165,10 @@ mod tests {
); );
let result = values_mapper let result = values_mapper
.map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)]) .map_values_vec(
&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)],
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -174,10 +180,13 @@ mod tests {
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring) Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
); );
let result = values_mapper let result = values_mapper
.map_values_vec(&[ .map_values_vec(
(vec![value(2, 1), value(1, 1)], &meta), &[
(vec![value(1, 1)], &meta), (vec![value(2, 1), value(1, 1)], &meta),
]) (vec![value(1, 1)], &meta),
],
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(

View File

@@ -19,7 +19,7 @@ pub use predicates_apply::PredicatesIndexApplier;
use crate::bitmap::Bitmap; use crate::bitmap::Bitmap;
use crate::inverted_index::error::Result; use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// The output of an apply operation. /// The output of an apply operation.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
@@ -44,10 +44,11 @@ pub trait IndexApplier: Send + Sync {
/// Applies the predefined predicates to the data read by the given index reader, returning /// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs). /// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
#[allow(unused_parens)] #[allow(unused_parens)]
async fn apply<'a>( async fn apply<'a, 'b>(
&self, &self,
context: SearchContext, context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a), reader: &mut (dyn InvertedIndexReader + 'a),
metrics: Option<&'b mut InvertedIndexReadMetrics>,
) -> Result<ApplyOutput>; ) -> Result<ApplyOutput>;
/// Returns the memory usage of the applier. /// Returns the memory usage of the applier.

View File

@@ -19,7 +19,7 @@ use greptime_proto::v1::index::InvertedIndexMetas;
use crate::bitmap::Bitmap; use crate::bitmap::Bitmap;
use crate::inverted_index::error::{IndexNotFoundSnafu, Result}; use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
use crate::inverted_index::format::reader::InvertedIndexReader; use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use crate::inverted_index::search::fst_apply::{ use crate::inverted_index::search::fst_apply::{
FstApplier, IntersectionFstApplier, KeysFstApplier, FstApplier, IntersectionFstApplier, KeysFstApplier,
}; };
@@ -43,12 +43,14 @@ pub struct PredicatesIndexApplier {
impl IndexApplier for PredicatesIndexApplier { impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices. /// bitmaps obtained for each index to result in a final set of indices.
async fn apply<'a>( async fn apply<'a, 'b>(
&self, &self,
context: SearchContext, context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a), reader: &mut (dyn InvertedIndexReader + 'a),
metrics: Option<&'b mut InvertedIndexReadMetrics>,
) -> Result<ApplyOutput> { ) -> Result<ApplyOutput> {
let metadata = reader.metadata().await?; let mut metrics = metrics;
let metadata = reader.metadata(metrics.as_deref_mut()).await?;
let mut output = ApplyOutput { let mut output = ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(), matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: metadata.total_row_count as _, total_row_count: metadata.total_row_count as _,
@@ -84,7 +86,7 @@ impl IndexApplier for PredicatesIndexApplier {
return Ok(output); return Ok(output);
} }
let fsts = reader.fst_vec(&fst_ranges).await?; let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?;
let value_and_meta_vec = fsts let value_and_meta_vec = fsts
.into_iter() .into_iter()
.zip(appliers) .zip(appliers)
@@ -92,7 +94,7 @@ impl IndexApplier for PredicatesIndexApplier {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut mapper = ParallelFstValuesMapper::new(reader); let mut mapper = ParallelFstValuesMapper::new(reader);
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?; let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?;
let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
for bm in bm_vec { for bm in bm_vec {
@@ -221,26 +223,28 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new(); let mut mock_reader = MockInvertedIndexReader::new();
mock_reader mock_reader
.expect_metadata() .expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)]))); .returning(|_| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_ranges| { mock_reader.expect_fst_vec().returning(|_ranges, _metrics| {
Ok(vec![ Ok(vec![
FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(), FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
]) ])
}); });
mock_reader.expect_bitmap_deque().returning(|arg| { mock_reader
assert_eq!(arg.len(), 1); .expect_bitmap_deque()
let range = &arg[0].0; .returning(|arg, _metrics| {
let bitmap_type = arg[0].1; assert_eq!(arg.len(), 1);
assert_eq!(*range, 2..3); let range = &arg[0].0;
assert_eq!(bitmap_type, BitmapType::Roaring); let bitmap_type = arg[0].1;
Ok(VecDeque::from([Bitmap::from_lsb0_bytes( assert_eq!(*range, 2..3);
&[0b10101010], assert_eq!(bitmap_type, BitmapType::Roaring);
bitmap_type, Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
)])) &[0b10101010],
}); bitmap_type,
)]))
});
let output = applier let output = applier
.apply(SearchContext::default(), &mut mock_reader) .apply(SearchContext::default(), &mut mock_reader, None)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -252,14 +256,14 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new(); let mut mock_reader = MockInvertedIndexReader::new();
mock_reader mock_reader
.expect_metadata() .expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)]))); .returning(|_| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_range| { mock_reader.expect_fst_vec().returning(|_range, _metrics| {
Ok(vec![ Ok(vec![
FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(), FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
]) ])
}); });
let output = applier let output = applier
.apply(SearchContext::default(), &mut mock_reader) .apply(SearchContext::default(), &mut mock_reader, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(output.matched_segment_ids.count_ones(), 0); assert_eq!(output.matched_segment_ids.count_ones(), 0);
@@ -279,8 +283,8 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new(); let mut mock_reader = MockInvertedIndexReader::new();
mock_reader mock_reader
.expect_metadata() .expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)]))); .returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
mock_reader.expect_fst_vec().returning(|ranges| { mock_reader.expect_fst_vec().returning(|ranges, _metrics| {
let mut output = vec![]; let mut output = vec![];
for range in ranges { for range in ranges {
match range.start { match range.start {
@@ -293,27 +297,29 @@ mod tests {
} }
Ok(output) Ok(output)
}); });
mock_reader.expect_bitmap_deque().returning(|ranges| { mock_reader
let mut output = VecDeque::new(); .expect_bitmap_deque()
for (range, bitmap_type) in ranges { .returning(|ranges, _metrics| {
let offset = range.start; let mut output = VecDeque::new();
let size = range.end - range.start; for (range, bitmap_type) in ranges {
match (offset, size, bitmap_type) { let offset = range.start;
(1, 1, BitmapType::Roaring) => { let size = range.end - range.start;
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type)) match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
}
_ => unreachable!(),
} }
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
}
_ => unreachable!(),
} }
}
Ok(output) Ok(output)
}); });
let output = applier let output = applier
.apply(SearchContext::default(), &mut mock_reader) .apply(SearchContext::default(), &mut mock_reader, None)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -331,10 +337,10 @@ mod tests {
let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new(); let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
mock_reader mock_reader
.expect_metadata() .expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)]))); .returning(|_| Ok(mock_metas([("tag-0", 0)])));
let output = applier let output = applier
.apply(SearchContext::default(), &mut mock_reader) .apply(SearchContext::default(), &mut mock_reader, None)
.await .await
.unwrap(); .unwrap();
assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
@@ -343,7 +349,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_index_applier_with_empty_index() { async fn test_index_applier_with_empty_index() {
let mut mock_reader = MockInvertedIndexReader::new(); let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_metadata().returning(move || { mock_reader.expect_metadata().returning(move |_| {
Ok(Arc::new(InvertedIndexMetas { Ok(Arc::new(InvertedIndexMetas {
total_row_count: 0, // No rows total_row_count: 0, // No rows
segment_row_count: 1, segment_row_count: 1,
@@ -359,7 +365,7 @@ mod tests {
}; };
let output = applier let output = applier
.apply(SearchContext::default(), &mut mock_reader) .apply(SearchContext::default(), &mut mock_reader, None)
.await .await
.unwrap(); .unwrap();
assert!(output.matched_segment_ids.is_empty()); assert!(output.matched_segment_ids.is_empty());
@@ -370,7 +376,7 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new(); let mut mock_reader = MockInvertedIndexReader::new();
mock_reader mock_reader
.expect_metadata() .expect_metadata()
.returning(|| Ok(mock_metas(vec![]))); .returning(|_| Ok(mock_metas(vec![])));
let mut mock_fst_applier = MockFstApplier::new(); let mut mock_fst_applier = MockFstApplier::new();
mock_fst_applier.expect_apply().never(); mock_fst_applier.expect_apply().never();
@@ -385,6 +391,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::ThrowError, index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
}, },
&mut mock_reader, &mut mock_reader,
None,
) )
.await; .await;
assert!(matches!(result, Err(Error::IndexNotFound { .. }))); assert!(matches!(result, Err(Error::IndexNotFound { .. })));
@@ -395,6 +402,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
}, },
&mut mock_reader, &mut mock_reader,
None,
) )
.await .await
.unwrap(); .unwrap();
@@ -406,6 +414,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::Ignore, index_not_found_strategy: IndexNotFoundStrategy::Ignore,
}, },
&mut mock_reader, &mut mock_reader,
None,
) )
.await .await
.unwrap(); .unwrap();

View File

@@ -44,6 +44,7 @@ use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch; use crate::read::Batch;
use crate::sst::file::RegionFileId; use crate::sst::file::RegionFileId;
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta. /// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta"; const SST_META_TYPE: &str = "sst_meta";
@@ -90,6 +91,32 @@ impl CacheStrategy {
} }
} }
/// Gets parquet metadata with cache metrics tracking.
/// Returns the metadata and updates the provided metrics.
pub(crate) async fn get_parquet_meta_data_with_metrics(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager
.get_parquet_meta_data_with_metrics(file_id, metrics)
.await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data_with_metrics(file_id, metrics)
.await
}
CacheStrategy::Disabled => {
metrics.mem_cache_miss += 1;
metrics.file_cache_miss += 1;
None
}
}
}
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()]. /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
pub fn get_parquet_meta_data_from_mem_cache( pub fn get_parquet_meta_data_from_mem_cache(
&self, &self,
@@ -317,6 +344,36 @@ impl CacheManager {
None None
} }
/// Gets cached [ParquetMetaData] with metrics tracking.
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
pub(crate) async fn get_parquet_meta_data_with_metrics(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache_inner(file_id) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
metrics.mem_cache_miss += 1;
// Try to get metadata from write cache
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
{
metrics.file_cache_hit += 1;
let metadata = Arc::new(metadata);
// Put metadata into sst meta cache
self.put_parquet_meta_data(file_id, metadata.clone());
return Some(metadata);
};
metrics.file_cache_miss += 1;
None
}
/// Gets cached [ParquetMetaData] from in-memory cache. /// Gets cached [ParquetMetaData] from in-memory cache.
/// This method does not perform I/O. /// This method does not perform I/O.
pub fn get_parquet_meta_data_from_mem_cache( pub fn get_parquet_meta_data_from_mem_cache(
@@ -330,6 +387,17 @@ impl CacheManager {
}) })
} }
/// Gets cached [ParquetMetaData] from in-memory cache without updating global metrics.
/// This is used by `get_parquet_meta_data_with_metrics` to avoid double counting.
fn get_parquet_meta_data_from_mem_cache_inner(
&self,
file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()))
})
}
/// Puts [ParquetMetaData] into the cache. /// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) { pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
if let Some(cache) = &self.sst_meta_cache { if let Some(cache) = &self.sst_meta_cache {

View File

@@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metrics for index content. /// Metrics for index content.
const INDEX_CONTENT_TYPE: &str = "index_content"; const INDEX_CONTENT_TYPE: &str = "index_content";
/// Metrics collected from IndexCache operations.
#[derive(Debug, Default, Clone)]
pub struct IndexCacheMetrics {
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
/// Number of pages accessed.
pub num_pages: usize,
/// Total bytes from pages.
pub page_bytes: u64,
}
impl IndexCacheMetrics {
/// Merges another set of metrics into this one.
pub fn merge(&mut self, other: &Self) {
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
self.num_pages += other.num_pages;
self.page_bytes += other.page_bytes;
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey { pub struct PageKey {
page_id: u64, page_id: u64,
@@ -160,18 +183,20 @@ where
offset: u64, offset: u64,
size: u32, size: u32,
load: F, load: F,
) -> Result<Vec<u8>, E> ) -> Result<(Vec<u8>, IndexCacheMetrics), E>
where where
F: Fn(Vec<Range<u64>>) -> Fut, F: Fn(Vec<Range<u64>>) -> Fut,
Fut: Future<Output = Result<Vec<Bytes>, E>>, Fut: Future<Output = Result<Vec<Bytes>, E>>,
E: std::error::Error, E: std::error::Error,
{ {
let mut metrics = IndexCacheMetrics::default();
let page_keys = let page_keys =
PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>(); PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
// Size is 0, return empty data. // Size is 0, return empty data.
if page_keys.is_empty() { if page_keys.is_empty() {
return Ok(Vec::new()); return Ok((Vec::new(), metrics));
} }
metrics.num_pages = page_keys.len();
let mut data = Vec::with_capacity(page_keys.len()); let mut data = Vec::with_capacity(page_keys.len());
data.resize(page_keys.len(), Bytes::new()); data.resize(page_keys.len(), Bytes::new());
let mut cache_miss_range = vec![]; let mut cache_miss_range = vec![];
@@ -182,10 +207,13 @@ where
match self.get_page(key, *page_key) { match self.get_page(key, *page_key) {
Some(page) => { Some(page) => {
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_hit += 1;
metrics.page_bytes += page.len() as u64;
data[i] = page; data[i] = page;
} }
None => { None => {
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_miss += 1;
let base_offset = page_key.page_id * self.page_size; let base_offset = page_key.page_id * self.page_size;
let pruned_size = if i == last_index { let pruned_size = if i == last_index {
prune_size(page_keys.iter(), file_size, self.page_size) prune_size(page_keys.iter(), file_size, self.page_size)
@@ -201,14 +229,18 @@ where
let pages = load(cache_miss_range).await?; let pages = load(cache_miss_range).await?;
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
let page_key = page_keys[i]; let page_key = page_keys[i];
metrics.page_bytes += page.len() as u64;
data[i] = page.clone(); data[i] = page.clone();
self.put_page(key, page_key, page.clone()); self.put_page(key, page_key, page.clone());
} }
} }
let buffer = Buffer::from_iter(data.into_iter()); let buffer = Buffer::from_iter(data.into_iter());
Ok(buffer Ok((
.slice(PageKey::calculate_range(offset, size, self.page_size)) buffer
.to_vec()) .slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec(),
metrics,
))
} }
fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> { fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {

View File

@@ -14,12 +14,13 @@
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use api::v1::index::{BloomFilterLoc, BloomFilterMeta}; use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use index::bloom_filter::error::Result; use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader; use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId}; use store_api::storage::{ColumnId, FileId};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey}; use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
@@ -114,51 +115,93 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
#[async_trait] #[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> { impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> { async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner; let inner = &self.inner;
self.cache let (result, cache_metrics) = self
.cache
.get_or_load( .get_or_load(
(self.file_id, self.column_id, self.tag), (self.file_id, self.column_id, self.tag),
self.blob_size, self.blob_size,
offset, offset,
size, size,
move |ranges| async move { inner.read_vec(&ranges).await }, move |ranges| async move { inner.read_vec(&ranges, None).await },
) )
.await .await?;
.map(|b| b.into())
if let Some(m) = metrics {
m.total_ranges += cache_metrics.num_pages;
m.total_bytes += cache_metrics.page_bytes;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result.into())
} }
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len()); let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges { for range in ranges {
let inner = &self.inner; let inner = &self.inner;
let page = self let (page, cache_metrics) = self
.cache .cache
.get_or_load( .get_or_load(
(self.file_id, self.column_id, self.tag), (self.file_id, self.column_id, self.tag),
self.blob_size, self.blob_size,
range.start, range.start,
(range.end - range.start) as u32, (range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await }, move |ranges| async move { inner.read_vec(&ranges, None).await },
) )
.await?; .await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page)); pages.push(Bytes::from(page));
} }
if let Some(m) = metrics {
m.total_ranges += total_cache_metrics.num_pages;
m.total_bytes += total_cache_metrics.page_bytes;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(pages) Ok(pages)
} }
/// Reads the meta information of the bloom filter. /// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta> { async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
if let Some(cached) = self if let Some(cached) = self
.cache .cache
.get_metadata((self.file_id, self.column_id, self.tag)) .get_metadata((self.file_id, self.column_id, self.tag))
{ {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok((*cached).clone()) Ok((*cached).clone())
} else { } else {
let meta = self.inner.metadata().await?; let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata( self.cache.put_metadata(
(self.file_id, self.column_id, self.tag), (self.file_id, self.column_id, self.tag),
Arc::new(meta.clone()), Arc::new(meta.clone()),

View File

@@ -14,12 +14,13 @@
use core::ops::Range; use core::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use api::v1::index::InvertedIndexMetas; use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use index::inverted_index::error::Result; use index::inverted_index::error::Result;
use index::inverted_index::format::reader::InvertedIndexReader; use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message; use prost::Message;
use store_api::storage::FileId; use store_api::storage::FileId;
@@ -83,46 +84,86 @@ impl<R> CachedInvertedIndexBlobReader<R> {
#[async_trait] #[async_trait]
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> { impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> { async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner; let inner = &self.inner;
self.cache let (result, cache_metrics) = self
.cache
.get_or_load( .get_or_load(
self.file_id, self.file_id,
self.blob_size, self.blob_size,
offset, offset,
size, size,
move |ranges| async move { inner.read_vec(&ranges).await }, move |ranges| async move { inner.read_vec(&ranges, None).await },
) )
.await .await?;
if let Some(m) = metrics {
m.total_bytes += cache_metrics.page_bytes;
m.total_ranges += cache_metrics.num_pages;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(result)
} }
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len()); let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges { for range in ranges {
let inner = &self.inner; let inner = &self.inner;
let page = self let (page, cache_metrics) = self
.cache .cache
.get_or_load( .get_or_load(
self.file_id, self.file_id,
self.blob_size, self.blob_size,
range.start, range.start,
(range.end - range.start) as u32, (range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await }, move |ranges| async move { inner.read_vec(&ranges, None).await },
) )
.await?; .await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page)); pages.push(Bytes::from(page));
} }
if let Some(m) = metrics {
m.total_bytes += total_cache_metrics.page_bytes;
m.total_ranges += total_cache_metrics.num_pages;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(pages) Ok(pages)
} }
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> { async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) { if let Some(cached) = self.cache.get_metadata(self.file_id) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok(cached) Ok(cached)
} else { } else {
let meta = self.inner.metadata().await?; let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(self.file_id, meta.clone()); self.cache.put_metadata(self.file_id, meta.clone());
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
Ok(meta) Ok(meta)
@@ -277,7 +318,7 @@ mod test {
reader, reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)), Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
); );
let metadata = cached_reader.metadata().await.unwrap(); let metadata = cached_reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2); assert_eq!(metadata.metas.len(), 2);
@@ -292,13 +333,19 @@ mod test {
.fst( .fst(
tag0.base_offset + tag0.relative_fst_offset as u64, tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size, tag0.fst_size,
None,
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(fst0.len(), 3); assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap()); let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = cached_reader let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -307,7 +354,12 @@ mod test {
); );
let [offset, size] = unpack(fst0.get(b"b").unwrap()); let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = cached_reader let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -316,7 +368,12 @@ mod test {
); );
let [offset, size] = unpack(fst0.get(b"c").unwrap()); let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = cached_reader let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -335,13 +392,19 @@ mod test {
.fst( .fst(
tag1.base_offset + tag1.relative_fst_offset as u64, tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size, tag1.fst_size,
None,
) )
.await .await
.unwrap(); .unwrap();
assert_eq!(fst1.len(), 3); assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap()); let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = cached_reader let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -350,7 +413,12 @@ mod test {
); );
let [offset, size] = unpack(fst1.get(b"y").unwrap()); let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = cached_reader let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -359,7 +427,12 @@ mod test {
); );
let [offset, size] = unpack(fst1.get(b"z").unwrap()); let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = cached_reader let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring) .bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
@@ -372,16 +445,16 @@ mod test {
for _ in 0..FUZZ_REPEAT_TIMES { for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.random_range(0..file_size); let offset = rng.random_range(0..file_size);
let size = rng.random_range(0..file_size as u32 - offset as u32); let size = rng.random_range(0..file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size).await.unwrap(); let expected = cached_reader.range_read(offset, size, None).await.unwrap();
let inner = &cached_reader.inner; let inner = &cached_reader.inner;
let read = cached_reader let (read, _cache_metrics) = cached_reader
.cache .cache
.get_or_load( .get_or_load(
cached_reader.file_id, cached_reader.file_id,
file_size, file_size,
offset, offset,
size, size,
|ranges| async move { inner.read_vec(&ranges).await }, |ranges| async move { inner.read_vec(&ranges, None).await },
) )
.await .await
.unwrap(); .unwrap();

View File

@@ -233,7 +233,7 @@ async fn collect_inverted_entries(
InvertedIndexBlobReader::new(blob_reader), InvertedIndexBlobReader::new(blob_reader),
cache.clone(), cache.clone(),
); );
match reader.metadata().await { match reader.metadata(None).await {
Ok(metas) => metas, Ok(metas) => metas,
Err(err) => { Err(err) => {
warn!( warn!(
@@ -247,7 +247,7 @@ async fn collect_inverted_entries(
} }
} else { } else {
let reader = InvertedIndexBlobReader::new(blob_reader); let reader = InvertedIndexBlobReader::new(blob_reader);
match reader.metadata().await { match reader.metadata(None).await {
Ok(metas) => metas, Ok(metas) => metas,
Err(err) => { Err(err) => {
warn!( warn!(
@@ -318,10 +318,10 @@ async fn try_read_bloom_meta(
bloom_reader, bloom_reader,
cache.clone(), cache.clone(),
) )
.metadata() .metadata(None)
.await .await
} }
_ => bloom_reader.metadata().await, _ => bloom_reader.metadata(None).await,
}; };
match result { match result {

View File

@@ -41,10 +41,14 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext; use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source}; use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange; use crate::sst::file::FileTimeRange;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index; use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Verbose scan metrics for a partition. /// Verbose scan metrics for a partition.
#[derive(Default)] #[derive(Default)]
@@ -81,6 +85,8 @@ pub(crate) struct ScanMetricsSet {
// SST related metrics: // SST related metrics:
/// Duration to build file ranges. /// Duration to build file ranges.
build_parts_cost: Duration, build_parts_cost: Duration,
/// Duration to scan SST files.
file_scan_cost: Duration,
/// Number of row groups before filtering. /// Number of row groups before filtering.
rg_total: usize, rg_total: usize,
/// Number of row groups filtered by fulltext index. /// Number of row groups filtered by fulltext index.
@@ -126,6 +132,18 @@ pub(crate) struct ScanMetricsSet {
/// The stream reached EOF /// The stream reached EOF
stream_eof: bool, stream_eof: bool,
// Optional verbose metrics:
/// Inverted index apply metrics.
inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Bloom filter index apply metrics.
bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Fulltext index apply metrics.
fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
/// Parquet fetch metrics.
fetch_metrics: Option<ParquetFetchMetrics>,
/// Metadata cache metrics.
metadata_cache_metrics: Option<MetadataCacheMetrics>,
} }
impl fmt::Debug for ScanMetricsSet { impl fmt::Debug for ScanMetricsSet {
@@ -141,6 +159,7 @@ impl fmt::Debug for ScanMetricsSet {
num_mem_ranges, num_mem_ranges,
num_file_ranges, num_file_ranges,
build_parts_cost, build_parts_cost,
file_scan_cost,
rg_total, rg_total,
rg_fulltext_filtered, rg_fulltext_filtered,
rg_inverted_filtered, rg_inverted_filtered,
@@ -166,6 +185,11 @@ impl fmt::Debug for ScanMetricsSet {
mem_rows, mem_rows,
mem_batches, mem_batches,
mem_series, mem_series,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
fetch_metrics,
metadata_cache_metrics,
} = self; } = self;
// Write core metrics // Write core metrics
@@ -181,6 +205,7 @@ impl fmt::Debug for ScanMetricsSet {
\"num_mem_ranges\":{num_mem_ranges}, \ \"num_mem_ranges\":{num_mem_ranges}, \
\"num_file_ranges\":{num_file_ranges}, \ \"num_file_ranges\":{num_file_ranges}, \
\"build_parts_cost\":\"{build_parts_cost:?}\", \ \"build_parts_cost\":\"{build_parts_cost:?}\", \
\"file_scan_cost\":\"{file_scan_cost:?}\", \
\"rg_total\":{rg_total}, \ \"rg_total\":{rg_total}, \
\"rows_before_filter\":{rows_before_filter}, \ \"rows_before_filter\":{rows_before_filter}, \
\"num_sst_record_batches\":{num_sst_record_batches}, \ \"num_sst_record_batches\":{num_sst_record_batches}, \
@@ -255,6 +280,23 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?; write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
} }
// Write optional verbose metrics
if let Some(metrics) = inverted_index_apply_metrics {
write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = bloom_filter_apply_metrics {
write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = fulltext_index_apply_metrics {
write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = fetch_metrics {
write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = metadata_cache_metrics {
write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}") write!(f, ", \"stream_eof\":{stream_eof}}}")
} }
} }
@@ -304,14 +346,20 @@ impl ScanMetricsSet {
rows_inverted_filtered, rows_inverted_filtered,
rows_bloom_filtered, rows_bloom_filtered,
rows_precise_filtered, rows_precise_filtered,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
}, },
num_record_batches, num_record_batches,
num_batches, num_batches,
num_rows, num_rows,
scan_cost: _, scan_cost,
metadata_cache_metrics,
fetch_metrics,
} = other; } = other;
self.build_parts_cost += *build_cost; self.build_parts_cost += *build_cost;
self.file_scan_cost += *scan_cost;
self.rg_total += *rg_total; self.rg_total += *rg_total;
self.rg_fulltext_filtered += *rg_fulltext_filtered; self.rg_fulltext_filtered += *rg_fulltext_filtered;
@@ -328,6 +376,31 @@ impl ScanMetricsSet {
self.num_sst_record_batches += *num_record_batches; self.num_sst_record_batches += *num_record_batches;
self.num_sst_batches += *num_batches; self.num_sst_batches += *num_batches;
self.num_sst_rows += *num_rows; self.num_sst_rows += *num_rows;
// Merge optional verbose metrics
if let Some(metrics) = inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(InvertedIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(BloomFilterIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(FulltextIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = fetch_metrics {
self.fetch_metrics
.get_or_insert_with(ParquetFetchMetrics::default)
.merge_from(metrics);
}
self.metadata_cache_metrics
.get_or_insert_with(MetadataCacheMetrics::default)
.merge_from(metadata_cache_metrics);
} }
/// Sets distributor metrics. /// Sets distributor metrics.
@@ -615,6 +688,11 @@ impl PartitionMetrics {
let mut metrics_set = self.0.metrics.lock().unwrap(); let mut metrics_set = self.0.metrics.lock().unwrap();
metrics_set.set_distributor_metrics(metrics); metrics_set.set_distributor_metrics(metrics);
} }
/// Returns whether verbose explain is enabled.
pub(crate) fn explain_verbose(&self) -> bool {
self.0.explain_verbose
}
} }
impl fmt::Debug for PartitionMetrics { impl fmt::Debug for PartitionMetrics {
@@ -768,6 +846,21 @@ fn can_split_series(num_rows: u64, num_series: u64) -> bool {
num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
} }
/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
/// based on the `explain_verbose` flag.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
if explain_verbose {
ReaderFilterMetrics {
inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
..Default::default()
}
} else {
ReaderFilterMetrics::default()
}
}
/// Scans file ranges at `index`. /// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges( pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>, stream_ctx: Arc<StreamContext>,
@@ -776,7 +869,10 @@ pub(crate) async fn scan_file_ranges(
read_type: &'static str, read_type: &'static str,
range_builder: Arc<RangeBuilderList>, range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> { ) -> Result<impl Stream<Item = Result<Batch>>> {
let mut reader_metrics = ReaderMetrics::default(); let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = range_builder let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics) .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?; .await?;
@@ -799,7 +895,10 @@ pub(crate) async fn scan_flat_file_ranges(
read_type: &'static str, read_type: &'static str,
range_builder: Arc<RangeBuilderList>, range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> { ) -> Result<impl Stream<Item = Result<RecordBatch>>> {
let mut reader_metrics = ReaderMetrics::default(); let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = range_builder let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics) .build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?; .await?;
@@ -822,10 +921,18 @@ pub fn build_file_range_scan_stream(
ranges: SmallVec<[FileRange; 2]>, ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> { ) -> impl Stream<Item = Result<Batch>> {
try_stream! { try_stream! {
let reader_metrics = &mut ReaderMetrics::default(); let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges { for range in ranges {
let build_reader_start = Instant::now(); let build_reader_start = Instant::now();
let reader = range.reader(stream_ctx.input.series_row_selector).await?; let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
let build_cost = build_reader_start.elapsed(); let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost); part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch(); let compat_batch = range.compat_batch();
@@ -857,10 +964,18 @@ pub fn build_flat_file_range_scan_stream(
ranges: SmallVec<[FileRange; 2]>, ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> { ) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! { try_stream! {
let reader_metrics = &mut ReaderMetrics::default(); let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges { for range in ranges {
let build_reader_start = Instant::now(); let build_reader_start = Instant::now();
let mut reader = range.flat_reader().await?; let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
let build_cost = build_reader_start.elapsed(); let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost); part_metrics.inc_build_reader_cost(build_cost);

View File

@@ -17,11 +17,14 @@ mod builder;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader; use common_base::range_read::RangeReader;
use common_telemetry::warn; use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
};
use index::target::IndexTarget; use index::target::IndexTarget;
use object_store::ObjectStore; use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
@@ -47,6 +50,56 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder; pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking bloom filter index apply operations.
#[derive(Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
/// Metrics for bloom filter read operations.
pub read_metrics: BloomFilterReadMetrics,
}
impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if !self.apply_elapsed.is_zero() {
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
first = false;
}
if self.blob_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
first = false;
}
if self.blob_read_bytes > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
}
write!(f, "}}")
}
}
impl BloomFilterIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.blob_read_bytes += other.blob_read_bytes;
self.read_metrics.merge_from(&other.read_metrics);
}
}
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>; pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file. /// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
@@ -133,15 +186,20 @@ impl BloomFilterIndexApplier {
/// ///
/// Row group id existing in the returned result means that the row group is searched. /// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found. /// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply( pub async fn apply(
&self, &self,
file_id: RegionFileId, file_id: RegionFileId,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>, row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> { ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
let _timer = INDEX_APPLY_ELAPSED let apply_start = Instant::now();
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.start_timer();
// Calculates row groups' ranges based on start of the file. // Calculates row groups' ranges based on start of the file.
let mut input = Vec::with_capacity(row_groups.size_hint().0); let mut input = Vec::with_capacity(row_groups.size_hint().0);
@@ -163,7 +221,7 @@ impl BloomFilterIndexApplier {
for (column_id, predicates) in self.predicates.iter() { for (column_id, predicates) in self.predicates.iter() {
let blob = match self let blob = match self
.blob_reader(file_id, *column_id, file_size_hint) .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await? .await?
{ {
Some(blob) => blob, Some(blob) => blob,
@@ -173,6 +231,9 @@ impl BloomFilterIndexApplier {
// Create appropriate reader based on whether we have caching enabled // Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache { if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length; let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
if let Some(m) = &mut metrics {
m.blob_read_bytes += blob_size;
}
let reader = CachedBloomFilterIndexBlobReader::new( let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(), file_id.file_id(),
*column_id, *column_id,
@@ -181,12 +242,12 @@ impl BloomFilterIndexApplier {
BloomFilterReaderImpl::new(blob), BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(), bloom_filter_cache.clone(),
); );
self.apply_predicates(reader, predicates, &mut output) self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await .await
.context(ApplyBloomFilterIndexSnafu)?; .context(ApplyBloomFilterIndexSnafu)?;
} else { } else {
let reader = BloomFilterReaderImpl::new(blob); let reader = BloomFilterReaderImpl::new(blob);
self.apply_predicates(reader, predicates, &mut output) self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await .await
.context(ApplyBloomFilterIndexSnafu)?; .context(ApplyBloomFilterIndexSnafu)?;
} }
@@ -201,6 +262,16 @@ impl BloomFilterIndexApplier {
} }
} }
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(output) Ok(output)
} }
@@ -212,6 +283,7 @@ impl BloomFilterIndexApplier {
file_id: RegionFileId, file_id: RegionFileId,
column_id: ColumnId, column_id: ColumnId,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Option<BlobReader>> { ) -> Result<Option<BlobReader>> {
let reader = match self let reader = match self
.cached_blob_reader(file_id, column_id, file_size_hint) .cached_blob_reader(file_id, column_id, file_size_hint)
@@ -219,6 +291,9 @@ impl BloomFilterIndexApplier {
{ {
Ok(Some(puffin_reader)) => puffin_reader, Ok(Some(puffin_reader)) => puffin_reader,
other => { other => {
if let Some(m) = metrics {
m.blob_cache_miss += 1;
}
if let Err(err) = other { if let Err(err) = other {
// Blob not found means no index for this column // Blob not found means no index for this column
if is_blob_not_found(&err) { if is_blob_not_found(&err) {
@@ -320,6 +395,7 @@ impl BloomFilterIndexApplier {
reader: R, reader: R,
predicates: &[InListPredicate], predicates: &[InListPredicate],
output: &mut [(usize, Vec<Range<usize>>)], output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> std::result::Result<(), index::bloom_filter::error::Error> { ) -> std::result::Result<(), index::bloom_filter::error::Error> {
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?; let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
@@ -329,7 +405,10 @@ impl BloomFilterIndexApplier {
continue; continue;
} }
*row_group_output = applier.search(predicates, row_group_output).await?; let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
*row_group_output = applier
.search(predicates, row_group_output, read_metrics)
.await?;
} }
Ok(()) Ok(())
@@ -393,7 +472,7 @@ mod tests {
let applier = builder.build(&exprs).unwrap().unwrap(); let applier = builder.build(&exprs).unwrap().unwrap();
applier applier
.apply(file_id, None, row_groups.into_iter()) .apply(file_id, None, row_groups.into_iter(), None)
.await .await
.unwrap() .unwrap()
.into_iter() .into_iter()

View File

@@ -637,17 +637,17 @@ pub(crate) mod tests {
.unwrap(); .unwrap();
let reader = blob_guard.reader().await.unwrap(); let reader = blob_guard.reader().await.unwrap();
let bloom_filter = BloomFilterReaderImpl::new(reader); let bloom_filter = BloomFilterReaderImpl::new(reader);
let metadata = bloom_filter.metadata().await.unwrap(); let metadata = bloom_filter.metadata(None).await.unwrap();
assert_eq!(metadata.segment_count, 10); assert_eq!(metadata.segment_count, 10);
for i in 0..5 { for i in 0..5 {
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap(); let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
assert!(bf.contains(b"tag1")); assert!(bf.contains(b"tag1"));
} }
for i in 5..10 { for i in 5..10 {
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize]; let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap(); let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
assert!(bf.contains(b"tag2")); assert!(bf.contains(b"tag2"));
} }
} }
@@ -662,13 +662,13 @@ pub(crate) mod tests {
.unwrap(); .unwrap();
let reader = blob_guard.reader().await.unwrap(); let reader = blob_guard.reader().await.unwrap();
let bloom_filter = BloomFilterReaderImpl::new(reader); let bloom_filter = BloomFilterReaderImpl::new(reader);
let metadata = bloom_filter.metadata().await.unwrap(); let metadata = bloom_filter.metadata(None).await.unwrap();
assert_eq!(metadata.segment_count, 5); assert_eq!(metadata.segment_count, 5);
for i in 0u64..20 { for i in 0u64..20 {
let idx = i as usize / 4; let idx = i as usize / 4;
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize]; let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap(); let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
let mut buf = vec![]; let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf) IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
.unwrap(); .unwrap();

View File

@@ -16,11 +16,12 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter; use std::iter;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader; use common_base::range_read::RangeReader;
use common_telemetry::warn; use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate}; use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl; use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer}; use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config}; use index::fulltext_index::{Analyzer, Config};
@@ -53,6 +54,91 @@ use crate::sst::index::puffin_manager::{
pub mod builder; pub mod builder;
/// Metrics for tracking fulltext index apply operations.
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Number of directory cache hits.
pub dir_cache_hit: usize,
/// Number of directory cache misses.
pub dir_cache_miss: usize,
/// Elapsed time to initialize directory data.
pub dir_init_elapsed: std::time::Duration,
/// Metrics for bloom filter reads.
pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
impl std::fmt::Debug for FulltextIndexApplyMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if !self.apply_elapsed.is_zero() {
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
first = false;
}
if self.blob_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
first = false;
}
if self.dir_cache_hit > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"dir_cache_hit\":{}", self.dir_cache_hit)?;
first = false;
}
if self.dir_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"dir_cache_miss\":{}", self.dir_cache_miss)?;
first = false;
}
if !self.dir_init_elapsed.is_zero() {
if !first {
write!(f, ", ")?;
}
write!(f, "\"dir_init_elapsed\":\"{:?}\"", self.dir_init_elapsed)?;
}
write!(f, "}}")
}
}
impl FulltextIndexApplyMetrics {
/// Collects metrics from a directory read operation.
pub fn collect_dir_metrics(
&mut self,
elapsed: std::time::Duration,
dir_metrics: puffin::puffin_manager::DirMetrics,
) {
self.dir_init_elapsed += elapsed;
if dir_metrics.cache_hit {
self.dir_cache_hit += 1;
} else {
self.dir_cache_miss += 1;
}
}
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.dir_cache_hit += other.dir_cache_hit;
self.dir_cache_miss += other.dir_cache_miss;
self.dir_init_elapsed += other.dir_init_elapsed;
self.bloom_filter_read_metrics
.merge_from(&other.bloom_filter_read_metrics);
}
}
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
pub struct FulltextIndexApplier { pub struct FulltextIndexApplier {
/// Requests to be applied. /// Requests to be applied.
@@ -124,14 +210,18 @@ impl FulltextIndexApplier {
impl FulltextIndexApplier { impl FulltextIndexApplier {
/// Applies fine-grained fulltext index to the specified SST file. /// Applies fine-grained fulltext index to the specified SST file.
/// Returns the row ids that match the queries. /// Returns the row ids that match the queries.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_fine( pub async fn apply_fine(
&self, &self,
file_id: RegionFileId, file_id: RegionFileId,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> { ) -> Result<Option<BTreeSet<RowId>>> {
let timer = INDEX_APPLY_ELAPSED let apply_start = Instant::now();
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let mut row_ids: Option<BTreeSet<RowId>> = None; let mut row_ids: Option<BTreeSet<RowId>> = None;
for (column_id, request) in self.requests.iter() { for (column_id, request) in self.requests.iter() {
@@ -140,7 +230,13 @@ impl FulltextIndexApplier {
} }
let Some(result) = self let Some(result) = self
.apply_fine_one_column(file_size_hint, file_id, *column_id, request) .apply_fine_one_column(
file_size_hint,
file_id,
*column_id,
request,
metrics.as_deref_mut(),
)
.await? .await?
else { else {
continue; continue;
@@ -159,9 +255,16 @@ impl FulltextIndexApplier {
} }
} }
if row_ids.is_none() { // Record elapsed time to histogram and collect metrics if requested
timer.stop_and_discard(); let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
} }
Ok(row_ids) Ok(row_ids)
} }
@@ -171,6 +274,7 @@ impl FulltextIndexApplier {
file_id: RegionFileId, file_id: RegionFileId,
column_id: ColumnId, column_id: ColumnId,
request: &FulltextRequest, request: &FulltextRequest,
metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> { ) -> Result<Option<BTreeSet<RowId>>> {
let blob_key = format!( let blob_key = format!(
"{INDEX_BLOB_TYPE_TANTIVY}-{}", "{INDEX_BLOB_TYPE_TANTIVY}-{}",
@@ -178,7 +282,7 @@ impl FulltextIndexApplier {
); );
let dir = self let dir = self
.index_source .index_source
.dir(file_id, &blob_key, file_size_hint) .dir(file_id, &blob_key, file_size_hint, metrics)
.await?; .await?;
let dir = match &dir { let dir = match &dir {
@@ -240,15 +344,20 @@ impl FulltextIndexApplier {
/// ///
/// Row group id existing in the returned result means that the row group is searched. /// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found. /// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_coarse( pub async fn apply_coarse(
&self, &self,
file_id: RegionFileId, file_id: RegionFileId,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>, row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> { ) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
let timer = INDEX_APPLY_ELAPSED let apply_start = Instant::now();
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let (input, mut output) = Self::init_coarse_output(row_groups); let (input, mut output) = Self::init_coarse_output(row_groups);
let mut applied = false; let mut applied = false;
@@ -266,16 +375,27 @@ impl FulltextIndexApplier {
*column_id, *column_id,
&request.terms, &request.terms,
&mut output, &mut output,
metrics.as_deref_mut(),
) )
.await?; .await?;
} }
if !applied { if !applied {
timer.stop_and_discard();
return Ok(None); return Ok(None);
} }
Self::adjust_coarse_output(input, &mut output); Self::adjust_coarse_output(input, &mut output);
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(Some(output)) Ok(Some(output))
} }
@@ -286,6 +406,7 @@ impl FulltextIndexApplier {
column_id: ColumnId, column_id: ColumnId,
terms: &[FulltextTerm], terms: &[FulltextTerm],
output: &mut [(usize, Vec<Range<usize>>)], output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<bool> { ) -> Result<bool> {
let blob_key = format!( let blob_key = format!(
"{INDEX_BLOB_TYPE_BLOOM}-{}", "{INDEX_BLOB_TYPE_BLOOM}-{}",
@@ -293,7 +414,7 @@ impl FulltextIndexApplier {
); );
let Some(reader) = self let Some(reader) = self
.index_source .index_source
.blob(file_id, &blob_key, file_size_hint) .blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
.await? .await?
else { else {
return Ok(false); return Ok(false);
@@ -336,7 +457,13 @@ impl FulltextIndexApplier {
} }
*row_group_output = applier *row_group_output = applier
.search(&predicates, row_group_output) .search(
&predicates,
row_group_output,
metrics
.as_deref_mut()
.map(|m| &mut m.bloom_filter_read_metrics),
)
.await .await
.context(ApplyBloomFilterIndexSnafu)?; .context(ApplyBloomFilterIndexSnafu)?;
} }
@@ -483,8 +610,15 @@ impl IndexSource {
file_id: RegionFileId, file_id: RegionFileId,
key: &str, key: &str,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> { ) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track cache miss if fallbacked to remote
if fallbacked && let Some(m) = metrics {
m.blob_cache_miss += 1;
}
let res = reader.blob(key).await; let res = reader.blob(key).await;
match res { match res {
Ok(blob) => Ok(Some(blob)), Ok(blob) => Ok(Some(blob)),
@@ -514,11 +648,25 @@ impl IndexSource {
file_id: RegionFileId, file_id: RegionFileId,
key: &str, key: &str,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> { ) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track cache miss if fallbacked to remote
if fallbacked && let Some(m) = &mut metrics {
m.blob_cache_miss += 1;
}
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await; let res = reader.dir(key).await;
match res { match res {
Ok(dir) => Ok(Some(dir)), Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None), Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => { Err(err) => {
if fallbacked { if fallbacked {
@@ -526,9 +674,16 @@ impl IndexSource {
} else { } else {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file."); warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
let reader = self.build_remote(file_id, file_size_hint).await?; let reader = self.build_remote(file_id, file_size_hint).await?;
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await; let res = reader.dir(key).await;
match res { match res {
Ok(dir) => Ok(Some(dir)), Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None), Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu), Err(err) => Err(err).context(PuffinReadBlobSnafu),
} }

View File

@@ -723,15 +723,16 @@ mod tests {
let backend = backend.clone(); let backend = backend.clone();
async move { async move {
match backend { match backend {
FulltextBackend::Tantivy => { FulltextBackend::Tantivy => applier
applier.apply_fine(region_file_id, None).await.unwrap() .apply_fine(region_file_id, None, None)
} .await
.unwrap(),
FulltextBackend::Bloom => { FulltextBackend::Bloom => {
let coarse_mask = coarse_mask.unwrap_or_default(); let coarse_mask = coarse_mask.unwrap_or_default();
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i])); let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
// row group id == row id // row group id == row id
let resp = applier let resp = applier
.apply_coarse(region_file_id, None, row_groups) .apply_coarse(region_file_id, None, row_groups, None)
.await .await
.unwrap(); .unwrap();
resp.map(|r| { resp.map(|r| {

View File

@@ -16,10 +16,11 @@ pub mod builder;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader; use common_base::range_read::RangeReader;
use common_telemetry::warn; use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
use index::inverted_index::search::index_apply::{ use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
}; };
@@ -44,6 +45,57 @@ use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking inverted index apply operations.
#[derive(Default, Clone)]
pub struct InvertedIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses (0 or 1).
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
/// Metrics for inverted index reads.
pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
impl std::fmt::Debug for InvertedIndexApplyMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if !self.apply_elapsed.is_zero() {
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
first = false;
}
if self.blob_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
first = false;
}
if self.blob_read_bytes > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
}
write!(f, "}}")
}
}
impl InvertedIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.blob_read_bytes += other.blob_read_bytes;
self.inverted_index_read_metrics
.merge_from(&other.inverted_index_read_metrics);
}
}
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files /// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan. /// and returning the relevant row group ids for further scan.
pub(crate) struct InvertedIndexApplier { pub(crate) struct InvertedIndexApplier {
@@ -124,24 +176,30 @@ impl InvertedIndexApplier {
self self
} }
/// Applies predicates to the provided SST file id and returns the relevant row group ids /// Applies predicates to the provided SST file id and returns the relevant row group ids.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply( pub async fn apply(
&self, &self,
file_id: RegionFileId, file_id: RegionFileId,
file_size_hint: Option<u64>, file_size_hint: Option<u64>,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> { ) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED let start = Instant::now();
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
let context = SearchContext { let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates. // Encountering a non-existing column indicates that it doesn't match predicates.
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
}; };
let mut cache_miss = 0;
let blob = match self.cached_blob_reader(file_id, file_size_hint).await { let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
Ok(Some(puffin_reader)) => puffin_reader, Ok(Some(puffin_reader)) => puffin_reader,
other => { other => {
cache_miss += 1;
if let Err(err) = other { if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
} }
@@ -149,8 +207,9 @@ impl InvertedIndexApplier {
} }
}; };
if let Some(index_cache) = &self.inverted_index_cache { let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let result = if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new( let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id.file_id(), file_id.file_id(),
blob_size, blob_size,
@@ -158,16 +217,42 @@ impl InvertedIndexApplier {
index_cache.clone(), index_cache.clone(),
); );
self.index_applier self.index_applier
.apply(context, &mut index_reader) .apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await .await
.context(ApplyInvertedIndexSnafu) .context(ApplyInvertedIndexSnafu)
} else { } else {
let mut index_reader = InvertedIndexBlobReader::new(blob); let mut index_reader = InvertedIndexBlobReader::new(blob);
self.index_applier self.index_applier
.apply(context, &mut index_reader) .apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await .await
.context(ApplyInvertedIndexSnafu) .context(ApplyInvertedIndexSnafu)
};
// Record elapsed time to histogram and collect metrics if requested
let elapsed = start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(metrics) = metrics {
metrics.apply_elapsed = elapsed;
metrics.blob_cache_miss = cache_miss;
metrics.blob_read_bytes = blob_size;
} }
result
} }
/// Creates a blob reader from the cached index file. /// Creates a blob reader from the cached index file.
@@ -281,7 +366,7 @@ mod tests {
let mut mock_index_applier = MockIndexApplier::new(); let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100); mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().returning(|_, _| { mock_index_applier.expect_apply().returning(|_, _, _| {
Ok(ApplyOutput { Ok(ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(), matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: 100, total_row_count: 100,
@@ -297,7 +382,7 @@ mod tests {
puffin_manager_factory, puffin_manager_factory,
Default::default(), Default::default(),
); );
let output = sst_index_applier.apply(file_id, None).await.unwrap(); let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
assert_eq!( assert_eq!(
output, output,
ApplyOutput { ApplyOutput {
@@ -345,7 +430,7 @@ mod tests {
puffin_manager_factory, puffin_manager_factory,
Default::default(), Default::default(),
); );
let res = sst_index_applier.apply(file_id, None).await; let res = sst_index_applier.apply(file_id, None, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
} }
} }

View File

@@ -615,7 +615,7 @@ mod tests {
.unwrap(); .unwrap();
Box::pin(async move { Box::pin(async move {
applier applier
.apply(sst_file_id, None) .apply(sst_file_id, None, None)
.await .await
.unwrap() .unwrap()
.matched_segment_ids .matched_segment_ids

View File

@@ -245,7 +245,7 @@ mod tests {
let bs = blob_reader.read(0..meta.content_length).await.unwrap(); let bs = blob_reader.read(0..meta.content_length).await.unwrap();
assert_eq!(&*bs, raw_data); assert_eq!(&*bs, raw_data);
let dir_guard = reader.dir(dir_key).await.unwrap(); let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap();
let file = dir_guard.path().join("hello"); let file = dir_guard.path().join("hello");
let data = tokio::fs::read(file).await.unwrap(); let data = tokio::fs::read(file).await.unwrap();
assert_eq!(data, raw_data); assert_eq!(data, raw_data);

View File

@@ -45,6 +45,7 @@ use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{ use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
}; };
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Checks if a row group contains delete operations by examining the min value of op_type column. /// Checks if a row group contains delete operations by examining the min value of op_type column.
/// ///
@@ -117,11 +118,16 @@ impl FileRange {
pub(crate) async fn reader( pub(crate) async fn reader(
&self, &self,
selector: Option<TimeSeriesRowSelector>, selector: Option<TimeSeriesRowSelector>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<PruneReader> { ) -> Result<PruneReader> {
let parquet_reader = self let parquet_reader = self
.context .context
.reader_builder .reader_builder
.build(self.row_group_idx, self.row_selection.clone()) .build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?; .await?;
let use_last_row_reader = if selector let use_last_row_reader = if selector
@@ -168,11 +174,18 @@ impl FileRange {
} }
/// Creates a flat reader that returns RecordBatch. /// Creates a flat reader that returns RecordBatch.
pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> { pub(crate) async fn flat_reader(
&self,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<FlatPruneReader> {
let parquet_reader = self let parquet_reader = self
.context .context
.reader_builder .reader_builder
.build(self.row_group_idx, self.row_selection.clone()) .build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?; .await?;
// Compute skip_fields once for this row group // Compute skip_fields once for this row group

View File

@@ -52,15 +52,21 @@ use crate::metrics::{
use crate::read::prune::{PruneReader, Source}; use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader}; use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle; use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; use crate::sst::index::bloom_filter::applier::{
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; };
use crate::sst::index::fulltext_index::applier::{
FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{ use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete, FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
}; };
use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup; use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -253,7 +259,9 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size; let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file. // Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; let parquet_meta = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.await?;
// Decodes region metadata. // Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata(); let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
// Gets the metadata stored in the SST. // Gets the metadata stored in the SST.
@@ -378,25 +386,34 @@ impl ParquetReaderBuilder {
&self, &self,
file_path: &str, file_path: &str,
file_size: u64, file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
) -> Result<Arc<ParquetMetaData>> { ) -> Result<Arc<ParquetMetaData>> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"]) .with_label_values(&["read_parquet_metadata"])
.start_timer(); .start_timer();
let file_id = self.file_handle.file_id(); let file_id = self.file_handle.file_id();
// Tries to get from global cache. // Tries to get from cache with metrics tracking.
if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await { if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data_with_metrics(file_id, cache_metrics)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
return Ok(metadata); return Ok(metadata);
} }
// Cache miss, load metadata directly. // Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size); let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?; let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata); let metadata = Arc::new(metadata);
// Cache the metadata. // Cache the metadata.
self.cache_strategy self.cache_strategy
.put_parquet_meta_data(file_id, metadata.clone()); .put_parquet_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok(metadata) Ok(metadata)
} }
@@ -527,7 +544,11 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file. // Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size(); let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier let apply_res = index_applier
.apply_fine(self.file_handle.file_id(), Some(file_size_hint)) .apply_fine(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await; .await;
let selection = match apply_res { let selection = match apply_res {
Ok(Some(res)) => { Ok(Some(res)) => {
@@ -595,13 +616,17 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file. // Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size(); let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint)) .apply(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.inverted_index_apply_metrics.as_mut(),
)
.await; .await;
let selection = match apply_res { let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output( Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size, row_group_size,
num_row_groups, num_row_groups,
output, apply_output,
), ),
Err(err) => { Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
@@ -670,7 +695,12 @@ impl ParquetReaderBuilder {
) )
}); });
let apply_res = index_applier let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs) .apply(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
)
.await; .await;
let mut selection = match apply_res { let mut selection = match apply_res {
Ok(apply_output) => { Ok(apply_output) => {
@@ -748,7 +778,12 @@ impl ParquetReaderBuilder {
) )
}); });
let apply_res = index_applier let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs) .apply_coarse(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await; .await;
let mut selection = match apply_res { let mut selection = match apply_res {
Ok(Some(apply_output)) => { Ok(Some(apply_output)) => {
@@ -892,7 +927,7 @@ fn all_required_row_groups_searched(
} }
/// Metrics of filtering rows groups and rows. /// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)] #[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics { pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering. /// Number of row groups before filtering.
pub(crate) rg_total: usize, pub(crate) rg_total: usize,
@@ -915,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rows_bloom_filtered: usize, pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter. /// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize, pub(crate) rows_precise_filtered: usize,
/// Optional metrics for inverted index applier.
pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Optional metrics for bloom filter index applier.
pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Optional metrics for fulltext index applier.
pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
} }
impl ReaderFilterMetrics { impl ReaderFilterMetrics {
@@ -931,6 +973,23 @@ impl ReaderFilterMetrics {
self.rows_inverted_filtered += other.rows_inverted_filtered; self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered; self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_precise_filtered += other.rows_precise_filtered; self.rows_precise_filtered += other.rows_precise_filtered;
// Merge optional applier metrics
if let Some(other_metrics) = &other.inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
} }
/// Reports metrics. /// Reports metrics.
@@ -987,6 +1046,77 @@ impl ReaderFilterMetrics {
} }
} }
/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
/// Number of memory cache hits for parquet metadata.
pub(crate) mem_cache_hit: usize,
/// Number of memory cache misses for parquet metadata.
pub(crate) mem_cache_miss: usize,
/// Number of file cache hits for parquet metadata.
pub(crate) file_cache_hit: usize,
/// Number of file cache misses for parquet metadata.
pub(crate) file_cache_miss: usize,
/// Duration to load parquet metadata.
pub(crate) metadata_load_cost: Duration,
}
impl std::fmt::Debug for MetadataCacheMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if self.mem_cache_hit > 0 {
write!(f, "\"mem_cache_hit\":{}", self.mem_cache_hit)?;
first = false;
}
if self.mem_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"mem_cache_miss\":{}", self.mem_cache_miss)?;
first = false;
}
if self.file_cache_hit > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"file_cache_hit\":{}", self.file_cache_hit)?;
first = false;
}
if self.file_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"file_cache_miss\":{}", self.file_cache_miss)?;
first = false;
}
if !self.metadata_load_cost.is_zero() {
if !first {
write!(f, ", ")?;
}
write!(
f,
"\"metadata_load_cost\":\"{:?}\"",
self.metadata_load_cost
)?;
}
write!(f, "}}")
}
}
impl MetadataCacheMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
self.mem_cache_hit += other.mem_cache_hit;
self.mem_cache_miss += other.mem_cache_miss;
self.file_cache_hit += other.file_cache_hit;
self.file_cache_miss += other.file_cache_miss;
self.metadata_load_cost += other.metadata_load_cost;
}
}
/// Parquet reader metrics. /// Parquet reader metrics.
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct ReaderMetrics { pub struct ReaderMetrics {
@@ -1002,6 +1132,10 @@ pub struct ReaderMetrics {
pub(crate) num_batches: usize, pub(crate) num_batches: usize,
/// Number of rows read. /// Number of rows read.
pub(crate) num_rows: usize, pub(crate) num_rows: usize,
/// Metrics for parquet metadata cache.
pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
/// Optional metrics for page/row group fetch operations.
pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
} }
impl ReaderMetrics { impl ReaderMetrics {
@@ -1013,6 +1147,15 @@ impl ReaderMetrics {
self.num_record_batches += other.num_record_batches; self.num_record_batches += other.num_record_batches;
self.num_batches += other.num_batches; self.num_batches += other.num_batches;
self.num_rows += other.num_rows; self.num_rows += other.num_rows;
self.metadata_cache_metrics
.merge_from(&other.metadata_cache_metrics);
if let Some(other_fetch) = &other.fetch_metrics {
if let Some(self_fetch) = &self.fetch_metrics {
self_fetch.merge_from(other_fetch);
} else {
self.fetch_metrics = Some(other_fetch.clone());
}
}
} }
/// Reports total rows. /// Reports total rows.
@@ -1067,7 +1210,10 @@ impl RowGroupReaderBuilder {
&self, &self,
row_group_idx: usize, row_group_idx: usize,
row_selection: Option<RowSelection>, row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> { ) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
let mut row_group = InMemoryRowGroup::create( let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(), self.file_handle.region_id(),
self.file_handle.file_id().file_id(), self.file_handle.file_id().file_id(),
@@ -1079,12 +1225,17 @@ impl RowGroupReaderBuilder {
); );
// Fetches data into memory. // Fetches data into memory.
row_group row_group
.fetch(&self.projection, row_selection.as_ref()) .fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
.await .await
.context(ReadParquetSnafu { .context(ReadParquetSnafu {
path: &self.file_path, path: &self.file_path,
})?; })?;
// Record total fetch elapsed time.
if let Some(metrics) = fetch_metrics {
metrics.add_total_fetch_elapsed(fetch_start.elapsed().as_micros() as u64);
}
// Builds the parquet reader. // Builds the parquet reader.
// Now the row selection is None. // Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups( ParquetRecordBatchReader::try_new_with_row_groups(
@@ -1228,6 +1379,8 @@ pub struct ParquetReader {
selection: RowGroupSelection, selection: RowGroupSelection,
/// Reader of current row group. /// Reader of current row group.
reader_state: ReaderState, reader_state: ReaderState,
/// Metrics for tracking row group fetch operations.
fetch_metrics: ParquetFetchMetrics,
} }
#[async_trait] #[async_trait]
@@ -1247,7 +1400,11 @@ impl BatchReader for ParquetReader {
let parquet_reader = self let parquet_reader = self
.context .context
.reader_builder() .reader_builder()
.build(row_group_idx, Some(row_selection)) .build(
row_group_idx,
Some(row_selection),
Some(&self.fetch_metrics),
)
.await?; .await?;
// Resets the parquet reader. // Resets the parquet reader.
@@ -1303,11 +1460,12 @@ impl ParquetReader {
context: FileRangeContextRef, context: FileRangeContextRef,
mut selection: RowGroupSelection, mut selection: RowGroupSelection,
) -> Result<Self> { ) -> Result<Self> {
let fetch_metrics = ParquetFetchMetrics::default();
// No more items in current row group, reads next row group. // No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() { let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context let parquet_reader = context
.reader_builder() .reader_builder()
.build(row_group_idx, Some(row_selection)) .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
.await?; .await?;
// Compute skip_fields once for this row group // Compute skip_fields once for this row group
let skip_fields = context.should_skip_fields(row_group_idx); let skip_fields = context.should_skip_fields(row_group_idx);
@@ -1324,6 +1482,7 @@ impl ParquetReader {
context, context,
selection, selection,
reader_state, reader_state,
fetch_metrics,
}) })
} }

View File

@@ -35,6 +35,206 @@ use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges}; use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
/// Metrics for tracking page/row group fetch operations.
/// Uses atomic counters for thread-safe updates.
#[derive(Default)]
pub struct ParquetFetchMetrics {
/// Number of page cache hits.
page_cache_hit: std::sync::atomic::AtomicUsize,
/// Number of write cache hits.
write_cache_hit: std::sync::atomic::AtomicUsize,
/// Number of pages to fetch.
pages_to_fetch: std::sync::atomic::AtomicUsize,
/// Total size in bytes of pages to fetch.
page_size_to_fetch: std::sync::atomic::AtomicU64,
/// Elapsed time in microseconds fetching from write cache.
write_cache_fetch_elapsed: std::sync::atomic::AtomicU64,
/// Elapsed time in microseconds fetching from object store.
store_fetch_elapsed: std::sync::atomic::AtomicU64,
/// Total elapsed time in microseconds for fetching row groups.
total_fetch_elapsed: std::sync::atomic::AtomicU64,
}
impl std::fmt::Debug for ParquetFetchMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
let page_cache_hit = self.page_cache_hit();
let write_cache_hit = self.write_cache_hit();
let pages_to_fetch = self.pages_to_fetch();
let page_size_to_fetch = self.page_size_to_fetch();
let write_cache_elapsed = self.write_cache_fetch_elapsed();
let store_elapsed = self.store_fetch_elapsed();
let total_elapsed = self.total_fetch_elapsed();
if page_cache_hit > 0 {
write!(f, "\"page_cache_hit\":{}", page_cache_hit)?;
first = false;
}
if write_cache_hit > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"write_cache_hit\":{}", write_cache_hit)?;
first = false;
}
if pages_to_fetch > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"pages_to_fetch\":{}", pages_to_fetch)?;
first = false;
}
if page_size_to_fetch > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"page_size_to_fetch\":{}", page_size_to_fetch)?;
first = false;
}
if write_cache_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(write_cache_elapsed);
write!(f, "\"write_cache_fetch_elapsed\":\"{:?}\"", duration)?;
first = false;
}
if store_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(store_elapsed);
write!(f, "\"store_fetch_elapsed\":\"{:?}\"", duration)?;
first = false;
}
if total_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(total_elapsed);
write!(f, "\"total_fetch_elapsed\":\"{:?}\"", duration)?;
}
write!(f, "}}")
}
}
impl ParquetFetchMetrics {
/// Increments page cache hit counter.
pub fn inc_page_cache_hit(&self) {
self.page_cache_hit
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
/// Increments write cache hit counter.
pub fn inc_write_cache_hit(&self) {
self.write_cache_hit
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
/// Adds to the pages to fetch counter.
pub fn add_pages_to_fetch(&self, count: usize) {
self.pages_to_fetch
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
}
/// Adds to the page size to fetch counter.
pub fn add_page_size_to_fetch(&self, size: u64) {
self.page_size_to_fetch
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the page cache hit count.
pub fn page_cache_hit(&self) -> usize {
self.page_cache_hit
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns the write cache hit count.
pub fn write_cache_hit(&self) -> usize {
self.write_cache_hit
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns the pages to fetch count.
pub fn pages_to_fetch(&self) -> usize {
self.pages_to_fetch
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns the page size to fetch in bytes.
pub fn page_size_to_fetch(&self) -> u64 {
self.page_size_to_fetch
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for write cache fetch.
pub fn add_write_cache_fetch_elapsed(&self, elapsed_us: u64) {
self.write_cache_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the elapsed time in microseconds for write cache fetch.
pub fn write_cache_fetch_elapsed(&self) -> u64 {
self.write_cache_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for object store fetch.
pub fn add_store_fetch_elapsed(&self, elapsed_us: u64) {
self.store_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the elapsed time in microseconds for object store fetch.
pub fn store_fetch_elapsed(&self) -> u64 {
self.store_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for total fetch operation.
pub fn add_total_fetch_elapsed(&self, elapsed_us: u64) {
self.total_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the total elapsed time in microseconds for fetch operations.
pub fn total_fetch_elapsed(&self) -> u64 {
self.total_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Merges metrics from another [ParquetFetchMetrics].
pub fn merge_from(&self, other: &ParquetFetchMetrics) {
self.page_cache_hit
.fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
self.write_cache_hit.fetch_add(
other.write_cache_hit(),
std::sync::atomic::Ordering::Relaxed,
);
self.pages_to_fetch
.fetch_add(other.pages_to_fetch(), std::sync::atomic::Ordering::Relaxed);
self.page_size_to_fetch.fetch_add(
other.page_size_to_fetch(),
std::sync::atomic::Ordering::Relaxed,
);
self.write_cache_fetch_elapsed.fetch_add(
other.write_cache_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
self.store_fetch_elapsed.fetch_add(
other.store_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
self.total_fetch_elapsed.fetch_add(
other.total_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
}
}
pub(crate) struct RowGroupBase<'a> { pub(crate) struct RowGroupBase<'a> {
metadata: &'a RowGroupMetaData, metadata: &'a RowGroupMetaData,
pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>, pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
@@ -244,13 +444,14 @@ impl<'a> InMemoryRowGroup<'a> {
&mut self, &mut self,
projection: &ProjectionMask, projection: &ProjectionMask,
selection: Option<&RowSelection>, selection: Option<&RowSelection>,
metrics: Option<&ParquetFetchMetrics>,
) -> Result<()> { ) -> Result<()> {
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) { if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
let (fetch_ranges, page_start_offsets) = let (fetch_ranges, page_start_offsets) =
self.base self.base
.calc_sparse_read_ranges(projection, offset_index, selection); .calc_sparse_read_ranges(projection, offset_index, selection);
let chunk_data = self.fetch_bytes(&fetch_ranges).await?; let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
// Assign sparse chunk data to base. // Assign sparse chunk data to base.
self.base self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets); .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
@@ -268,7 +469,7 @@ impl<'a> InMemoryRowGroup<'a> {
} }
// Fetch data with ranges // Fetch data with ranges
let chunk_data = self.fetch_bytes(&fetch_ranges).await?; let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
// Assigns fetched data to base. // Assigns fetched data to base.
self.base.assign_dense_chunk(projection, chunk_data); self.base.assign_dense_chunk(projection, chunk_data);
@@ -279,31 +480,62 @@ impl<'a> InMemoryRowGroup<'a> {
/// Try to fetch data from the memory cache or the WriteCache, /// Try to fetch data from the memory cache or the WriteCache,
/// if not in WriteCache, fetch data from object store directly. /// if not in WriteCache, fetch data from object store directly.
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> { async fn fetch_bytes(
&self,
ranges: &[Range<u64>],
metrics: Option<&ParquetFetchMetrics>,
) -> Result<Vec<Bytes>> {
// Now fetch page timer includes the whole time to read pages. // Now fetch page timer includes the whole time to read pages.
let _timer = READ_STAGE_FETCH_PAGES.start_timer(); let _timer = READ_STAGE_FETCH_PAGES.start_timer();
// Track pages to fetch regardless of cache hit/miss.
if let Some(metrics) = metrics {
metrics.add_pages_to_fetch(ranges.len());
}
let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec()); let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
if let Some(pages) = self.cache_strategy.get_pages(&page_key) { if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
if let Some(metrics) = metrics {
metrics.inc_page_cache_hit();
}
return Ok(pages.compressed.clone()); return Ok(pages.compressed.clone());
} }
// Calculate total range size for metrics.
let total_range_size = compute_total_range_size(ranges);
if let Some(metrics) = metrics {
metrics.add_page_size_to_fetch(total_range_size);
}
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet); let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
let pages = match self.fetch_ranges_from_write_cache(key, ranges).await { let start = std::time::Instant::now();
Some(data) => data, let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
let pages = match write_cache_result {
Some(data) => {
if let Some(metrics) = metrics {
metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
metrics.inc_write_cache_hit();
}
data
}
None => { None => {
// Fetch data from object store. // Fetch data from object store.
let _timer = READ_STAGE_ELAPSED let _timer = READ_STAGE_ELAPSED
.with_label_values(&["cache_miss_read"]) .with_label_values(&["cache_miss_read"])
.start_timer(); .start_timer();
fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges) let start = std::time::Instant::now();
let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
.await .await
.map_err(|e| ParquetError::External(Box::new(e)))? .map_err(|e| ParquetError::External(Box::new(e)))?;
if let Some(metrics) = metrics {
metrics.add_store_fetch_elapsed(start.elapsed().as_micros() as u64);
}
data
} }
}; };
// Put pages back to the cache. // Put pages back to the cache.
let total_range_size = compute_total_range_size(ranges);
let page_value = PageValue::new(pages.clone(), total_range_size); let page_value = PageValue::new(pages.clone(), total_range_size);
self.cache_strategy self.cache_strategy
.put_pages(page_key, Arc::new(page_value)); .put_pages(page_key, Arc::new(page_value));

View File

@@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::Result; use crate::error::Result;
use crate::file_metadata::FileMetadata; use crate::file_metadata::FileMetadata;
/// Metrics returned by `PuffinReader::dir` operations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DirMetrics {
/// Whether this was a cache hit (true) or cache miss (false).
pub cache_hit: bool,
/// Size of the directory in bytes.
pub dir_size: u64,
}
/// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`. /// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`.
#[async_trait] #[async_trait]
pub trait PuffinManager { pub trait PuffinManager {
@@ -106,9 +115,10 @@ pub trait PuffinReader {
/// Reads a directory from the Puffin file. /// Reads a directory from the Puffin file.
/// ///
/// The returned `GuardWithMetadata` is used to access the directory data and its metadata. /// The returned tuple contains `GuardWithMetadata` and `DirMetrics`.
/// The `GuardWithMetadata` is used to access the directory data and its metadata.
/// Users should hold the `GuardWithMetadata` until they are done with the directory data. /// Users should hold the `GuardWithMetadata` until they are done with the directory data.
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>; async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)>;
} }
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data. /// `BlobGuard` is provided by the `PuffinReader` to access the blob data.

View File

@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef; use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager}; use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader}; use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F> pub struct FsPuffinReader<S, F>
@@ -130,10 +130,10 @@ where
Ok(GuardWithMetadata::new(blob, blob_metadata)) Ok(GuardWithMetadata::new(blob, blob_metadata))
} }
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> { async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)> {
let mut file = self.puffin_reader().await?; let mut file = self.puffin_reader().await?;
let blob_metadata = self.get_blob_metadata(key, &mut file).await?; let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
let dir = self let (dir, metrics) = self
.stager .stager
.get_dir( .get_dir(
&self.handle, &self.handle,
@@ -153,7 +153,7 @@ where
) )
.await?; .await?;
Ok(GuardWithMetadata::new(dir, blob_metadata)) Ok((GuardWithMetadata::new(dir, blob_metadata), metrics))
} }
} }

View File

@@ -23,7 +23,7 @@ use futures::AsyncWrite;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use crate::error::Result; use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard}; use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>; pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
@@ -72,14 +72,15 @@ pub trait Stager: Send + Sync {
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`. /// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
/// ///
/// The returned `DirGuard` is used to access the directory in the filesystem. /// The returned tuple contains the `DirGuard` and `DirMetrics`.
/// The `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory. /// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>( async fn get_dir<'a>(
&self, &self,
handle: &Self::FileHandle, handle: &Self::FileHandle,
key: &str, key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>, init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>; ) -> Result<(Self::Dir, DirMetrics)>;
/// Stores a directory in the staging area. /// Stores a directory in the staging area.
async fn put_dir( async fn put_dir(

View File

@@ -41,7 +41,7 @@ use crate::error::{
use crate::puffin_manager::stager::{ use crate::puffin_manager::stager::{
BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier, BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
}; };
use crate::puffin_manager::{BlobGuard, DirGuard}; use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
const DELETE_QUEUE_SIZE: usize = 10240; const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp"; const TMP_EXTENSION: &str = "tmp";
@@ -203,7 +203,7 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
handle: &Self::FileHandle, handle: &Self::FileHandle,
key: &str, key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>, init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> { ) -> Result<(Self::Dir, DirMetrics)> {
let handle_str = handle.to_string(); let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key); let cache_key = Self::encode_cache_key(&handle_str, key);
@@ -242,15 +242,22 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
.await .await
.context(CacheGetSnafu)?; .context(CacheGetSnafu)?;
let dir_size = v.size();
if let Some(notifier) = self.notifier.as_ref() { if let Some(notifier) = self.notifier.as_ref() {
if miss { if miss {
notifier.on_cache_miss(v.size()); notifier.on_cache_miss(dir_size);
} else { } else {
notifier.on_cache_hit(v.size()); notifier.on_cache_hit(dir_size);
} }
} }
let metrics = DirMetrics {
cache_hit: !miss,
dir_size,
};
match v { match v {
CacheValue::Dir(guard) => Ok(guard), CacheValue::Dir(guard) => Ok((guard, metrics)),
_ => unreachable!(), _ => unreachable!(),
} }
} }
@@ -882,7 +889,7 @@ mod tests {
let puffin_file_name = "test_get_dir".to_string(); let puffin_file_name = "test_get_dir".to_string();
let key = "key"; let key = "key";
let dir_path = stager let (dir_path, metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
key, key,
@@ -901,6 +908,9 @@ mod tests {
.await .await
.unwrap(); .unwrap();
assert!(!metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir { for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path); let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap(); let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -974,7 +984,7 @@ mod tests {
]; ];
let dir_key = "dir_key"; let dir_key = "dir_key";
let guard = stager let (guard, _metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
dir_key, dir_key,
@@ -1016,7 +1026,7 @@ mod tests {
let buf = reader.read(0..m.content_length).await.unwrap(); let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"hello world"); assert_eq!(&*buf, b"hello world");
let dir_path = stager let (dir_path, metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
dir_key, dir_key,
@@ -1024,6 +1034,9 @@ mod tests {
) )
.await .await
.unwrap(); .unwrap();
assert!(metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir { for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path); let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap(); let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -1151,7 +1164,7 @@ mod tests {
]; ];
// First time to get the directory // First time to get the directory
let guard_0 = stager let (guard_0, _metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
dir_key, dir_key,
@@ -1198,7 +1211,7 @@ mod tests {
); );
// Second time to get the directory // Second time to get the directory
let guard_1 = stager let (guard_1, _metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
dir_key, dir_key,
@@ -1237,7 +1250,7 @@ mod tests {
// Third time to get the directory and all guards are dropped // Third time to get the directory and all guards are dropped
drop(guard_0); drop(guard_0);
drop(guard_1); drop(guard_1);
let guard_2 = stager let (guard_2, _metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
dir_key, dir_key,
@@ -1390,7 +1403,7 @@ mod tests {
]; ];
let dir_key = "dir_key"; let dir_key = "dir_key";
let guard = stager let (guard, _metrics) = stager
.get_dir( .get_dir(
&puffin_file_name, &puffin_file_name,
dir_key, dir_key,

View File

@@ -356,7 +356,7 @@ async fn check_dir(
stager: &BoundedStager<String>, stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader, puffin_reader: &impl PuffinReader,
) { ) {
let res_dir = puffin_reader.dir(key).await.unwrap(); let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap();
let metadata = res_dir.metadata(); let metadata = res_dir.metadata();
assert_eq!( assert_eq!(
metadata.properties, metadata.properties,