feat: add more verbose metrics to scanners (#7336)

* feat: add inverted applier metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add metrics to bloom applier

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add metrics to fulltext index applier

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: implement BloomFilterReadMetrics for BloomFilterReader

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect read metrics for inverted index

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add metrics for range_read and metadata

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename elapsed to fetch_elapsed

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect metadata fetch metrics for inverted index

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect cache metrics for inverted and bloom index

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect read metrics in appliers

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect fulltext dir metrics for applier

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect parquet row group metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add parquet metadata metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add apply metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect more metrics for memory row group

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add fetch metrics to ReaderMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: init verbose metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: debug print metrics in ScanMetricsSet

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: implement debug for new metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: update parquet fetch metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect the whole fetch time

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add file_scan_cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add cache_miss counter to parquet fetch

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: print index read metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: use actual bytes to increase counter

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove provided implementations for index reader traits

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: change get_parquet_meta_data() method to receive metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename file_scan_cost to sst_scan_cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: refine ParquetFetchMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove useless inner method

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: collect page size actual needed

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify InvertedIndexReadMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify InvertedIndexApplyMetrics Debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify BloomFilterReadMetrics Debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify BloomFilterIndexApplyMetrics Debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify FulltextIndexApplyMetrics implementation

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify ParquetFetchMetrics Debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: simplify MetadataCacheMetrics Debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: only print verbose metrics when they are not empty.

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use mutex to protect ParquetFetchMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use duration for elapsed in ParquetFetchMetricsData

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-12-04 21:40:18 +08:00
committed by GitHub
parent d5c616a9ff
commit 84e4e42ee7
31 changed files with 1716 additions and 324 deletions

View File

@@ -21,7 +21,7 @@ use itertools::Itertools;
use crate::Bytes;
use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
@@ -38,7 +38,7 @@ pub struct BloomFilterApplier {
impl BloomFilterApplier {
pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
let meta = reader.metadata().await?;
let meta = reader.metadata(None).await?;
Ok(Self { reader, meta })
}
@@ -50,6 +50,7 @@ impl BloomFilterApplier {
&mut self,
predicates: &[InListPredicate],
search_ranges: &[Range<usize>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Range<usize>>> {
if predicates.is_empty() {
// If no predicates, return empty result
@@ -57,7 +58,7 @@ impl BloomFilterApplier {
}
let segments = self.row_ranges_to_segments(search_ranges);
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
Ok(intersect_ranges(search_ranges, &matching_row_ranges))
}
@@ -95,6 +96,7 @@ impl BloomFilterApplier {
async fn load_bloom_filters(
&mut self,
segments: &[usize],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
let segment_locations = segments
.iter()
@@ -108,7 +110,10 @@ impl BloomFilterApplier {
.map(|i| self.meta.bloom_filter_locs[i as usize])
.collect::<Vec<_>>();
let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
let bloom_filters = self
.reader
.bloom_filter_vec(&bloom_filter_locs, metrics)
.await?;
Ok((segment_locations, bloom_filters))
}
@@ -422,7 +427,10 @@ mod tests {
];
for (predicates, search_range, expected) in cases {
let result = applier.search(&predicates, &[search_range]).await.unwrap();
let result = applier
.search(&predicates, &[search_range], None)
.await
.unwrap();
assert_eq!(
result, expected,
"Expected {:?}, got {:?}",

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::ops::{Range, Rem};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use bytemuck::try_cast_slice;
@@ -34,6 +35,72 @@ const BLOOM_META_LEN_SIZE: u64 = 4;
/// Default prefetch size of bloom filter meta.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
/// Metrics for bloom filter read operations.
#[derive(Default, Clone)]
pub struct BloomFilterReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,
/// Total number of ranges to read.
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
impl std::fmt::Debug for BloomFilterReadMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
total_bytes,
total_ranges,
fetch_elapsed,
cache_hit,
cache_miss,
} = self;
// If both total_bytes and cache_hit are 0, we didn't read anything.
if *total_bytes == 0 && *cache_hit == 0 {
return write!(f, "{{}}");
}
write!(f, "{{")?;
if *total_bytes > 0 {
write!(f, "\"total_bytes\":{}", total_bytes)?;
}
if *cache_hit > 0 {
if *total_bytes > 0 {
write!(f, ", ")?;
}
write!(f, "\"cache_hit\":{}", cache_hit)?;
}
if *total_ranges > 0 {
write!(f, ", \"total_ranges\":{}", total_ranges)?;
}
if !fetch_elapsed.is_zero() {
write!(f, ", \"fetch_elapsed\":\"{:?}\"", fetch_elapsed)?;
}
if *cache_miss > 0 {
write!(f, ", \"cache_miss\":{}", cache_miss)?;
}
write!(f, "}}")
}
}
impl BloomFilterReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
/// Faster than chunking and converting each piece individually.
///
@@ -79,25 +146,33 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
#[async_trait]
pub trait BloomFilterReader: Sync {
/// Reads range of bytes from the file.
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes>;
/// Reads bunch of ranges from the file.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let mut results = Vec::with_capacity(ranges.len());
for range in ranges {
let size = (range.end - range.start) as u32;
let data = self.range_read(range.start, size).await?;
results.push(data);
}
Ok(results)
}
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>>;
/// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta>;
async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta>;
/// Reads a bloom filter with the given location.
async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
let bytes = self.range_read(loc.offset, loc.size as _).await?;
async fn bloom_filter(
&self,
loc: &BloomFilterLoc,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilter> {
let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
let vec = bytes_to_u64_vec(&bytes);
let bm = BloomFilter::from_vec(vec)
.seed(&SEED)
@@ -105,12 +180,16 @@ pub trait BloomFilterReader: Sync {
Ok(bm)
}
async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
async fn bloom_filter_vec(
&self,
locs: &[BloomFilterLoc],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<BloomFilter>> {
let ranges = locs
.iter()
.map(|l| l.offset..l.offset + l.size)
.collect::<Vec<_>>();
let bss = self.read_vec(&ranges).await?;
let bss = self.read_vec(&ranges, metrics).await?;
let mut result = Vec::with_capacity(bss.len());
for (bs, loc) in bss.into_iter().zip(locs.iter()) {
@@ -140,24 +219,59 @@ impl<R: RangeReader> BloomFilterReaderImpl<R> {
#[async_trait]
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
self.reader
async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self
.reader
.read(offset..offset + size as u64)
.await
.context(IoSnafu)
.context(IoSnafu)?;
if let Some(m) = metrics {
m.total_ranges += 1;
m.total_bytes += size as u64;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.reader.read_vec(ranges).await.context(IoSnafu)
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self.reader.read_vec(ranges).await.context(IoSnafu)?;
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
}
async fn metadata(&self) -> Result<BloomFilterMeta> {
async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
let metadata = self.reader.metadata().await.context(IoSnafu)?;
let file_size = metadata.content_length;
let mut meta_reader =
BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
meta_reader.metadata().await
meta_reader.metadata(metrics).await
}
}
@@ -183,7 +297,10 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
///
/// It will first prefetch some bytes from the end of the file,
/// then parse the metadata from the prefetch bytes.
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
pub async fn metadata(
&mut self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
ensure!(
self.file_size >= BLOOM_META_LEN_SIZE,
FileSizeTooSmallSnafu {
@@ -191,6 +308,7 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
}
);
let start = metrics.as_ref().map(|_| Instant::now());
let meta_start = self.file_size.saturating_sub(self.prefetch_size);
let suffix = self
.reader
@@ -208,8 +326,28 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
.await
.context(IoSnafu)?;
if let Some(m) = metrics {
// suffix read + meta read
m.total_ranges += 2;
// Ignores the meta length size to simplify the calculation.
m.total_bytes += self.file_size.min(self.prefetch_size) + length;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
} else {
if let Some(m) = metrics {
// suffix read only
m.total_ranges += 1;
m.total_bytes += self.file_size.min(self.prefetch_size);
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
@@ -290,7 +428,7 @@ mod tests {
for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
let mut reader =
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
let meta = reader.metadata().await.unwrap();
let meta = reader.metadata(None).await.unwrap();
assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.segment_count, 2);
@@ -312,11 +450,11 @@ mod tests {
let bytes = mock_bloom_filter_bytes().await;
let reader = BloomFilterReaderImpl::new(bytes);
let meta = reader.metadata().await.unwrap();
let meta = reader.metadata(None).await.unwrap();
assert_eq!(meta.bloom_filter_locs.len(), 2);
let bf = reader
.bloom_filter(&meta.bloom_filter_locs[0])
.bloom_filter(&meta.bloom_filter_locs[0], None)
.await
.unwrap();
assert!(bf.contains(&b"a"));
@@ -325,7 +463,7 @@ mod tests {
assert!(bf.contains(&b"d"));
let bf = reader
.bloom_filter(&meta.bloom_filter_locs[1])
.bloom_filter(&meta.bloom_filter_locs[1], None)
.await
.unwrap();
assert!(bf.contains(&b"e"));

View File

@@ -74,7 +74,7 @@ async fn test_search(
writer.finish().await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
for (query, expected) in query_expected {
let results = searcher.search(query).await.unwrap();

View File

@@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
@@ -29,37 +30,115 @@ pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
mod blob;
mod footer;
/// Metrics for inverted index read operations.
#[derive(Default, Clone)]
pub struct InvertedIndexReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,
/// Total number of ranges to read.
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
impl std::fmt::Debug for InvertedIndexReadMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
total_bytes,
total_ranges,
fetch_elapsed,
cache_hit,
cache_miss,
} = self;
// If both total_bytes and cache_hit are 0, we didn't read anything.
if *total_bytes == 0 && *cache_hit == 0 {
return write!(f, "{{}}");
}
write!(f, "{{")?;
if *total_bytes > 0 {
write!(f, "\"total_bytes\":{}", total_bytes)?;
}
if *cache_hit > 0 {
if *total_bytes > 0 {
write!(f, ", ")?;
}
write!(f, "\"cache_hit\":{}", cache_hit)?;
}
if *total_ranges > 0 {
write!(f, ", \"total_ranges\":{}", total_ranges)?;
}
if !fetch_elapsed.is_zero() {
write!(f, ", \"fetch_elapsed\":\"{:?}\"", fetch_elapsed)?;
}
if *cache_miss > 0 {
write!(f, ", \"cache_miss\":{}", cache_miss)?;
}
write!(f, "}}")
}
}
impl InvertedIndexReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// InvertedIndexReader defines an asynchronous reader of inverted index data
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexReader: Send + Sync {
/// Seeks to given offset and reads data with exact size as provided.
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>>;
async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>>;
/// Reads the bytes in the given ranges.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let mut result = Vec::with_capacity(ranges.len());
for range in ranges {
let data = self
.range_read(range.start, (range.end - range.start) as u32)
.await?;
result.push(Bytes::from(data));
}
Ok(result)
}
async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>>;
/// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>>;
async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>>;
/// Retrieves the finite state transducer (FST) map from the given offset and size.
async fn fst(&self, offset: u64, size: u32) -> Result<FstMap> {
let fst_data = self.range_read(offset, size).await?;
async fn fst<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<FstMap> {
let fst_data = self.range_read(offset, size, metrics).await?;
FstMap::new(fst_data).context(DecodeFstSnafu)
}
/// Retrieves the multiple finite state transducer (FST) maps from the given ranges.
async fn fst_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<FstMap>> {
self.read_vec(ranges)
async fn fst_vec<'a>(
&mut self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<FstMap>> {
self.read_vec(ranges, metrics)
.await?
.into_iter()
.map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu))
@@ -67,19 +146,28 @@ pub trait InvertedIndexReader: Send + Sync {
}
/// Retrieves the bitmap from the given offset and size.
async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result<Bitmap> {
self.range_read(offset, size).await.and_then(|bytes| {
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
})
async fn bitmap<'a>(
&self,
offset: u64,
size: u32,
bitmap_type: BitmapType,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Bitmap> {
self.range_read(offset, size, metrics)
.await
.and_then(|bytes| {
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
})
}
/// Retrieves the multiple bitmaps from the given ranges.
async fn bitmap_deque(
async fn bitmap_deque<'a>(
&mut self,
ranges: &[(Range<u64>, BitmapType)],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<VecDeque<Bitmap>> {
let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip();
let bytes = self.read_vec(&ranges).await?;
let bytes = self.read_vec(&ranges, metrics).await?;
bytes
.into_iter()
.zip(types)

View File

@@ -14,6 +14,7 @@
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use bytes::Bytes;
@@ -23,10 +24,10 @@ use snafu::{ResultExt, ensure};
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::MIN_BLOB_SIZE;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::footer::{
DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader,
};
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// Inverted index blob reader, implements [`InvertedIndexReader`]
pub struct InvertedIndexBlobReader<R> {
@@ -53,27 +54,58 @@ impl<R> InvertedIndexBlobReader<R> {
#[async_trait]
impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>> {
let start = metrics.as_ref().map(|_| Instant::now());
let buf = self
.source
.read(offset..offset + size as u64)
.await
.context(CommonIoSnafu)?;
if let Some(m) = metrics {
m.total_bytes += size as u64;
m.total_ranges += 1;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(buf.into())
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.source.read_vec(ranges).await.context(CommonIoSnafu)
async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
if let Some(m) = metrics {
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
m.total_ranges += ranges.len();
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(result)
}
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
let blob_size = metadata.content_length;
Self::validate_blob_size(blob_size)?;
let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
footer_reader.metadata().await.map(Arc::new)
footer_reader.metadata(metrics).await.map(Arc::new)
}
}
@@ -173,7 +205,7 @@ mod tests {
let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
assert_eq!(metas.metas.len(), 2);
let meta0 = metas.metas.get("tag0").unwrap();
@@ -200,13 +232,14 @@ mod tests {
let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
let fst_map = blob_reader
.fst(
meta.base_offset + meta.relative_fst_offset as u64,
meta.fst_size,
None,
)
.await
.unwrap();
@@ -219,6 +252,7 @@ mod tests {
.fst(
meta.base_offset + meta.relative_fst_offset as u64,
meta.fst_size,
None,
)
.await
.unwrap();
@@ -232,30 +266,30 @@ mod tests {
let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
let bitmap = blob_reader
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
.bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());
let bitmap = blob_reader
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag1").unwrap();
let bitmap = blob_reader
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
.bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());
let bitmap = blob_reader
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use prost::Message;
@@ -23,6 +25,7 @@ use crate::inverted_index::error::{
UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
@@ -54,12 +57,17 @@ impl<R> InvertedIndexFooterReader<R> {
}
impl<R: RangeReader> InvertedIndexFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
pub async fn metadata(
&mut self,
mut metrics: Option<&mut InvertedIndexReadMetrics>,
) -> Result<InvertedIndexMetas> {
ensure!(
self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
BlobSizeTooSmallSnafu
);
let start = metrics.as_ref().map(|_| Instant::now());
let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
let suffix = self
.source
@@ -73,19 +81,36 @@ impl<R: RangeReader> InvertedIndexFooterReader<R> {
let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
// Did not fetch the entire file metadata in the initial read, need to make a second request.
if length > suffix_len as u64 - footer_size {
let result = if length > suffix_len as u64 - footer_size {
let metadata_start = self.blob_size - length - footer_size;
let meta = self
.source
.read(metadata_start..self.blob_size - footer_size)
.await
.context(CommonIoSnafu)?;
if let Some(m) = metrics.as_deref_mut() {
m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
m.total_ranges += 2;
}
self.parse_payload(&meta, length)
} else {
if let Some(m) = metrics.as_deref_mut() {
m.total_bytes += self.blob_size.min(self.prefetch_size());
m.total_ranges += 1;
}
let metadata_start = self.blob_size - length - footer_size - footer_start;
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
self.parse_payload(meta, length)
};
if let Some(m) = metrics {
m.fetch_elapsed += start.unwrap().elapsed();
}
result
}
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
@@ -186,7 +211,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch);
}
let metas = reader.metadata().await.unwrap();
let metas = reader.metadata(None).await.unwrap();
assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
@@ -210,7 +235,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch);
}
let result = reader.metadata().await;
let result = reader.metadata(None).await;
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
}
}
@@ -233,7 +258,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch);
}
let result = reader.metadata().await;
let result = reader.metadata(None).await;
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
}
}

View File

@@ -122,7 +122,7 @@ mod tests {
.unwrap();
let reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap();
let metadata = reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 0);
@@ -182,7 +182,7 @@ mod tests {
.unwrap();
let reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap();
let metadata = reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);
@@ -198,13 +198,19 @@ mod tests {
.fst(
tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -213,7 +219,12 @@ mod tests {
);
let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -222,7 +233,12 @@ mod tests {
);
let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -241,13 +257,19 @@ mod tests {
.fst(
tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -256,7 +278,12 @@ mod tests {
);
let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -265,7 +292,12 @@ mod tests {
);
let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(

View File

@@ -16,7 +16,7 @@ use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
use crate::bitmap::Bitmap;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their
/// corresponding bitmaps within an inverted index.
@@ -35,7 +35,8 @@ impl<'a> ParallelFstValuesMapper<'a> {
pub async fn map_values_vec(
&mut self,
value_and_meta_vec: &[(Vec<u64>, &'a InvertedIndexMeta)],
value_and_meta_vec: &[(Vec<u64>, &InvertedIndexMeta)],
metrics: Option<&mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bitmap>> {
let groups = value_and_meta_vec
.iter()
@@ -64,7 +65,7 @@ impl<'a> ParallelFstValuesMapper<'a> {
}
common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges);
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges).await?;
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges, metrics).await?;
let mut output = Vec::with_capacity(groups.len());
for counter in groups {
@@ -95,23 +96,25 @@ mod tests {
#[tokio::test]
async fn test_map_values_vec() {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_bitmap_deque().returning(|ranges| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
mock_reader
.expect_bitmap_deque()
.returning(|ranges, _metrics| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
}
_ => unreachable!(),
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
}
_ => unreachable!(),
}
}
Ok(output)
});
Ok(output)
});
let meta = InvertedIndexMeta {
bitmap_type: BitmapType::Roaring.into(),
@@ -120,13 +123,13 @@ mod tests {
let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader);
let result = values_mapper
.map_values_vec(&[(vec![], &meta)])
.map_values_vec(&[(vec![], &meta)], None)
.await
.unwrap();
assert_eq!(result[0].count_ones(), 0);
let result = values_mapper
.map_values_vec(&[(vec![value(1, 1)], &meta)])
.map_values_vec(&[(vec![value(1, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -135,7 +138,7 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(2, 1)], &meta)])
.map_values_vec(&[(vec![value(2, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -144,7 +147,7 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)])
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -153,7 +156,7 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)])
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -162,7 +165,10 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)])
.map_values_vec(
&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)],
None,
)
.await
.unwrap();
assert_eq!(
@@ -174,10 +180,13 @@ mod tests {
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
);
let result = values_mapper
.map_values_vec(&[
(vec![value(2, 1), value(1, 1)], &meta),
(vec![value(1, 1)], &meta),
])
.map_values_vec(
&[
(vec![value(2, 1), value(1, 1)], &meta),
(vec![value(1, 1)], &meta),
],
None,
)
.await
.unwrap();
assert_eq!(

View File

@@ -19,7 +19,7 @@ pub use predicates_apply::PredicatesIndexApplier;
use crate::bitmap::Bitmap;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// The output of an apply operation.
#[derive(Clone, Debug, PartialEq)]
@@ -44,10 +44,11 @@ pub trait IndexApplier: Send + Sync {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
#[allow(unused_parens)]
async fn apply<'a>(
async fn apply<'a, 'b>(
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
metrics: Option<&'b mut InvertedIndexReadMetrics>,
) -> Result<ApplyOutput>;
/// Returns the memory usage of the applier.

View File

@@ -19,7 +19,7 @@ use greptime_proto::v1::index::InvertedIndexMetas;
use crate::bitmap::Bitmap;
use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use crate::inverted_index::search::fst_apply::{
FstApplier, IntersectionFstApplier, KeysFstApplier,
};
@@ -43,12 +43,14 @@ pub struct PredicatesIndexApplier {
impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices.
async fn apply<'a>(
async fn apply<'a, 'b>(
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
metrics: Option<&'b mut InvertedIndexReadMetrics>,
) -> Result<ApplyOutput> {
let metadata = reader.metadata().await?;
let mut metrics = metrics;
let metadata = reader.metadata(metrics.as_deref_mut()).await?;
let mut output = ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: metadata.total_row_count as _,
@@ -84,7 +86,7 @@ impl IndexApplier for PredicatesIndexApplier {
return Ok(output);
}
let fsts = reader.fst_vec(&fst_ranges).await?;
let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?;
let value_and_meta_vec = fsts
.into_iter()
.zip(appliers)
@@ -92,7 +94,7 @@ impl IndexApplier for PredicatesIndexApplier {
.collect::<Vec<_>>();
let mut mapper = ParallelFstValuesMapper::new(reader);
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?;
let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
for bm in bm_vec {
@@ -221,26 +223,28 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_ranges| {
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_ranges, _metrics| {
Ok(vec![
FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
])
});
mock_reader.expect_bitmap_deque().returning(|arg| {
assert_eq!(arg.len(), 1);
let range = &arg[0].0;
let bitmap_type = arg[0].1;
assert_eq!(*range, 2..3);
assert_eq!(bitmap_type, BitmapType::Roaring);
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
&[0b10101010],
bitmap_type,
)]))
});
mock_reader
.expect_bitmap_deque()
.returning(|arg, _metrics| {
assert_eq!(arg.len(), 1);
let range = &arg[0].0;
let bitmap_type = arg[0].1;
assert_eq!(*range, 2..3);
assert_eq!(bitmap_type, BitmapType::Roaring);
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
&[0b10101010],
bitmap_type,
)]))
});
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(
@@ -252,14 +256,14 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_range| {
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_range, _metrics| {
Ok(vec![
FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
])
});
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(output.matched_segment_ids.count_ones(), 0);
@@ -279,8 +283,8 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
mock_reader.expect_fst_vec().returning(|ranges| {
.returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
mock_reader.expect_fst_vec().returning(|ranges, _metrics| {
let mut output = vec![];
for range in ranges {
match range.start {
@@ -293,27 +297,29 @@ mod tests {
}
Ok(output)
});
mock_reader.expect_bitmap_deque().returning(|ranges| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
mock_reader
.expect_bitmap_deque()
.returning(|ranges, _metrics| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
}
_ => unreachable!(),
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
}
_ => unreachable!(),
}
}
Ok(output)
});
Ok(output)
});
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(
@@ -331,10 +337,10 @@ mod tests {
let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)])));
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
@@ -343,7 +349,7 @@ mod tests {
#[tokio::test]
async fn test_index_applier_with_empty_index() {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_metadata().returning(move || {
mock_reader.expect_metadata().returning(move |_| {
Ok(Arc::new(InvertedIndexMetas {
total_row_count: 0, // No rows
segment_row_count: 1,
@@ -359,7 +365,7 @@ mod tests {
};
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert!(output.matched_segment_ids.is_empty());
@@ -370,7 +376,7 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(vec![])));
.returning(|_| Ok(mock_metas(vec![])));
let mut mock_fst_applier = MockFstApplier::new();
mock_fst_applier.expect_apply().never();
@@ -385,6 +391,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
},
&mut mock_reader,
None,
)
.await;
assert!(matches!(result, Err(Error::IndexNotFound { .. })));
@@ -395,6 +402,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
},
&mut mock_reader,
None,
)
.await
.unwrap();
@@ -406,6 +414,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::Ignore,
},
&mut mock_reader,
None,
)
.await
.unwrap();

View File

@@ -45,6 +45,7 @@ use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::RegionFileId;
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
@@ -75,19 +76,24 @@ pub enum CacheStrategy {
}
impl CacheStrategy {
/// Calls [CacheManager::get_parquet_meta_data()].
pub async fn get_parquet_meta_data(
/// Gets parquet metadata with cache metrics tracking.
/// Returns the metadata and updates the provided metrics.
pub(crate) async fn get_parquet_meta_data(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data(file_id).await
cache_manager.get_parquet_meta_data(file_id, metrics).await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data(file_id).await
cache_manager.get_parquet_meta_data(file_id, metrics).await
}
CacheStrategy::Disabled => {
metrics.cache_miss += 1;
None
}
CacheStrategy::Disabled => None,
}
}
@@ -292,16 +298,17 @@ impl CacheManager {
CacheManagerBuilder::default()
}
/// Gets cached [ParquetMetaData] from in-memory cache first.
/// If not found, tries to get it from write cache and fill the in-memory cache.
pub async fn get_parquet_meta_data(
/// Gets cached [ParquetMetaData] with metrics tracking.
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
pub(crate) async fn get_parquet_meta_data(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
let metadata = self.get_parquet_meta_data_from_mem_cache(file_id);
if metadata.is_some() {
return metadata;
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
// Try to get metadata from write cache
@@ -309,11 +316,13 @@ impl CacheManager {
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
{
metrics.file_cache_hit += 1;
let metadata = Arc::new(metadata);
// Put metadata into sst meta cache
self.put_parquet_meta_data(file_id, metadata.clone());
return Some(metadata);
};
metrics.cache_miss += 1;
None
}
@@ -826,8 +835,14 @@ mod tests {
let region_id = RegionId::new(1, 1);
let file_id = RegionFileId::new(region_id, FileId::random());
let metadata = parquet_meta();
let mut metrics = MetadataCacheMetrics::default();
cache.put_parquet_meta_data(file_id, metadata);
assert!(cache.get_parquet_meta_data(file_id).await.is_none());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.await
.is_none()
);
let value = Value::Int64(10);
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
@@ -849,14 +864,30 @@ mod tests {
#[tokio::test]
async fn test_parquet_meta_cache() {
let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
let mut metrics = MetadataCacheMetrics::default();
let region_id = RegionId::new(1, 1);
let file_id = RegionFileId::new(region_id, FileId::random());
assert!(cache.get_parquet_meta_data(file_id).await.is_none());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.await
.is_none()
);
let metadata = parquet_meta();
cache.put_parquet_meta_data(file_id, metadata);
assert!(cache.get_parquet_meta_data(file_id).await.is_some());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.await
.is_some()
);
cache.remove_parquet_meta_data(file_id);
assert!(cache.get_parquet_meta_data(file_id).await.is_none());
assert!(
cache
.get_parquet_meta_data(file_id, &mut metrics)
.await
.is_none()
);
}
#[test]

View File

@@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metrics for index content.
const INDEX_CONTENT_TYPE: &str = "index_content";
/// Metrics collected from IndexCache operations.
#[derive(Debug, Default, Clone)]
pub struct IndexCacheMetrics {
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
/// Number of pages accessed.
pub num_pages: usize,
/// Total bytes from pages.
pub page_bytes: u64,
}
impl IndexCacheMetrics {
/// Merges another set of metrics into this one.
pub fn merge(&mut self, other: &Self) {
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
self.num_pages += other.num_pages;
self.page_bytes += other.page_bytes;
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
page_id: u64,
@@ -160,18 +183,20 @@ where
offset: u64,
size: u32,
load: F,
) -> Result<Vec<u8>, E>
) -> Result<(Vec<u8>, IndexCacheMetrics), E>
where
F: Fn(Vec<Range<u64>>) -> Fut,
Fut: Future<Output = Result<Vec<Bytes>, E>>,
E: std::error::Error,
{
let mut metrics = IndexCacheMetrics::default();
let page_keys =
PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
// Size is 0, return empty data.
if page_keys.is_empty() {
return Ok(Vec::new());
return Ok((Vec::new(), metrics));
}
metrics.num_pages = page_keys.len();
let mut data = Vec::with_capacity(page_keys.len());
data.resize(page_keys.len(), Bytes::new());
let mut cache_miss_range = vec![];
@@ -182,10 +207,13 @@ where
match self.get_page(key, *page_key) {
Some(page) => {
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_hit += 1;
metrics.page_bytes += page.len() as u64;
data[i] = page;
}
None => {
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_miss += 1;
let base_offset = page_key.page_id * self.page_size;
let pruned_size = if i == last_index {
prune_size(page_keys.iter(), file_size, self.page_size)
@@ -201,14 +229,18 @@ where
let pages = load(cache_miss_range).await?;
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
let page_key = page_keys[i];
metrics.page_bytes += page.len() as u64;
data[i] = page.clone();
self.put_page(key, page_key, page.clone());
}
}
let buffer = Buffer::from_iter(data.into_iter());
Ok(buffer
.slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec())
Ok((
buffer
.slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec(),
metrics,
))
}
fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {

View File

@@ -14,12 +14,13 @@
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
@@ -114,51 +115,93 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner;
self.cache
let (result, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.map(|b| b.into())
.await?;
if let Some(m) = metrics {
m.total_ranges += cache_metrics.num_pages;
m.total_bytes += cache_metrics.page_bytes;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result.into())
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges {
let inner = &self.inner;
let page = self
let (page, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
self.blob_size,
range.start,
(range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_ranges += total_cache_metrics.num_pages;
m.total_bytes += total_cache_metrics.page_bytes;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(pages)
}
/// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta> {
async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
if let Some(cached) = self
.cache
.get_metadata((self.file_id, self.column_id, self.tag))
{
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok((*cached).clone())
} else {
let meta = self.inner.metadata().await?;
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(
(self.file_id, self.column_id, self.tag),
Arc::new(meta.clone()),

View File

@@ -14,12 +14,13 @@
use core::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait;
use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::InvertedIndexReader;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
@@ -83,46 +84,86 @@ impl<R> CachedInvertedIndexBlobReader<R> {
#[async_trait]
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner;
self.cache
let (result, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.await?;
if let Some(m) = metrics {
m.total_bytes += cache_metrics.page_bytes;
m.total_ranges += cache_metrics.num_pages;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(result)
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges {
let inner = &self.inner;
let page = self
let (page, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
self.blob_size,
range.start,
(range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_bytes += total_cache_metrics.page_bytes;
m.total_ranges += total_cache_metrics.num_pages;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(pages)
}
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok(cached)
} else {
let meta = self.inner.metadata().await?;
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(self.file_id, meta.clone());
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
Ok(meta)
@@ -277,7 +318,7 @@ mod test {
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
);
let metadata = cached_reader.metadata().await.unwrap();
let metadata = cached_reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);
@@ -292,13 +333,19 @@ mod test {
.fst(
tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -307,7 +354,12 @@ mod test {
);
let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -316,7 +368,12 @@ mod test {
);
let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -335,13 +392,19 @@ mod test {
.fst(
tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -350,7 +413,12 @@ mod test {
);
let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -359,7 +427,12 @@ mod test {
);
let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -372,16 +445,16 @@ mod test {
for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.random_range(0..file_size);
let size = rng.random_range(0..file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size).await.unwrap();
let expected = cached_reader.range_read(offset, size, None).await.unwrap();
let inner = &cached_reader.inner;
let read = cached_reader
let (read, _cache_metrics) = cached_reader
.cache
.get_or_load(
cached_reader.file_id,
file_size,
offset,
size,
|ranges| async move { inner.read_vec(&ranges).await },
|ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.unwrap();

View File

@@ -233,7 +233,7 @@ async fn collect_inverted_entries(
InvertedIndexBlobReader::new(blob_reader),
cache.clone(),
);
match reader.metadata().await {
match reader.metadata(None).await {
Ok(metas) => metas,
Err(err) => {
warn!(
@@ -247,7 +247,7 @@ async fn collect_inverted_entries(
}
} else {
let reader = InvertedIndexBlobReader::new(blob_reader);
match reader.metadata().await {
match reader.metadata(None).await {
Ok(metas) => metas,
Err(err) => {
warn!(
@@ -318,10 +318,10 @@ async fn try_read_bloom_meta(
bloom_reader,
cache.clone(),
)
.metadata()
.metadata(None)
.await
}
_ => bloom_reader.metadata().await,
_ => bloom_reader.metadata(None).await,
};
match result {

View File

@@ -41,10 +41,14 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Verbose scan metrics for a partition.
#[derive(Default)]
@@ -81,6 +85,8 @@ pub(crate) struct ScanMetricsSet {
// SST related metrics:
/// Duration to build file ranges.
build_parts_cost: Duration,
/// Duration to scan SST files.
sst_scan_cost: Duration,
/// Number of row groups before filtering.
rg_total: usize,
/// Number of row groups filtered by fulltext index.
@@ -126,6 +132,18 @@ pub(crate) struct ScanMetricsSet {
/// The stream reached EOF
stream_eof: bool,
// Optional verbose metrics:
/// Inverted index apply metrics.
inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Bloom filter index apply metrics.
bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Fulltext index apply metrics.
fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
/// Parquet fetch metrics.
fetch_metrics: Option<ParquetFetchMetrics>,
/// Metadata cache metrics.
metadata_cache_metrics: Option<MetadataCacheMetrics>,
}
impl fmt::Debug for ScanMetricsSet {
@@ -141,6 +159,7 @@ impl fmt::Debug for ScanMetricsSet {
num_mem_ranges,
num_file_ranges,
build_parts_cost,
sst_scan_cost,
rg_total,
rg_fulltext_filtered,
rg_inverted_filtered,
@@ -166,6 +185,11 @@ impl fmt::Debug for ScanMetricsSet {
mem_rows,
mem_batches,
mem_series,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
fetch_metrics,
metadata_cache_metrics,
} = self;
// Write core metrics
@@ -181,6 +205,7 @@ impl fmt::Debug for ScanMetricsSet {
\"num_mem_ranges\":{num_mem_ranges}, \
\"num_file_ranges\":{num_file_ranges}, \
\"build_parts_cost\":\"{build_parts_cost:?}\", \
\"sst_scan_cost\":\"{sst_scan_cost:?}\", \
\"rg_total\":{rg_total}, \
\"rows_before_filter\":{rows_before_filter}, \
\"num_sst_record_batches\":{num_sst_record_batches}, \
@@ -255,6 +280,33 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
}
// Write optional verbose metrics if they are not empty
if let Some(metrics) = inverted_index_apply_metrics
&& !metrics.is_empty()
{
write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = bloom_filter_apply_metrics
&& !metrics.is_empty()
{
write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = fulltext_index_apply_metrics
&& !metrics.is_empty()
{
write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = fetch_metrics
&& !metrics.is_empty()
{
write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = metadata_cache_metrics
&& !metrics.is_empty()
{
write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
}
}
@@ -304,14 +356,20 @@ impl ScanMetricsSet {
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
},
num_record_batches,
num_batches,
num_rows,
scan_cost: _,
scan_cost,
metadata_cache_metrics,
fetch_metrics,
} = other;
self.build_parts_cost += *build_cost;
self.sst_scan_cost += *scan_cost;
self.rg_total += *rg_total;
self.rg_fulltext_filtered += *rg_fulltext_filtered;
@@ -328,6 +386,31 @@ impl ScanMetricsSet {
self.num_sst_record_batches += *num_record_batches;
self.num_sst_batches += *num_batches;
self.num_sst_rows += *num_rows;
// Merge optional verbose metrics
if let Some(metrics) = inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(InvertedIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(BloomFilterIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(FulltextIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = fetch_metrics {
self.fetch_metrics
.get_or_insert_with(ParquetFetchMetrics::default)
.merge_from(metrics);
}
self.metadata_cache_metrics
.get_or_insert_with(MetadataCacheMetrics::default)
.merge_from(metadata_cache_metrics);
}
/// Sets distributor metrics.
@@ -615,6 +698,11 @@ impl PartitionMetrics {
let mut metrics_set = self.0.metrics.lock().unwrap();
metrics_set.set_distributor_metrics(metrics);
}
/// Returns whether verbose explain is enabled.
pub(crate) fn explain_verbose(&self) -> bool {
self.0.explain_verbose
}
}
impl fmt::Debug for PartitionMetrics {
@@ -768,6 +856,21 @@ fn can_split_series(num_rows: u64, num_series: u64) -> bool {
num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
/// Creates a new [ReaderFilterMetrics].
///
/// When `explain_verbose` is set, the optional per-index apply metrics are
/// pre-initialized so the appliers collect detailed statistics; otherwise a
/// plain default (all `None`) is returned and collection is skipped.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
    let mut metrics = ReaderFilterMetrics::default();
    if explain_verbose {
        metrics.inverted_index_apply_metrics = Some(InvertedIndexApplyMetrics::default());
        metrics.bloom_filter_apply_metrics = Some(BloomFilterIndexApplyMetrics::default());
        metrics.fulltext_index_apply_metrics = Some(FulltextIndexApplyMetrics::default());
    }
    metrics
}
/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
@@ -776,7 +879,10 @@ pub(crate) async fn scan_file_ranges(
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
let mut reader_metrics = ReaderMetrics::default();
let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
@@ -799,7 +905,10 @@ pub(crate) async fn scan_flat_file_ranges(
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
let mut reader_metrics = ReaderMetrics::default();
let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
@@ -822,10 +931,18 @@ pub fn build_file_range_scan_stream(
ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let reader_metrics = &mut ReaderMetrics::default();
let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(stream_ctx.input.series_row_selector).await?;
let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
@@ -857,10 +974,18 @@ pub fn build_flat_file_range_scan_stream(
ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
let reader_metrics = &mut ReaderMetrics::default();
let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges {
let build_reader_start = Instant::now();
let mut reader = range.flat_reader().await?;
let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);

View File

@@ -17,11 +17,14 @@ mod builder;
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
@@ -47,6 +50,62 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking bloom filter index apply operations.
///
/// Collected on demand (passed as `Option<&mut _>` into the applier) and
/// accumulated across files via `merge_from`.
#[derive(Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses.
    pub blob_cache_miss: usize,
    /// Total size of blobs read (in bytes).
    /// NOTE(review): appears to be recorded only on the cached-reader path — confirm.
    pub blob_read_bytes: u64,
    /// Metrics for bloom filter read operations.
    pub read_metrics: BloomFilterReadMetrics,
}
impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
    /// Renders the metrics as a compact JSON-like object, omitting
    /// zero-valued counters to keep explain output short.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // An empty metrics set prints as an empty object.
        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
        if self.blob_cache_miss > 0 {
            write!(f, ", \"blob_cache_miss\":{}", self.blob_cache_miss)?;
        }
        if self.blob_read_bytes > 0 {
            write!(f, ", \"blob_read_bytes\":{}", self.blob_read_bytes)?;
        }
        // Read metrics are always included, followed by the closing brace.
        write!(f, ", \"read_metrics\":{:?}}}", self.read_metrics)
    }
}
impl BloomFilterIndexApplyMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    ///
    /// Only the apply time is inspected: no elapsed time means the applier
    /// never ran, so the counters carry no information.
    pub fn is_empty(&self) -> bool {
        self.apply_elapsed.is_zero()
    }

    /// Merges another metrics into this one, accumulating every field.
    pub fn merge_from(&mut self, other: &Self) {
        // Destructure so a newly added field triggers a compile error here.
        let Self {
            apply_elapsed,
            blob_cache_miss,
            blob_read_bytes,
            read_metrics,
        } = other;
        self.apply_elapsed += *apply_elapsed;
        self.blob_cache_miss += *blob_cache_miss;
        self.blob_read_bytes += *blob_read_bytes;
        self.read_metrics.merge_from(read_metrics);
    }
}
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
@@ -133,15 +192,20 @@ impl BloomFilterIndexApplier {
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.start_timer();
let apply_start = Instant::now();
// Calculates row groups' ranges based on start of the file.
let mut input = Vec::with_capacity(row_groups.size_hint().0);
@@ -163,7 +227,7 @@ impl BloomFilterIndexApplier {
for (column_id, predicates) in self.predicates.iter() {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await?
{
Some(blob) => blob,
@@ -173,6 +237,9 @@ impl BloomFilterIndexApplier {
// Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
if let Some(m) = &mut metrics {
m.blob_read_bytes += blob_size;
}
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
*column_id,
@@ -181,12 +248,12 @@ impl BloomFilterIndexApplier {
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
self.apply_predicates(reader, predicates, &mut output)
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await
.context(ApplyBloomFilterIndexSnafu)?;
} else {
let reader = BloomFilterReaderImpl::new(blob);
self.apply_predicates(reader, predicates, &mut output)
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await
.context(ApplyBloomFilterIndexSnafu)?;
}
@@ -201,6 +268,16 @@ impl BloomFilterIndexApplier {
}
}
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(output)
}
@@ -212,6 +289,7 @@ impl BloomFilterIndexApplier {
file_id: RegionFileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Option<BlobReader>> {
let reader = match self
.cached_blob_reader(file_id, column_id, file_size_hint)
@@ -219,6 +297,9 @@ impl BloomFilterIndexApplier {
{
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Some(m) = metrics {
m.blob_cache_miss += 1;
}
if let Err(err) = other {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
@@ -320,6 +401,7 @@ impl BloomFilterIndexApplier {
reader: R,
predicates: &[InListPredicate],
output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> std::result::Result<(), index::bloom_filter::error::Error> {
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
@@ -329,7 +411,10 @@ impl BloomFilterIndexApplier {
continue;
}
*row_group_output = applier.search(predicates, row_group_output).await?;
let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
*row_group_output = applier
.search(predicates, row_group_output, read_metrics)
.await?;
}
Ok(())
@@ -393,7 +478,7 @@ mod tests {
let applier = builder.build(&exprs).unwrap().unwrap();
applier
.apply(file_id, None, row_groups.into_iter())
.apply(file_id, None, row_groups.into_iter(), None)
.await
.unwrap()
.into_iter()

View File

@@ -637,17 +637,17 @@ pub(crate) mod tests {
.unwrap();
let reader = blob_guard.reader().await.unwrap();
let bloom_filter = BloomFilterReaderImpl::new(reader);
let metadata = bloom_filter.metadata().await.unwrap();
let metadata = bloom_filter.metadata(None).await.unwrap();
assert_eq!(metadata.segment_count, 10);
for i in 0..5 {
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
assert!(bf.contains(b"tag1"));
}
for i in 5..10 {
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
assert!(bf.contains(b"tag2"));
}
}
@@ -662,13 +662,13 @@ pub(crate) mod tests {
.unwrap();
let reader = blob_guard.reader().await.unwrap();
let bloom_filter = BloomFilterReaderImpl::new(reader);
let metadata = bloom_filter.metadata().await.unwrap();
let metadata = bloom_filter.metadata(None).await.unwrap();
assert_eq!(metadata.segment_count, 5);
for i in 0u64..20 {
let idx = i as usize / 4;
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
.unwrap();

View File

@@ -16,11 +16,12 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
@@ -53,6 +54,95 @@ use crate::sst::index::puffin_manager::{
pub mod builder;
/// Metrics for tracking fulltext index apply operations.
///
/// Collected on demand (passed as `Option<&mut _>` into the applier) and
/// accumulated across files via `merge_from`.
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses.
    pub blob_cache_miss: usize,
    /// Number of directory cache hits.
    pub dir_cache_hit: usize,
    /// Number of directory cache misses.
    pub dir_cache_miss: usize,
    /// Elapsed time to initialize directory data (Tantivy backend).
    pub dir_init_elapsed: std::time::Duration,
    /// Metrics for bloom filter reads (coarse/bloom backend).
    pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
impl std::fmt::Debug for FulltextIndexApplyMetrics {
    /// Renders the metrics as a compact JSON-like object, omitting
    /// zero-valued counters to keep explain output short.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // An empty metrics set prints as an empty object.
        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
        if self.blob_cache_miss > 0 {
            write!(f, ", \"blob_cache_miss\":{}", self.blob_cache_miss)?;
        }
        if self.dir_cache_hit > 0 {
            write!(f, ", \"dir_cache_hit\":{}", self.dir_cache_hit)?;
        }
        if self.dir_cache_miss > 0 {
            write!(f, ", \"dir_cache_miss\":{}", self.dir_cache_miss)?;
        }
        if !self.dir_init_elapsed.is_zero() {
            write!(f, ", \"dir_init_elapsed\":\"{:?}\"", self.dir_init_elapsed)?;
        }
        // Bloom filter read metrics are always included, then close the object.
        write!(
            f,
            ", \"bloom_filter_read_metrics\":{:?}}}",
            self.bloom_filter_read_metrics
        )
    }
}
impl FulltextIndexApplyMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    ///
    /// Only the apply time is inspected: no elapsed time means the applier
    /// never ran, so the counters carry no information.
    pub fn is_empty(&self) -> bool {
        self.apply_elapsed.is_zero()
    }

    /// Collects metrics from a directory read operation.
    ///
    /// Adds `elapsed` to the directory init time and bumps the hit or miss
    /// counter depending on whether the directory came from the cache.
    pub fn collect_dir_metrics(
        &mut self,
        elapsed: std::time::Duration,
        dir_metrics: puffin::puffin_manager::DirMetrics,
    ) {
        self.dir_init_elapsed += elapsed;
        let counter = if dir_metrics.cache_hit {
            &mut self.dir_cache_hit
        } else {
            &mut self.dir_cache_miss
        };
        *counter += 1;
    }

    /// Merges another metrics into this one, accumulating every field.
    pub fn merge_from(&mut self, other: &Self) {
        // Destructure so a newly added field triggers a compile error here.
        let Self {
            apply_elapsed,
            blob_cache_miss,
            dir_cache_hit,
            dir_cache_miss,
            dir_init_elapsed,
            bloom_filter_read_metrics,
        } = other;
        self.apply_elapsed += *apply_elapsed;
        self.blob_cache_miss += *blob_cache_miss;
        self.dir_cache_hit += *dir_cache_hit;
        self.dir_cache_miss += *dir_cache_miss;
        self.dir_init_elapsed += *dir_init_elapsed;
        self.bloom_filter_read_metrics
            .merge_from(bloom_filter_read_metrics);
    }
}
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
pub struct FulltextIndexApplier {
/// Requests to be applied.
@@ -124,14 +214,18 @@ impl FulltextIndexApplier {
impl FulltextIndexApplier {
/// Applies fine-grained fulltext index to the specified SST file.
/// Returns the row ids that match the queries.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_fine(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> {
let timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let apply_start = Instant::now();
let mut row_ids: Option<BTreeSet<RowId>> = None;
for (column_id, request) in self.requests.iter() {
@@ -140,7 +234,13 @@ impl FulltextIndexApplier {
}
let Some(result) = self
.apply_fine_one_column(file_size_hint, file_id, *column_id, request)
.apply_fine_one_column(
file_size_hint,
file_id,
*column_id,
request,
metrics.as_deref_mut(),
)
.await?
else {
continue;
@@ -159,9 +259,16 @@ impl FulltextIndexApplier {
}
}
if row_ids.is_none() {
timer.stop_and_discard();
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(row_ids)
}
@@ -171,6 +278,7 @@ impl FulltextIndexApplier {
file_id: RegionFileId,
column_id: ColumnId,
request: &FulltextRequest,
metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> {
let blob_key = format!(
"{INDEX_BLOB_TYPE_TANTIVY}-{}",
@@ -178,7 +286,7 @@ impl FulltextIndexApplier {
);
let dir = self
.index_source
.dir(file_id, &blob_key, file_size_hint)
.dir(file_id, &blob_key, file_size_hint, metrics)
.await?;
let dir = match &dir {
@@ -240,15 +348,20 @@ impl FulltextIndexApplier {
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_coarse(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
let timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let apply_start = Instant::now();
let (input, mut output) = Self::init_coarse_output(row_groups);
let mut applied = false;
@@ -266,16 +379,27 @@ impl FulltextIndexApplier {
*column_id,
&request.terms,
&mut output,
metrics.as_deref_mut(),
)
.await?;
}
if !applied {
timer.stop_and_discard();
return Ok(None);
}
Self::adjust_coarse_output(input, &mut output);
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(Some(output))
}
@@ -286,6 +410,7 @@ impl FulltextIndexApplier {
column_id: ColumnId,
terms: &[FulltextTerm],
output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<bool> {
let blob_key = format!(
"{INDEX_BLOB_TYPE_BLOOM}-{}",
@@ -293,7 +418,7 @@ impl FulltextIndexApplier {
);
let Some(reader) = self
.index_source
.blob(file_id, &blob_key, file_size_hint)
.blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
.await?
else {
return Ok(false);
@@ -336,7 +461,13 @@ impl FulltextIndexApplier {
}
*row_group_output = applier
.search(&predicates, row_group_output)
.search(
&predicates,
row_group_output,
metrics
.as_deref_mut()
.map(|m| &mut m.bloom_filter_read_metrics),
)
.await
.context(ApplyBloomFilterIndexSnafu)?;
}
@@ -483,8 +614,15 @@ impl IndexSource {
file_id: RegionFileId,
key: &str,
file_size_hint: Option<u64>,
metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track cache miss if fallbacked to remote
if fallbacked && let Some(m) = metrics {
m.blob_cache_miss += 1;
}
let res = reader.blob(key).await;
match res {
Ok(blob) => Ok(Some(blob)),
@@ -514,11 +652,25 @@ impl IndexSource {
file_id: RegionFileId,
key: &str,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track cache miss if fallbacked to remote
if fallbacked && let Some(m) = &mut metrics {
m.blob_cache_miss += 1;
}
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await;
match res {
Ok(dir) => Ok(Some(dir)),
Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => {
if fallbacked {
@@ -526,9 +678,16 @@ impl IndexSource {
} else {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
let reader = self.build_remote(file_id, file_size_hint).await?;
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await;
match res {
Ok(dir) => Ok(Some(dir)),
Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}

View File

@@ -723,15 +723,16 @@ mod tests {
let backend = backend.clone();
async move {
match backend {
FulltextBackend::Tantivy => {
applier.apply_fine(region_file_id, None).await.unwrap()
}
FulltextBackend::Tantivy => applier
.apply_fine(region_file_id, None, None)
.await
.unwrap(),
FulltextBackend::Bloom => {
let coarse_mask = coarse_mask.unwrap_or_default();
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
// row group id == row id
let resp = applier
.apply_coarse(region_file_id, None, row_groups)
.apply_coarse(region_file_id, None, row_groups, None)
.await
.unwrap();
resp.map(|r| {

View File

@@ -16,10 +16,11 @@ pub mod builder;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
@@ -44,6 +45,67 @@ use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking inverted index apply operations.
///
/// Collected on demand (passed as `Option<&mut _>` into the applier) and
/// accumulated across files via `merge_from`.
#[derive(Default, Clone)]
pub struct InvertedIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses (0 or 1).
    pub blob_cache_miss: usize,
    /// Total size of blobs read (in bytes).
    pub blob_read_bytes: u64,
    /// Metrics for inverted index reads.
    pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
impl std::fmt::Debug for InvertedIndexApplyMetrics {
    /// Renders the metrics as a compact JSON-like object, omitting
    /// zero-valued counters to keep explain output short.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // An empty metrics set prints as an empty object.
        if self.is_empty() {
            return write!(f, "{{}}");
        }
        write!(f, "{{\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
        if self.blob_cache_miss > 0 {
            write!(f, ", \"blob_cache_miss\":{}", self.blob_cache_miss)?;
        }
        if self.blob_read_bytes > 0 {
            write!(f, ", \"blob_read_bytes\":{}", self.blob_read_bytes)?;
        }
        // Read metrics are always included, followed by the closing brace.
        write!(
            f,
            ", \"inverted_index_read_metrics\":{:?}}}",
            self.inverted_index_read_metrics
        )
    }
}
impl InvertedIndexApplyMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    ///
    /// Only the apply time is inspected: no elapsed time means the applier
    /// never ran, so the counters carry no information.
    pub fn is_empty(&self) -> bool {
        self.apply_elapsed.is_zero()
    }

    /// Merges another metrics into this one, accumulating every field.
    pub fn merge_from(&mut self, other: &Self) {
        // Destructure so a newly added field triggers a compile error here.
        let Self {
            apply_elapsed,
            blob_cache_miss,
            blob_read_bytes,
            inverted_index_read_metrics,
        } = other;
        self.apply_elapsed += *apply_elapsed;
        self.blob_cache_miss += *blob_cache_miss;
        self.blob_read_bytes += *blob_read_bytes;
        self.inverted_index_read_metrics
            .merge_from(inverted_index_read_metrics);
    }
}
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub(crate) struct InvertedIndexApplier {
@@ -124,24 +186,30 @@ impl InvertedIndexApplier {
self
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids
/// Applies predicates to the provided SST file id and returns the relevant row group ids.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
let start = Instant::now();
let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates.
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
let mut cache_miss = 0;
let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
cache_miss += 1;
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
@@ -149,8 +217,9 @@ impl InvertedIndexApplier {
}
};
if let Some(index_cache) = &self.inverted_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let result = if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id.file_id(),
blob_size,
@@ -158,16 +227,42 @@ impl InvertedIndexApplier {
index_cache.clone(),
);
self.index_applier
.apply(context, &mut index_reader)
.apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await
.context(ApplyInvertedIndexSnafu)
} else {
let mut index_reader = InvertedIndexBlobReader::new(blob);
self.index_applier
.apply(context, &mut index_reader)
.apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await
.context(ApplyInvertedIndexSnafu)
};
// Record elapsed time to histogram and collect metrics if requested
let elapsed = start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(metrics) = metrics {
metrics.apply_elapsed = elapsed;
metrics.blob_cache_miss = cache_miss;
metrics.blob_read_bytes = blob_size;
}
result
}
/// Creates a blob reader from the cached index file.
@@ -281,7 +376,7 @@ mod tests {
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().returning(|_, _| {
mock_index_applier.expect_apply().returning(|_, _, _| {
Ok(ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: 100,
@@ -297,7 +392,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let output = sst_index_applier.apply(file_id, None).await.unwrap();
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
@@ -345,7 +440,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let res = sst_index_applier.apply(file_id, None).await;
let res = sst_index_applier.apply(file_id, None, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -615,7 +615,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id, None)
.apply(sst_file_id, None, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -245,7 +245,7 @@ mod tests {
let bs = blob_reader.read(0..meta.content_length).await.unwrap();
assert_eq!(&*bs, raw_data);
let dir_guard = reader.dir(dir_key).await.unwrap();
let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap();
let file = dir_guard.path().join("hello");
let data = tokio::fs::read(file).await.unwrap();
assert_eq!(data, raw_data);

View File

@@ -45,6 +45,7 @@ use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Checks if a row group contains delete operations by examining the min value of op_type column.
///
@@ -117,11 +118,16 @@ impl FileRange {
pub(crate) async fn reader(
&self,
selector: Option<TimeSeriesRowSelector>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<PruneReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?;
let use_last_row_reader = if selector
@@ -168,11 +174,18 @@ impl FileRange {
}
/// Creates a flat reader that returns RecordBatch.
pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
pub(crate) async fn flat_reader(
&self,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<FlatPruneReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?;
// Compute skip_fields once for this row group

View File

@@ -52,15 +52,21 @@ use crate::metrics::{
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -253,7 +259,9 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
let parquet_meta = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
// Gets the metadata stored in the SST.
@@ -378,25 +386,34 @@ impl ParquetReaderBuilder {
&self,
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
) -> Result<Arc<ParquetMetaData>> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
.start_timer();
let file_id = self.file_handle.file_id();
// Tries to get from global cache.
if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
// Tries to get from cache with metrics tracking.
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data(file_id, cache_metrics)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
return Ok(metadata);
}
// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
self.cache_strategy
.put_parquet_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok(metadata)
}
@@ -527,7 +544,11 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.apply_fine(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(Some(res)) => {
@@ -595,13 +616,17 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.apply(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.inverted_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
apply_output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
@@ -670,7 +695,12 @@ impl ParquetReaderBuilder {
)
});
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.apply(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
)
.await;
let mut selection = match apply_res {
Ok(apply_output) => {
@@ -748,7 +778,12 @@ impl ParquetReaderBuilder {
)
});
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
.apply_coarse(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await;
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
@@ -892,7 +927,7 @@ fn all_required_row_groups_searched(
}
/// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)]
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) rg_total: usize,
@@ -915,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
/// Optional metrics for inverted index applier.
pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Optional metrics for bloom filter index applier.
pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Optional metrics for fulltext index applier.
pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}
impl ReaderFilterMetrics {
@@ -931,6 +973,23 @@ impl ReaderFilterMetrics {
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
// Merge optional applier metrics
if let Some(other_metrics) = &other.inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
}
/// Reports metrics.
@@ -987,6 +1046,64 @@ impl ReaderFilterMetrics {
}
}
/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
    /// Number of memory cache hits for parquet metadata.
    pub(crate) mem_cache_hit: usize,
    /// Number of file cache hits for parquet metadata.
    pub(crate) file_cache_hit: usize,
    /// Number of cache misses for parquet metadata.
    pub(crate) cache_miss: usize,
    /// Duration to load parquet metadata.
    pub(crate) metadata_load_cost: Duration,
}

impl std::fmt::Debug for MetadataCacheMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Empty metrics render as an empty JSON-like object.
        if self.is_empty() {
            return write!(f, "{{}}");
        }
        // The load cost is always printed first; zero-valued counters are
        // skipped to keep the output compact.
        write!(f, "{{\"metadata_load_cost\":\"{:?}\"", self.metadata_load_cost)?;
        for (name, value) in [
            ("mem_cache_hit", self.mem_cache_hit),
            ("file_cache_hit", self.file_cache_hit),
            ("cache_miss", self.cache_miss),
        ] {
            if value > 0 {
                write!(f, ", \"{}\":{}", name, value)?;
            }
        }
        write!(f, "}}")
    }
}

impl MetadataCacheMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    ///
    /// The load cost is treated as the signal: it is updated on both the
    /// cache-hit and cache-miss paths, so a zero duration means nothing was
    /// recorded yet.
    pub(crate) fn is_empty(&self) -> bool {
        self.metadata_load_cost.is_zero()
    }

    /// Adds `other` metrics to this metrics.
    pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
        self.mem_cache_hit += other.mem_cache_hit;
        self.file_cache_hit += other.file_cache_hit;
        self.cache_miss += other.cache_miss;
        self.metadata_load_cost += other.metadata_load_cost;
    }
}
/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
@@ -1002,6 +1119,10 @@ pub struct ReaderMetrics {
pub(crate) num_batches: usize,
/// Number of rows read.
pub(crate) num_rows: usize,
/// Metrics for parquet metadata cache.
pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
/// Optional metrics for page/row group fetch operations.
pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
}
impl ReaderMetrics {
@@ -1013,6 +1134,15 @@ impl ReaderMetrics {
self.num_record_batches += other.num_record_batches;
self.num_batches += other.num_batches;
self.num_rows += other.num_rows;
self.metadata_cache_metrics
.merge_from(&other.metadata_cache_metrics);
if let Some(other_fetch) = &other.fetch_metrics {
if let Some(self_fetch) = &self.fetch_metrics {
self_fetch.merge_from(other_fetch);
} else {
self.fetch_metrics = Some(other_fetch.clone());
}
}
}
/// Reports total rows.
@@ -1067,7 +1197,10 @@ impl RowGroupReaderBuilder {
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
@@ -1079,12 +1212,17 @@ impl RowGroupReaderBuilder {
);
// Fetches data into memory.
row_group
.fetch(&self.projection, row_selection.as_ref())
.fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
// Record total fetch elapsed time.
if let Some(metrics) = fetch_metrics {
metrics.data.lock().unwrap().total_fetch_elapsed += fetch_start.elapsed();
}
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
@@ -1228,6 +1366,8 @@ pub struct ParquetReader {
selection: RowGroupSelection,
/// Reader of current row group.
reader_state: ReaderState,
/// Metrics for tracking row group fetch operations.
fetch_metrics: ParquetFetchMetrics,
}
#[async_trait]
@@ -1247,7 +1387,11 @@ impl BatchReader for ParquetReader {
let parquet_reader = self
.context
.reader_builder()
.build(row_group_idx, Some(row_selection))
.build(
row_group_idx,
Some(row_selection),
Some(&self.fetch_metrics),
)
.await?;
// Resets the parquet reader.
@@ -1303,11 +1447,12 @@ impl ParquetReader {
context: FileRangeContextRef,
mut selection: RowGroupSelection,
) -> Result<Self> {
let fetch_metrics = ParquetFetchMetrics::default();
// No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context
.reader_builder()
.build(row_group_idx, Some(row_selection))
.build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
.await?;
// Compute skip_fields once for this row group
let skip_fields = context.should_skip_fields(row_group_idx);
@@ -1324,6 +1469,7 @@ impl ParquetReader {
context,
selection,
reader_state,
fetch_metrics,
})
}

View File

@@ -35,6 +35,175 @@ use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
/// Inner data for ParquetFetchMetrics.
#[derive(Default, Debug, Clone)]
pub struct ParquetFetchMetricsData {
    /// Number of page cache hits.
    pub page_cache_hit: usize,
    /// Number of write cache hits.
    pub write_cache_hit: usize,
    /// Number of cache misses.
    pub cache_miss: usize,
    /// Number of pages to fetch from mem cache.
    pub pages_to_fetch_mem: usize,
    /// Total size in bytes of pages to fetch from mem cache.
    pub page_size_to_fetch_mem: u64,
    /// Number of pages to fetch from write cache.
    pub pages_to_fetch_write_cache: usize,
    /// Total size in bytes of pages to fetch from write cache.
    pub page_size_to_fetch_write_cache: u64,
    /// Number of pages to fetch from store.
    pub pages_to_fetch_store: usize,
    /// Total size in bytes of pages to fetch from store.
    pub page_size_to_fetch_store: u64,
    /// Total size in bytes of pages actually returned.
    pub page_size_needed: u64,
    /// Elapsed time fetching from write cache.
    pub write_cache_fetch_elapsed: std::time::Duration,
    /// Elapsed time fetching from object store.
    pub store_fetch_elapsed: std::time::Duration,
    /// Total elapsed time for fetching row groups.
    pub total_fetch_elapsed: std::time::Duration,
}

impl ParquetFetchMetricsData {
    /// Returns true if the metrics are empty (contain no meaningful data).
    ///
    /// `total_fetch_elapsed` is the sentinel: it accumulates whole-fetch time
    /// for every fetch, so a zero value means no fetch was recorded.
    fn is_empty(&self) -> bool {
        self.total_fetch_elapsed.is_zero()
    }
}

/// Metrics for tracking page/row group fetch operations.
///
/// Wraps the data in a mutex so it can be updated behind a shared reference
/// from the fetch path.
#[derive(Default)]
pub struct ParquetFetchMetrics {
    pub data: std::sync::Mutex<ParquetFetchMetricsData>,
}

impl std::fmt::Debug for ParquetFetchMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        // Empty metrics render as an empty JSON-like object.
        if data.is_empty() {
            return write!(f, "{{}}");
        }
        // The total elapsed time is always printed first.
        write!(f, "{{\"total_fetch_elapsed\":\"{:?}\"", data.total_fetch_elapsed)?;
        // Zero-valued counters are skipped to keep the output compact.
        // `usize` counters are widened to u64 so one table covers all of them;
        // the printed representation is unchanged.
        let counters = [
            ("page_cache_hit", data.page_cache_hit as u64),
            ("write_cache_hit", data.write_cache_hit as u64),
            ("cache_miss", data.cache_miss as u64),
            ("pages_to_fetch_mem", data.pages_to_fetch_mem as u64),
            ("page_size_to_fetch_mem", data.page_size_to_fetch_mem),
            (
                "pages_to_fetch_write_cache",
                data.pages_to_fetch_write_cache as u64,
            ),
            (
                "page_size_to_fetch_write_cache",
                data.page_size_to_fetch_write_cache,
            ),
            ("pages_to_fetch_store", data.pages_to_fetch_store as u64),
            ("page_size_to_fetch_store", data.page_size_to_fetch_store),
            ("page_size_needed", data.page_size_needed),
        ];
        for (name, value) in counters {
            if value > 0 {
                write!(f, ", \"{}\":{}", name, value)?;
            }
        }
        // Durations are only printed when non-zero, in the same order as the
        // struct fields.
        if !data.write_cache_fetch_elapsed.is_zero() {
            write!(
                f,
                ", \"write_cache_fetch_elapsed\":\"{:?}\"",
                data.write_cache_fetch_elapsed
            )?;
        }
        if !data.store_fetch_elapsed.is_zero() {
            write!(f, ", \"store_fetch_elapsed\":\"{:?}\"", data.store_fetch_elapsed)?;
        }
        write!(f, "}}")
    }
}

impl ParquetFetchMetrics {
    /// Returns true if the metrics are empty (contain no meaningful data).
    pub fn is_empty(&self) -> bool {
        self.data.lock().unwrap().is_empty()
    }

    /// Merges metrics from another [ParquetFetchMetrics].
    pub fn merge_from(&self, other: &ParquetFetchMetrics) {
        // Snapshot the source first; its guard is dropped before we lock
        // ourselves, so the two locks are never held at the same time.
        let src = other.data.lock().unwrap().clone();
        let mut dst = self.data.lock().unwrap();
        dst.page_cache_hit += src.page_cache_hit;
        dst.write_cache_hit += src.write_cache_hit;
        dst.cache_miss += src.cache_miss;
        dst.pages_to_fetch_mem += src.pages_to_fetch_mem;
        dst.page_size_to_fetch_mem += src.page_size_to_fetch_mem;
        dst.pages_to_fetch_write_cache += src.pages_to_fetch_write_cache;
        dst.page_size_to_fetch_write_cache += src.page_size_to_fetch_write_cache;
        dst.pages_to_fetch_store += src.pages_to_fetch_store;
        dst.page_size_to_fetch_store += src.page_size_to_fetch_store;
        dst.page_size_needed += src.page_size_needed;
        dst.write_cache_fetch_elapsed += src.write_cache_fetch_elapsed;
        dst.store_fetch_elapsed += src.store_fetch_elapsed;
        dst.total_fetch_elapsed += src.total_fetch_elapsed;
    }
}
pub(crate) struct RowGroupBase<'a> {
metadata: &'a RowGroupMetaData,
pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
@@ -244,13 +413,14 @@ impl<'a> InMemoryRowGroup<'a> {
&mut self,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
metrics: Option<&ParquetFetchMetrics>,
) -> Result<()> {
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, offset_index, selection);
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
// Assign sparse chunk data to base.
self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
@@ -268,7 +438,7 @@ impl<'a> InMemoryRowGroup<'a> {
}
// Fetch data with ranges
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
// Assigns fetched data to base.
self.base.assign_dense_chunk(projection, chunk_data);
@@ -279,31 +449,74 @@ impl<'a> InMemoryRowGroup<'a> {
/// Try to fetch data from the memory cache or the WriteCache,
/// if not in WriteCache, fetch data from object store directly.
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn fetch_bytes(
&self,
ranges: &[Range<u64>],
metrics: Option<&ParquetFetchMetrics>,
) -> Result<Vec<Bytes>> {
// Now fetch page timer includes the whole time to read pages.
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
if let Some(metrics) = metrics {
let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.page_cache_hit += 1;
metrics_data.pages_to_fetch_mem += ranges.len();
metrics_data.page_size_to_fetch_mem += total_size;
metrics_data.page_size_needed += total_size;
}
return Ok(pages.compressed.clone());
}
// Calculate total range size for metrics.
let (total_range_size, unaligned_size) = compute_total_range_size(ranges);
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
Some(data) => data,
let fetch_write_cache_start = metrics.map(|_| std::time::Instant::now());
let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
let pages = match write_cache_result {
Some(data) => {
if let Some(metrics) = metrics {
let elapsed = fetch_write_cache_start
.map(|start| start.elapsed())
.unwrap_or_default();
let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.write_cache_fetch_elapsed += elapsed;
metrics_data.write_cache_hit += 1;
metrics_data.pages_to_fetch_write_cache += ranges.len();
metrics_data.page_size_to_fetch_write_cache += unaligned_size;
metrics_data.page_size_needed += range_size_needed;
}
data
}
None => {
// Fetch data from object store.
let _timer = READ_STAGE_ELAPSED
.with_label_values(&["cache_miss_read"])
.start_timer();
fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
let start = metrics.map(|_| std::time::Instant::now());
let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?
.map_err(|e| ParquetError::External(Box::new(e)))?;
if let Some(metrics) = metrics {
let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.store_fetch_elapsed += elapsed;
metrics_data.cache_miss += 1;
metrics_data.pages_to_fetch_store += ranges.len();
metrics_data.page_size_to_fetch_store += unaligned_size;
metrics_data.page_size_needed += range_size_needed;
}
data
}
};
// Put pages back to the cache.
let total_range_size = compute_total_range_size(ranges);
let page_value = PageValue::new(pages.clone(), total_range_size);
self.cache_strategy
.put_pages(page_key, Arc::new(page_value));
@@ -326,17 +539,21 @@ impl<'a> InMemoryRowGroup<'a> {
}
/// Computes the max possible buffer size to read the given `ranges`.
/// Returns (aligned_size, unaligned_size) where:
/// - aligned_size: total size aligned to pooled buffer size
/// - unaligned_size: actual total size without alignment
// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
fn compute_total_range_size(ranges: &[Range<u64>]) -> (u64, u64) {
if ranges.is_empty() {
return 0;
return (0, 0);
}
let gap = MERGE_GAP as u64;
let mut sorted_ranges = ranges.to_vec();
sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
let mut total_size = 0;
let mut total_size_aligned = 0;
let mut total_size_unaligned = 0;
let mut cur = sorted_ranges[0].clone();
for range in sorted_ranges.into_iter().skip(1) {
@@ -345,15 +562,19 @@ fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
cur.end = cur.end.max(range.end);
} else {
// No overlap and the gap is too large, add current range to total and start a new one
total_size += align_to_pooled_buf_size(cur.end - cur.start);
let range_size = cur.end - cur.start;
total_size_aligned += align_to_pooled_buf_size(range_size);
total_size_unaligned += range_size;
cur = range;
}
}
// Add the last range
total_size += align_to_pooled_buf_size(cur.end - cur.start);
let range_size = cur.end - cur.start;
total_size_aligned += align_to_pooled_buf_size(range_size);
total_size_unaligned += range_size;
total_size
(total_size_aligned, total_size_unaligned)
}
/// Aligns the given size to the multiple of the pooled buffer size.

View File

@@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::Result;
use crate::file_metadata::FileMetadata;
/// Metrics returned by `PuffinReader::dir` operations.
///
/// Produced by the stager when a directory is retrieved, so callers can
/// observe staging-cache behavior for a single `dir` call.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DirMetrics {
    /// Whether this was a cache hit (true) or cache miss (false).
    pub cache_hit: bool,
    /// Size of the directory in bytes.
    pub dir_size: u64,
}
/// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`.
#[async_trait]
pub trait PuffinManager {
@@ -106,9 +115,10 @@ pub trait PuffinReader {
/// Reads a directory from the Puffin file.
///
/// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
/// The returned tuple contains `GuardWithMetadata` and `DirMetrics`.
/// The `GuardWithMetadata` is used to access the directory data and its metadata.
/// Users should hold the `GuardWithMetadata` until they are done with the directory data.
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)>;
}
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.

View File

@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F>
@@ -130,10 +130,10 @@ where
Ok(GuardWithMetadata::new(blob, blob_metadata))
}
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)> {
let mut file = self.puffin_reader().await?;
let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
let dir = self
let (dir, metrics) = self
.stager
.get_dir(
&self.handle,
@@ -153,7 +153,7 @@ where
)
.await?;
Ok(GuardWithMetadata::new(dir, blob_metadata))
Ok((GuardWithMetadata::new(dir, blob_metadata), metrics))
}
}

View File

@@ -23,7 +23,7 @@ use futures::AsyncWrite;
use futures::future::BoxFuture;
use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
@@ -72,14 +72,15 @@ pub trait Stager: Send + Sync {
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The returned tuple contains the `DirGuard` and `DirMetrics`.
/// The `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
) -> Result<(Self::Dir, DirMetrics)>;
/// Stores a directory in the staging area.
async fn put_dir(

View File

@@ -41,7 +41,7 @@ use crate::error::{
use crate::puffin_manager::stager::{
BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
};
use crate::puffin_manager::{BlobGuard, DirGuard};
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp";
@@ -203,7 +203,7 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> {
) -> Result<(Self::Dir, DirMetrics)> {
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
@@ -242,15 +242,22 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
.await
.context(CacheGetSnafu)?;
let dir_size = v.size();
if let Some(notifier) = self.notifier.as_ref() {
if miss {
notifier.on_cache_miss(v.size());
notifier.on_cache_miss(dir_size);
} else {
notifier.on_cache_hit(v.size());
notifier.on_cache_hit(dir_size);
}
}
let metrics = DirMetrics {
cache_hit: !miss,
dir_size,
};
match v {
CacheValue::Dir(guard) => Ok(guard),
CacheValue::Dir(guard) => Ok((guard, metrics)),
_ => unreachable!(),
}
}
@@ -882,7 +889,7 @@ mod tests {
let puffin_file_name = "test_get_dir".to_string();
let key = "key";
let dir_path = stager
let (dir_path, metrics) = stager
.get_dir(
&puffin_file_name,
key,
@@ -901,6 +908,9 @@ mod tests {
.await
.unwrap();
assert!(!metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -974,7 +984,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = stager
let (guard, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1016,7 +1026,7 @@ mod tests {
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"hello world");
let dir_path = stager
let (dir_path, metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1024,6 +1034,9 @@ mod tests {
)
.await
.unwrap();
assert!(metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -1151,7 +1164,7 @@ mod tests {
];
// First time to get the directory
let guard_0 = stager
let (guard_0, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1198,7 +1211,7 @@ mod tests {
);
// Second time to get the directory
let guard_1 = stager
let (guard_1, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1237,7 +1250,7 @@ mod tests {
// Third time to get the directory and all guards are dropped
drop(guard_0);
drop(guard_1);
let guard_2 = stager
let (guard_2, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1390,7 +1403,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = stager
let (guard, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,

View File

@@ -356,7 +356,7 @@ async fn check_dir(
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
) {
let res_dir = puffin_reader.dir(key).await.unwrap();
let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap();
let metadata = res_dir.metadata();
assert_eq!(
metadata.properties,