Compare commits

...

24 Commits

Author SHA1 Message Date
shuiyisong
59ddfa84ec fix: check and clippy
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-11-26 18:35:21 +08:00
evenyag
dd043eadc4 feat: add file_scan_cost
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
7e6af2c7ee feat: collect the whole fetch time
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
87d3b17f4d feat: update parquet fetch metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
5acac3d403 chore: fix compiler errors
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
f9c66ba0de feat: implement debug for new metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
37847a8df6 feat: debug print metrics in ScanMetricsSet
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
6e06ac9e5c feat: init verbose metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
09effc8128 feat: add fetch metrics to ReaderMetrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
c14728e3ae feat: collect more metrics for memory row group
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
cce4d56e00 feat: add apply metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
69cf13b33a feat: add parquet metadata metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
c83a282b39 feat: collect parquet row group metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
5329efcdba feat: collect fulltext dir metrics for applier
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
50b5c90d53 feat: collect read metrics in appliers
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
fea2966dec feat: collect cache metrics for inverted and bloom index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
e00452c4db feat: collect metadata fetch metrics for inverted index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
7a31b2a8ea refactor: rename elapsed to fetch_elapsed
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
f363d73f72 feat: add metrics for range_read and metadata
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
7a6befcad3 feat: collect read metrics for inverted index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
d6c75ec55f feat: implement BloomFilterReadMetrics for BloomFilterReader
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
5b8f1d819f feat: add metrics to fulltext index applier
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
b68286e8af feat: add metrics to bloom applier
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
evenyag
4519607bc6 feat: add inverted applier metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-11-26 18:20:06 +08:00
31 changed files with 1664 additions and 288 deletions

View File

@@ -21,7 +21,7 @@ use itertools::Itertools;
use crate::Bytes;
use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
@@ -38,7 +38,7 @@ pub struct BloomFilterApplier {
impl BloomFilterApplier {
pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
let meta = reader.metadata().await?;
let meta = reader.metadata(None).await?;
Ok(Self { reader, meta })
}
@@ -50,6 +50,7 @@ impl BloomFilterApplier {
&mut self,
predicates: &[InListPredicate],
search_ranges: &[Range<usize>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Range<usize>>> {
if predicates.is_empty() {
// If no predicates, return empty result
@@ -57,7 +58,7 @@ impl BloomFilterApplier {
}
let segments = self.row_ranges_to_segments(search_ranges);
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
Ok(intersect_ranges(search_ranges, &matching_row_ranges))
}
@@ -95,6 +96,7 @@ impl BloomFilterApplier {
async fn load_bloom_filters(
&mut self,
segments: &[usize],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
let segment_locations = segments
.iter()
@@ -108,7 +110,10 @@ impl BloomFilterApplier {
.map(|i| self.meta.bloom_filter_locs[i as usize])
.collect::<Vec<_>>();
let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
let bloom_filters = self
.reader
.bloom_filter_vec(&bloom_filter_locs, metrics)
.await?;
Ok((segment_locations, bloom_filters))
}
@@ -422,7 +427,10 @@ mod tests {
];
for (predicates, search_range, expected) in cases {
let result = applier.search(&predicates, &[search_range]).await.unwrap();
let result = applier
.search(&predicates, &[search_range], None)
.await
.unwrap();
assert_eq!(
result, expected,
"Expected {:?}, got {:?}",

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::ops::{Range, Rem};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use bytemuck::try_cast_slice;
@@ -34,6 +35,32 @@ const BLOOM_META_LEN_SIZE: u64 = 4;
/// Default prefetch size of bloom filter meta.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
/// Metrics for bloom filter read operations.
#[derive(Debug, Default, Clone)]
pub struct BloomFilterReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,
/// Total number of ranges to read.
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
impl BloomFilterReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
/// Faster than chunking and converting each piece individually.
///
@@ -79,25 +106,52 @@ pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
#[async_trait]
pub trait BloomFilterReader: Sync {
/// Reads range of bytes from the file.
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes>;
/// Reads bunch of ranges from the file.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut results = Vec::with_capacity(ranges.len());
for range in ranges {
let size = (range.end - range.start) as u32;
let data = self.range_read(range.start, size).await?;
let data = self.range_read(range.start, size, None).await?;
results.push(data);
}
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(results)
}
/// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta>;
async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta>;
/// Reads a bloom filter with the given location.
async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
let bytes = self.range_read(loc.offset, loc.size as _).await?;
async fn bloom_filter(
&self,
loc: &BloomFilterLoc,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilter> {
let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
let vec = bytes_to_u64_vec(&bytes);
let bm = BloomFilter::from_vec(vec)
.seed(&SEED)
@@ -105,12 +159,16 @@ pub trait BloomFilterReader: Sync {
Ok(bm)
}
async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
async fn bloom_filter_vec(
&self,
locs: &[BloomFilterLoc],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<BloomFilter>> {
let ranges = locs
.iter()
.map(|l| l.offset..l.offset + l.size)
.collect::<Vec<_>>();
let bss = self.read_vec(&ranges).await?;
let bss = self.read_vec(&ranges, metrics).await?;
let mut result = Vec::with_capacity(bss.len());
for (bs, loc) in bss.into_iter().zip(locs.iter()) {
@@ -140,24 +198,59 @@ impl<R: RangeReader> BloomFilterReaderImpl<R> {
#[async_trait]
impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
self.reader
async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self
.reader
.read(offset..offset + size as u64)
.await
.context(IoSnafu)
.context(IoSnafu)?;
if let Some(m) = metrics {
m.total_ranges += 1;
m.total_bytes += size as u64;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.reader.read_vec(ranges).await.context(IoSnafu)
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self.reader.read_vec(ranges).await.context(IoSnafu)?;
if let Some(m) = metrics {
m.total_ranges += ranges.len();
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result)
}
async fn metadata(&self) -> Result<BloomFilterMeta> {
async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
let metadata = self.reader.metadata().await.context(IoSnafu)?;
let file_size = metadata.content_length;
let mut meta_reader =
BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
meta_reader.metadata().await
meta_reader.metadata(metrics).await
}
}
@@ -183,7 +276,10 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
///
/// It will first prefetch some bytes from the end of the file,
/// then parse the metadata from the prefetch bytes.
pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
pub async fn metadata(
&mut self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
ensure!(
self.file_size >= BLOOM_META_LEN_SIZE,
FileSizeTooSmallSnafu {
@@ -191,6 +287,7 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
}
);
let start = metrics.as_ref().map(|_| Instant::now());
let meta_start = self.file_size.saturating_sub(self.prefetch_size);
let suffix = self
.reader
@@ -208,8 +305,28 @@ impl<R: RangeReader> BloomFilterMetaReader<R> {
.read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
.await
.context(IoSnafu)?;
if let Some(m) = metrics {
// suffix read + meta read
m.total_ranges += 2;
// Ignores the meta length size to simplify the calculation.
m.total_bytes += self.file_size.min(self.prefetch_size) + length;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
} else {
if let Some(m) = metrics {
// suffix read only
m.total_ranges += 1;
m.total_bytes += self.file_size.min(self.prefetch_size);
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
@@ -290,7 +407,7 @@ mod tests {
for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
let mut reader =
BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
let meta = reader.metadata().await.unwrap();
let meta = reader.metadata(None).await.unwrap();
assert_eq!(meta.rows_per_segment, 2);
assert_eq!(meta.segment_count, 2);
@@ -312,11 +429,11 @@ mod tests {
let bytes = mock_bloom_filter_bytes().await;
let reader = BloomFilterReaderImpl::new(bytes);
let meta = reader.metadata().await.unwrap();
let meta = reader.metadata(None).await.unwrap();
assert_eq!(meta.bloom_filter_locs.len(), 2);
let bf = reader
.bloom_filter(&meta.bloom_filter_locs[0])
.bloom_filter(&meta.bloom_filter_locs[0], None)
.await
.unwrap();
assert!(bf.contains(&b"a"));
@@ -325,7 +442,7 @@ mod tests {
assert!(bf.contains(&b"d"));
let bf = reader
.bloom_filter(&meta.bloom_filter_locs[1])
.bloom_filter(&meta.bloom_filter_locs[1], None)
.await
.unwrap();
assert!(bf.contains(&b"e"));

View File

@@ -74,7 +74,7 @@ async fn test_search(
writer.finish().await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let (index_dir, _metrics) = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
for (query, expected) in query_expected {
let results = searcher.search(query).await.unwrap();

View File

@@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bytes::Bytes;
@@ -29,19 +30,59 @@ pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
mod blob;
mod footer;
/// Metrics for inverted index read operations.
#[derive(Debug, Default, Clone)]
pub struct InvertedIndexReadMetrics {
/// Total byte size to read.
pub total_bytes: u64,
/// Total number of ranges to read.
pub total_ranges: usize,
/// Elapsed time to fetch data.
pub fetch_elapsed: Duration,
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
}
impl InvertedIndexReadMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.total_bytes += other.total_bytes;
self.total_ranges += other.total_ranges;
self.fetch_elapsed += other.fetch_elapsed;
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
}
}
/// InvertedIndexReader defines an asynchronous reader of inverted index data
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexReader: Send + Sync {
/// Seeks to given offset and reads data with exact size as provided.
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>>;
async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>>;
/// Reads the bytes in the given ranges.
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let mut metrics = metrics;
let mut result = Vec::with_capacity(ranges.len());
for range in ranges {
let data = self
.range_read(range.start, (range.end - range.start) as u32)
.range_read(
range.start,
(range.end - range.start) as u32,
metrics.as_deref_mut(),
)
.await?;
result.push(Bytes::from(data));
}
@@ -49,17 +90,29 @@ pub trait InvertedIndexReader: Send + Sync {
}
/// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>>;
async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>>;
/// Retrieves the finite state transducer (FST) map from the given offset and size.
async fn fst(&self, offset: u64, size: u32) -> Result<FstMap> {
let fst_data = self.range_read(offset, size).await?;
async fn fst<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<FstMap> {
let fst_data = self.range_read(offset, size, metrics).await?;
FstMap::new(fst_data).context(DecodeFstSnafu)
}
/// Retrieves the multiple finite state transducer (FST) maps from the given ranges.
async fn fst_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<FstMap>> {
self.read_vec(ranges)
async fn fst_vec<'a>(
&mut self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<FstMap>> {
self.read_vec(ranges, metrics)
.await?
.into_iter()
.map(|bytes| FstMap::new(bytes.to_vec()).context(DecodeFstSnafu))
@@ -67,19 +120,28 @@ pub trait InvertedIndexReader: Send + Sync {
}
/// Retrieves the bitmap from the given offset and size.
async fn bitmap(&self, offset: u64, size: u32, bitmap_type: BitmapType) -> Result<Bitmap> {
self.range_read(offset, size).await.and_then(|bytes| {
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
})
async fn bitmap<'a>(
&self,
offset: u64,
size: u32,
bitmap_type: BitmapType,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Bitmap> {
self.range_read(offset, size, metrics)
.await
.and_then(|bytes| {
Bitmap::deserialize_from(&bytes, bitmap_type).context(DecodeBitmapSnafu)
})
}
/// Retrieves the multiple bitmaps from the given ranges.
async fn bitmap_deque(
async fn bitmap_deque<'a>(
&mut self,
ranges: &[(Range<u64>, BitmapType)],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<VecDeque<Bitmap>> {
let (ranges, types): (Vec<_>, Vec<_>) = ranges.iter().cloned().unzip();
let bytes = self.read_vec(&ranges).await?;
let bytes = self.read_vec(&ranges, metrics).await?;
bytes
.into_iter()
.zip(types)

View File

@@ -14,6 +14,7 @@
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use bytes::Bytes;
@@ -23,10 +24,10 @@ use snafu::{ResultExt, ensure};
use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
use crate::inverted_index::format::MIN_BLOB_SIZE;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::footer::{
DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader,
};
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// Inverted index blob reader, implements [`InvertedIndexReader`]
pub struct InvertedIndexBlobReader<R> {
@@ -53,27 +54,58 @@ impl<R> InvertedIndexBlobReader<R> {
#[async_trait]
impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>> {
let start = metrics.as_ref().map(|_| Instant::now());
let buf = self
.source
.read(offset..offset + size as u64)
.await
.context(CommonIoSnafu)?;
if let Some(m) = metrics {
m.total_bytes += size as u64;
m.total_ranges += 1;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(buf.into())
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.source.read_vec(ranges).await.context(CommonIoSnafu)
async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
if let Some(m) = metrics {
m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
m.total_ranges += ranges.len();
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(result)
}
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
let blob_size = metadata.content_length;
Self::validate_blob_size(blob_size)?;
let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
.with_prefetch_size(DEFAULT_PREFETCH_SIZE);
footer_reader.metadata().await.map(Arc::new)
footer_reader.metadata(metrics).await.map(Arc::new)
}
}
@@ -173,7 +205,7 @@ mod tests {
let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
assert_eq!(metas.metas.len(), 2);
let meta0 = metas.metas.get("tag0").unwrap();
@@ -200,13 +232,14 @@ mod tests {
let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
let fst_map = blob_reader
.fst(
meta.base_offset + meta.relative_fst_offset as u64,
meta.fst_size,
None,
)
.await
.unwrap();
@@ -219,6 +252,7 @@ mod tests {
.fst(
meta.base_offset + meta.relative_fst_offset as u64,
meta.fst_size,
None,
)
.await
.unwrap();
@@ -232,30 +266,30 @@ mod tests {
let blob = create_inverted_index_blob();
let blob_reader = InvertedIndexBlobReader::new(blob);
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag0").unwrap();
let bitmap = blob_reader
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
.bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());
let bitmap = blob_reader
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());
let metas = blob_reader.metadata().await.unwrap();
let metas = blob_reader.metadata(None).await.unwrap();
let meta = metas.metas.get("tag1").unwrap();
let bitmap = blob_reader
.bitmap(meta.base_offset, 26, BitmapType::Roaring)
.bitmap(meta.base_offset, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());
let bitmap = blob_reader
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring)
.bitmap(meta.base_offset + 26, 26, BitmapType::Roaring, None)
.await
.unwrap();
assert_eq!(bitmap, mock_bitmap());

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Instant;
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas};
use prost::Message;
@@ -23,6 +25,7 @@ use crate::inverted_index::error::{
UnexpectedZeroSegmentRowCountSnafu,
};
use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE;
use crate::inverted_index::format::reader::InvertedIndexReadMetrics;
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; // 8KiB
@@ -54,12 +57,17 @@ impl<R> InvertedIndexFooterReader<R> {
}
impl<R: RangeReader> InvertedIndexFooterReader<R> {
pub async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
pub async fn metadata(
&mut self,
mut metrics: Option<&mut InvertedIndexReadMetrics>,
) -> Result<InvertedIndexMetas> {
ensure!(
self.blob_size >= FOOTER_PAYLOAD_SIZE_SIZE,
BlobSizeTooSmallSnafu
);
let start = metrics.as_ref().map(|_| Instant::now());
let footer_start = self.blob_size.saturating_sub(self.prefetch_size());
let suffix = self
.source
@@ -73,19 +81,36 @@ impl<R: RangeReader> InvertedIndexFooterReader<R> {
let footer_size = FOOTER_PAYLOAD_SIZE_SIZE;
// Did not fetch the entire file metadata in the initial read, need to make a second request.
if length > suffix_len as u64 - footer_size {
let result = if length > suffix_len as u64 - footer_size {
let metadata_start = self.blob_size - length - footer_size;
let meta = self
.source
.read(metadata_start..self.blob_size - footer_size)
.await
.context(CommonIoSnafu)?;
if let Some(m) = metrics.as_deref_mut() {
m.total_bytes += self.blob_size.min(self.prefetch_size()) + length;
m.total_ranges += 2;
}
self.parse_payload(&meta, length)
} else {
if let Some(m) = metrics.as_deref_mut() {
m.total_bytes += self.blob_size.min(self.prefetch_size());
m.total_ranges += 1;
}
let metadata_start = self.blob_size - length - footer_size - footer_start;
let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize];
self.parse_payload(meta, length)
};
if let Some(m) = metrics {
m.fetch_elapsed += start.unwrap().elapsed();
}
result
}
fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
@@ -186,7 +211,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch);
}
let metas = reader.metadata().await.unwrap();
let metas = reader.metadata(None).await.unwrap();
assert_eq!(metas.metas.len(), 1);
let index_meta = &metas.metas.get("test").unwrap();
assert_eq!(index_meta.name, "test");
@@ -210,7 +235,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch);
}
let result = reader.metadata().await;
let result = reader.metadata(None).await;
assert_matches!(result, Err(Error::UnexpectedFooterPayloadSize { .. }));
}
}
@@ -233,7 +258,7 @@ mod tests {
reader = reader.with_prefetch_size(prefetch);
}
let result = reader.metadata().await;
let result = reader.metadata(None).await;
assert_matches!(result, Err(Error::UnexpectedOffsetSize { .. }));
}
}

View File

@@ -122,7 +122,7 @@ mod tests {
.unwrap();
let reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap();
let metadata = reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 0);
@@ -182,7 +182,7 @@ mod tests {
.unwrap();
let reader = InvertedIndexBlobReader::new(blob);
let metadata = reader.metadata().await.unwrap();
let metadata = reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);
@@ -198,13 +198,19 @@ mod tests {
.fst(
tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -213,7 +219,12 @@ mod tests {
);
let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -222,7 +233,12 @@ mod tests {
);
let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -241,13 +257,19 @@ mod tests {
.fst(
tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -256,7 +278,12 @@ mod tests {
);
let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -265,7 +292,12 @@ mod tests {
);
let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(

View File

@@ -16,7 +16,7 @@ use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta};
use crate::bitmap::Bitmap;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// `ParallelFstValuesMapper` enables parallel mapping of multiple FST value groups to their
/// corresponding bitmaps within an inverted index.
@@ -35,7 +35,8 @@ impl<'a> ParallelFstValuesMapper<'a> {
pub async fn map_values_vec(
&mut self,
value_and_meta_vec: &[(Vec<u64>, &'a InvertedIndexMeta)],
value_and_meta_vec: &[(Vec<u64>, &InvertedIndexMeta)],
metrics: Option<&mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bitmap>> {
let groups = value_and_meta_vec
.iter()
@@ -64,7 +65,7 @@ impl<'a> ParallelFstValuesMapper<'a> {
}
common_telemetry::debug!("fetch ranges: {:?}", fetch_ranges);
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges).await?;
let mut bitmaps = self.reader.bitmap_deque(&fetch_ranges, metrics).await?;
let mut output = Vec::with_capacity(groups.len());
for counter in groups {
@@ -95,23 +96,25 @@ mod tests {
#[tokio::test]
async fn test_map_values_vec() {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_bitmap_deque().returning(|ranges| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
mock_reader
.expect_bitmap_deque()
.returning(|ranges, _metrics| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
}
_ => unreachable!(),
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b01010101], *bitmap_type))
}
_ => unreachable!(),
}
}
Ok(output)
});
Ok(output)
});
let meta = InvertedIndexMeta {
bitmap_type: BitmapType::Roaring.into(),
@@ -120,13 +123,13 @@ mod tests {
let mut values_mapper = ParallelFstValuesMapper::new(&mut mock_reader);
let result = values_mapper
.map_values_vec(&[(vec![], &meta)])
.map_values_vec(&[(vec![], &meta)], None)
.await
.unwrap();
assert_eq!(result[0].count_ones(), 0);
let result = values_mapper
.map_values_vec(&[(vec![value(1, 1)], &meta)])
.map_values_vec(&[(vec![value(1, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -135,7 +138,7 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(2, 1)], &meta)])
.map_values_vec(&[(vec![value(2, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -144,7 +147,7 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)])
.map_values_vec(&[(vec![value(1, 1), value(2, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -153,7 +156,7 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)])
.map_values_vec(&[(vec![value(2, 1), value(1, 1)], &meta)], None)
.await
.unwrap();
assert_eq!(
@@ -162,7 +165,10 @@ mod tests {
);
let result = values_mapper
.map_values_vec(&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)])
.map_values_vec(
&[(vec![value(2, 1)], &meta), (vec![value(1, 1)], &meta)],
None,
)
.await
.unwrap();
assert_eq!(
@@ -174,10 +180,13 @@ mod tests {
Bitmap::from_lsb0_bytes(&[0b10101010], BitmapType::Roaring)
);
let result = values_mapper
.map_values_vec(&[
(vec![value(2, 1), value(1, 1)], &meta),
(vec![value(1, 1)], &meta),
])
.map_values_vec(
&[
(vec![value(2, 1), value(1, 1)], &meta),
(vec![value(1, 1)], &meta),
],
None,
)
.await
.unwrap();
assert_eq!(

View File

@@ -19,7 +19,7 @@ pub use predicates_apply::PredicatesIndexApplier;
use crate::bitmap::Bitmap;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
/// The output of an apply operation.
#[derive(Clone, Debug, PartialEq)]
@@ -44,10 +44,11 @@ pub trait IndexApplier: Send + Sync {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
#[allow(unused_parens)]
async fn apply<'a>(
async fn apply<'a, 'b>(
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
metrics: Option<&'b mut InvertedIndexReadMetrics>,
) -> Result<ApplyOutput>;
/// Returns the memory usage of the applier.

View File

@@ -19,7 +19,7 @@ use greptime_proto::v1::index::InvertedIndexMetas;
use crate::bitmap::Bitmap;
use crate::inverted_index::error::{IndexNotFoundSnafu, Result};
use crate::inverted_index::format::reader::InvertedIndexReader;
use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use crate::inverted_index::search::fst_apply::{
FstApplier, IntersectionFstApplier, KeysFstApplier,
};
@@ -43,12 +43,14 @@ pub struct PredicatesIndexApplier {
impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices.
async fn apply<'a>(
async fn apply<'a, 'b>(
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
metrics: Option<&'b mut InvertedIndexReadMetrics>,
) -> Result<ApplyOutput> {
let metadata = reader.metadata().await?;
let mut metrics = metrics;
let metadata = reader.metadata(metrics.as_deref_mut()).await?;
let mut output = ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: metadata.total_row_count as _,
@@ -84,7 +86,7 @@ impl IndexApplier for PredicatesIndexApplier {
return Ok(output);
}
let fsts = reader.fst_vec(&fst_ranges).await?;
let fsts = reader.fst_vec(&fst_ranges, metrics.as_deref_mut()).await?;
let value_and_meta_vec = fsts
.into_iter()
.zip(appliers)
@@ -92,7 +94,7 @@ impl IndexApplier for PredicatesIndexApplier {
.collect::<Vec<_>>();
let mut mapper = ParallelFstValuesMapper::new(reader);
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec).await?;
let mut bm_vec = mapper.map_values_vec(&value_and_meta_vec, metrics).await?;
let mut bitmap = bm_vec.pop().unwrap(); // SAFETY: `fst_ranges` is not empty
for bm in bm_vec {
@@ -221,26 +223,28 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_ranges| {
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_ranges, _metrics| {
Ok(vec![
FstMap::from_iter([(b"tag-0_value-0", fst_value(2, 1))]).unwrap(),
])
});
mock_reader.expect_bitmap_deque().returning(|arg| {
assert_eq!(arg.len(), 1);
let range = &arg[0].0;
let bitmap_type = arg[0].1;
assert_eq!(*range, 2..3);
assert_eq!(bitmap_type, BitmapType::Roaring);
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
&[0b10101010],
bitmap_type,
)]))
});
mock_reader
.expect_bitmap_deque()
.returning(|arg, _metrics| {
assert_eq!(arg.len(), 1);
let range = &arg[0].0;
let bitmap_type = arg[0].1;
assert_eq!(*range, 2..3);
assert_eq!(bitmap_type, BitmapType::Roaring);
Ok(VecDeque::from([Bitmap::from_lsb0_bytes(
&[0b10101010],
bitmap_type,
)]))
});
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(
@@ -252,14 +256,14 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_range| {
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
mock_reader.expect_fst_vec().returning(|_range, _metrics| {
Ok(vec![
FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap(),
])
});
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(output.matched_segment_ids.count_ones(), 0);
@@ -279,8 +283,8 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
mock_reader.expect_fst_vec().returning(|ranges| {
.returning(|_| Ok(mock_metas([("tag-0", 0), ("tag-1", 1)])));
mock_reader.expect_fst_vec().returning(|ranges, _metrics| {
let mut output = vec![];
for range in ranges {
match range.start {
@@ -293,27 +297,29 @@ mod tests {
}
Ok(output)
});
mock_reader.expect_bitmap_deque().returning(|ranges| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
mock_reader
.expect_bitmap_deque()
.returning(|ranges, _metrics| {
let mut output = VecDeque::new();
for (range, bitmap_type) in ranges {
let offset = range.start;
let size = range.end - range.start;
match (offset, size, bitmap_type) {
(1, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b10101010], *bitmap_type))
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
}
_ => unreachable!(),
}
(2, 1, BitmapType::Roaring) => {
output.push_back(Bitmap::from_lsb0_bytes(&[0b11011011], *bitmap_type))
}
_ => unreachable!(),
}
}
Ok(output)
});
Ok(output)
});
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(
@@ -331,10 +337,10 @@ mod tests {
let mut mock_reader: MockInvertedIndexReader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas([("tag-0", 0)])));
.returning(|_| Ok(mock_metas([("tag-0", 0)])));
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert_eq!(output.matched_segment_ids, Bitmap::full_bitvec(8)); // full range to scan
@@ -343,7 +349,7 @@ mod tests {
#[tokio::test]
async fn test_index_applier_with_empty_index() {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader.expect_metadata().returning(move || {
mock_reader.expect_metadata().returning(move |_| {
Ok(Arc::new(InvertedIndexMetas {
total_row_count: 0, // No rows
segment_row_count: 1,
@@ -359,7 +365,7 @@ mod tests {
};
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.apply(SearchContext::default(), &mut mock_reader, None)
.await
.unwrap();
assert!(output.matched_segment_ids.is_empty());
@@ -370,7 +376,7 @@ mod tests {
let mut mock_reader = MockInvertedIndexReader::new();
mock_reader
.expect_metadata()
.returning(|| Ok(mock_metas(vec![])));
.returning(|_| Ok(mock_metas(vec![])));
let mut mock_fst_applier = MockFstApplier::new();
mock_fst_applier.expect_apply().never();
@@ -385,6 +391,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::ThrowError,
},
&mut mock_reader,
None,
)
.await;
assert!(matches!(result, Err(Error::IndexNotFound { .. })));
@@ -395,6 +402,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
},
&mut mock_reader,
None,
)
.await
.unwrap();
@@ -406,6 +414,7 @@ mod tests {
index_not_found_strategy: IndexNotFoundStrategy::Ignore,
},
&mut mock_reader,
None,
)
.await
.unwrap();

View File

@@ -44,6 +44,7 @@ use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::RegionFileId;
use crate::sst::parquet::reader::MetadataCacheMetrics;
/// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
@@ -90,6 +91,32 @@ impl CacheStrategy {
}
}
/// Gets parquet metadata with cache metrics tracking.
/// Returns the metadata and updates the provided metrics.
pub(crate) async fn get_parquet_meta_data_with_metrics(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager
.get_parquet_meta_data_with_metrics(file_id, metrics)
.await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data_with_metrics(file_id, metrics)
.await
}
CacheStrategy::Disabled => {
metrics.mem_cache_miss += 1;
metrics.file_cache_miss += 1;
None
}
}
}
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
pub fn get_parquet_meta_data_from_mem_cache(
&self,
@@ -317,6 +344,36 @@ impl CacheManager {
None
}
/// Gets cached [ParquetMetaData] with metrics tracking.
/// Tries in-memory cache first, then file cache, updating metrics accordingly.
pub(crate) async fn get_parquet_meta_data_with_metrics(
&self,
file_id: RegionFileId,
metrics: &mut MetadataCacheMetrics,
) -> Option<Arc<ParquetMetaData>> {
// Try to get metadata from sst meta cache
if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache_inner(file_id) {
metrics.mem_cache_hit += 1;
return Some(metadata);
}
metrics.mem_cache_miss += 1;
// Try to get metadata from write cache
let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
if let Some(write_cache) = &self.write_cache
&& let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
{
metrics.file_cache_hit += 1;
let metadata = Arc::new(metadata);
// Put metadata into sst meta cache
self.put_parquet_meta_data(file_id, metadata.clone());
return Some(metadata);
};
metrics.file_cache_miss += 1;
None
}
/// Gets cached [ParquetMetaData] from in-memory cache.
/// This method does not perform I/O.
pub fn get_parquet_meta_data_from_mem_cache(
@@ -330,6 +387,17 @@ impl CacheManager {
})
}
/// Gets cached [ParquetMetaData] from in-memory cache without updating global metrics.
/// This is used by `get_parquet_meta_data_with_metrics` to avoid double counting.
fn get_parquet_meta_data_from_mem_cache_inner(
&self,
file_id: RegionFileId,
) -> Option<Arc<ParquetMetaData>> {
self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()))
})
}
/// Puts [ParquetMetaData] into the cache.
pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
if let Some(cache) = &self.sst_meta_cache {

View File

@@ -31,6 +31,29 @@ const INDEX_METADATA_TYPE: &str = "index_metadata";
/// Metrics for index content.
const INDEX_CONTENT_TYPE: &str = "index_content";
/// Metrics collected from IndexCache operations.
#[derive(Debug, Default, Clone)]
pub struct IndexCacheMetrics {
/// Number of cache hits.
pub cache_hit: usize,
/// Number of cache misses.
pub cache_miss: usize,
/// Number of pages accessed.
pub num_pages: usize,
/// Total bytes from pages.
pub page_bytes: u64,
}
impl IndexCacheMetrics {
/// Merges another set of metrics into this one.
pub fn merge(&mut self, other: &Self) {
self.cache_hit += other.cache_hit;
self.cache_miss += other.cache_miss;
self.num_pages += other.num_pages;
self.page_bytes += other.page_bytes;
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageKey {
page_id: u64,
@@ -160,18 +183,20 @@ where
offset: u64,
size: u32,
load: F,
) -> Result<Vec<u8>, E>
) -> Result<(Vec<u8>, IndexCacheMetrics), E>
where
F: Fn(Vec<Range<u64>>) -> Fut,
Fut: Future<Output = Result<Vec<Bytes>, E>>,
E: std::error::Error,
{
let mut metrics = IndexCacheMetrics::default();
let page_keys =
PageKey::generate_page_keys(offset, size, self.page_size).collect::<Vec<_>>();
// Size is 0, return empty data.
if page_keys.is_empty() {
return Ok(Vec::new());
return Ok((Vec::new(), metrics));
}
metrics.num_pages = page_keys.len();
let mut data = Vec::with_capacity(page_keys.len());
data.resize(page_keys.len(), Bytes::new());
let mut cache_miss_range = vec![];
@@ -182,10 +207,13 @@ where
match self.get_page(key, *page_key) {
Some(page) => {
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_hit += 1;
metrics.page_bytes += page.len() as u64;
data[i] = page;
}
None => {
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
metrics.cache_miss += 1;
let base_offset = page_key.page_id * self.page_size;
let pruned_size = if i == last_index {
prune_size(page_keys.iter(), file_size, self.page_size)
@@ -201,14 +229,18 @@ where
let pages = load(cache_miss_range).await?;
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
let page_key = page_keys[i];
metrics.page_bytes += page.len() as u64;
data[i] = page.clone();
self.put_page(key, page_key, page.clone());
}
}
let buffer = Buffer::from_iter(data.into_iter());
Ok(buffer
.slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec())
Ok((
buffer
.slice(PageKey::calculate_range(offset, size, self.page_size))
.to_vec(),
metrics,
))
}
fn get_page(&self, key: K, page_key: PageKey) -> Option<Bytes> {

View File

@@ -14,12 +14,13 @@
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
use store_api::storage::{ColumnId, FileId};
use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
@@ -114,51 +115,93 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
async fn range_read(
&self,
offset: u64,
size: u32,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Bytes> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner;
self.cache
let (result, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.map(|b| b.into())
.await?;
if let Some(m) = metrics {
m.total_ranges += cache_metrics.num_pages;
m.total_bytes += cache_metrics.page_bytes;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(result.into())
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec(
&self,
ranges: &[Range<u64>],
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges {
let inner = &self.inner;
let page = self
let (page, cache_metrics) = self
.cache
.get_or_load(
(self.file_id, self.column_id, self.tag),
self.blob_size,
range.start,
(range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_ranges += total_cache_metrics.num_pages;
m.total_bytes += total_cache_metrics.page_bytes;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
if let Some(start) = start {
m.fetch_elapsed += start.elapsed();
}
}
Ok(pages)
}
/// Reads the meta information of the bloom filter.
async fn metadata(&self) -> Result<BloomFilterMeta> {
async fn metadata(
&self,
metrics: Option<&mut BloomFilterReadMetrics>,
) -> Result<BloomFilterMeta> {
if let Some(cached) = self
.cache
.get_metadata((self.file_id, self.column_id, self.tag))
{
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok((*cached).clone())
} else {
let meta = self.inner.metadata().await?;
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(
(self.file_id, self.column_id, self.tag),
Arc::new(meta.clone()),

View File

@@ -14,12 +14,13 @@
use core::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait;
use bytes::Bytes;
use index::inverted_index::error::Result;
use index::inverted_index::format::reader::InvertedIndexReader;
use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
use prost::Message;
use store_api::storage::FileId;
@@ -83,46 +84,86 @@ impl<R> CachedInvertedIndexBlobReader<R> {
#[async_trait]
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
async fn range_read<'a>(
&self,
offset: u64,
size: u32,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<u8>> {
let start = metrics.as_ref().map(|_| Instant::now());
let inner = &self.inner;
self.cache
let (result, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.await?;
if let Some(m) = metrics {
m.total_bytes += cache_metrics.page_bytes;
m.total_ranges += cache_metrics.num_pages;
m.cache_hit += cache_metrics.cache_hit;
m.cache_miss += cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(result)
}
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn read_vec<'a>(
&self,
ranges: &[Range<u64>],
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Vec<Bytes>> {
let start = metrics.as_ref().map(|_| Instant::now());
let mut pages = Vec::with_capacity(ranges.len());
let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
for range in ranges {
let inner = &self.inner;
let page = self
let (page, cache_metrics) = self
.cache
.get_or_load(
self.file_id,
self.blob_size,
range.start,
(range.end - range.start) as u32,
move |ranges| async move { inner.read_vec(&ranges).await },
move |ranges| async move { inner.read_vec(&ranges, None).await },
)
.await?;
total_cache_metrics.merge(&cache_metrics);
pages.push(Bytes::from(page));
}
if let Some(m) = metrics {
m.total_bytes += total_cache_metrics.page_bytes;
m.total_ranges += total_cache_metrics.num_pages;
m.cache_hit += total_cache_metrics.cache_hit;
m.cache_miss += total_cache_metrics.cache_miss;
m.fetch_elapsed += start.unwrap().elapsed();
}
Ok(pages)
}
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
async fn metadata<'a>(
&self,
metrics: Option<&'a mut InvertedIndexReadMetrics>,
) -> Result<Arc<InvertedIndexMetas>> {
if let Some(cached) = self.cache.get_metadata(self.file_id) {
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
if let Some(m) = metrics {
m.cache_hit += 1;
}
Ok(cached)
} else {
let meta = self.inner.metadata().await?;
let meta = self.inner.metadata(metrics).await?;
self.cache.put_metadata(self.file_id, meta.clone());
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
Ok(meta)
@@ -277,7 +318,7 @@ mod test {
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
);
let metadata = cached_reader.metadata().await.unwrap();
let metadata = cached_reader.metadata(None).await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);
@@ -292,13 +333,19 @@ mod test {
.fst(
tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -307,7 +354,12 @@ mod test {
);
let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -316,7 +368,12 @@ mod test {
);
let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag0.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -335,13 +392,19 @@ mod test {
.fst(
tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size,
None,
)
.await
.unwrap();
assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -350,7 +413,12 @@ mod test {
);
let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -359,7 +427,12 @@ mod test {
);
let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
.bitmap(
tag1.base_offset + offset as u64,
size,
BitmapType::Roaring,
None,
)
.await
.unwrap();
assert_eq!(
@@ -372,16 +445,16 @@ mod test {
for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.random_range(0..file_size);
let size = rng.random_range(0..file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size).await.unwrap();
let expected = cached_reader.range_read(offset, size, None).await.unwrap();
let inner = &cached_reader.inner;
let read = cached_reader
let (read, _cache_metrics) = cached_reader
.cache
.get_or_load(
cached_reader.file_id,
file_size,
offset,
size,
|ranges| async move { inner.read_vec(&ranges).await },
|ranges| async move { inner.read_vec(&ranges, None).await },
)
.await
.unwrap();

View File

@@ -233,7 +233,7 @@ async fn collect_inverted_entries(
InvertedIndexBlobReader::new(blob_reader),
cache.clone(),
);
match reader.metadata().await {
match reader.metadata(None).await {
Ok(metas) => metas,
Err(err) => {
warn!(
@@ -247,7 +247,7 @@ async fn collect_inverted_entries(
}
} else {
let reader = InvertedIndexBlobReader::new(blob_reader);
match reader.metadata().await {
match reader.metadata(None).await {
Ok(metas) => metas,
Err(err) => {
warn!(
@@ -318,10 +318,10 @@ async fn try_read_bloom_meta(
bloom_reader,
cache.clone(),
)
.metadata()
.metadata(None)
.await
}
_ => bloom_reader.metadata().await,
_ => bloom_reader.metadata(None).await,
};
match result {

View File

@@ -41,10 +41,14 @@ use crate::read::range::{RangeBuilderList, RangeMeta, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics;
use crate::sst::index::inverted_index::applier::InvertedIndexApplyMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::flat_format::time_index_column_index;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Verbose scan metrics for a partition.
#[derive(Default)]
@@ -81,6 +85,8 @@ pub(crate) struct ScanMetricsSet {
// SST related metrics:
/// Duration to build file ranges.
build_parts_cost: Duration,
/// Duration to scan SST files.
file_scan_cost: Duration,
/// Number of row groups before filtering.
rg_total: usize,
/// Number of row groups filtered by fulltext index.
@@ -126,6 +132,18 @@ pub(crate) struct ScanMetricsSet {
/// The stream reached EOF
stream_eof: bool,
// Optional verbose metrics:
/// Inverted index apply metrics.
inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Bloom filter index apply metrics.
bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Fulltext index apply metrics.
fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
/// Parquet fetch metrics.
fetch_metrics: Option<ParquetFetchMetrics>,
/// Metadata cache metrics.
metadata_cache_metrics: Option<MetadataCacheMetrics>,
}
impl fmt::Debug for ScanMetricsSet {
@@ -141,6 +159,7 @@ impl fmt::Debug for ScanMetricsSet {
num_mem_ranges,
num_file_ranges,
build_parts_cost,
file_scan_cost,
rg_total,
rg_fulltext_filtered,
rg_inverted_filtered,
@@ -166,6 +185,11 @@ impl fmt::Debug for ScanMetricsSet {
mem_rows,
mem_batches,
mem_series,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
fetch_metrics,
metadata_cache_metrics,
} = self;
// Write core metrics
@@ -181,6 +205,7 @@ impl fmt::Debug for ScanMetricsSet {
\"num_mem_ranges\":{num_mem_ranges}, \
\"num_file_ranges\":{num_file_ranges}, \
\"build_parts_cost\":\"{build_parts_cost:?}\", \
\"file_scan_cost\":\"{file_scan_cost:?}\", \
\"rg_total\":{rg_total}, \
\"rows_before_filter\":{rows_before_filter}, \
\"num_sst_record_batches\":{num_sst_record_batches}, \
@@ -255,6 +280,23 @@ impl fmt::Debug for ScanMetricsSet {
write!(f, ", \"mem_scan_cost\":\"{mem_scan_cost:?}\"")?;
}
// Write optional verbose metrics
if let Some(metrics) = inverted_index_apply_metrics {
write!(f, ", \"inverted_index_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = bloom_filter_apply_metrics {
write!(f, ", \"bloom_filter_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = fulltext_index_apply_metrics {
write!(f, ", \"fulltext_index_apply_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = fetch_metrics {
write!(f, ", \"fetch_metrics\":{:?}", metrics)?;
}
if let Some(metrics) = metadata_cache_metrics {
write!(f, ", \"metadata_cache_metrics\":{:?}", metrics)?;
}
write!(f, ", \"stream_eof\":{stream_eof}}}")
}
}
@@ -304,14 +346,20 @@ impl ScanMetricsSet {
rows_inverted_filtered,
rows_bloom_filtered,
rows_precise_filtered,
inverted_index_apply_metrics,
bloom_filter_apply_metrics,
fulltext_index_apply_metrics,
},
num_record_batches,
num_batches,
num_rows,
scan_cost: _,
scan_cost,
metadata_cache_metrics,
fetch_metrics,
} = other;
self.build_parts_cost += *build_cost;
self.file_scan_cost += *scan_cost;
self.rg_total += *rg_total;
self.rg_fulltext_filtered += *rg_fulltext_filtered;
@@ -328,6 +376,31 @@ impl ScanMetricsSet {
self.num_sst_record_batches += *num_record_batches;
self.num_sst_batches += *num_batches;
self.num_sst_rows += *num_rows;
// Merge optional verbose metrics
if let Some(metrics) = inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(InvertedIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(BloomFilterIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(FulltextIndexApplyMetrics::default)
.merge_from(metrics);
}
if let Some(metrics) = fetch_metrics {
self.fetch_metrics
.get_or_insert_with(ParquetFetchMetrics::default)
.merge_from(metrics);
}
self.metadata_cache_metrics
.get_or_insert_with(MetadataCacheMetrics::default)
.merge_from(metadata_cache_metrics);
}
/// Sets distributor metrics.
@@ -615,6 +688,11 @@ impl PartitionMetrics {
let mut metrics_set = self.0.metrics.lock().unwrap();
metrics_set.set_distributor_metrics(metrics);
}
/// Returns whether verbose explain is enabled.
pub(crate) fn explain_verbose(&self) -> bool {
self.0.explain_verbose
}
}
impl fmt::Debug for PartitionMetrics {
@@ -768,6 +846,21 @@ fn can_split_series(num_rows: u64, num_series: u64) -> bool {
num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
/// based on the `explain_verbose` flag.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
if explain_verbose {
ReaderFilterMetrics {
inverted_index_apply_metrics: Some(InvertedIndexApplyMetrics::default()),
bloom_filter_apply_metrics: Some(BloomFilterIndexApplyMetrics::default()),
fulltext_index_apply_metrics: Some(FulltextIndexApplyMetrics::default()),
..Default::default()
}
} else {
ReaderFilterMetrics::default()
}
}
/// Scans file ranges at `index`.
pub(crate) async fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
@@ -776,7 +869,10 @@ pub(crate) async fn scan_file_ranges(
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<Batch>>> {
let mut reader_metrics = ReaderMetrics::default();
let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
@@ -799,7 +895,10 @@ pub(crate) async fn scan_flat_file_ranges(
read_type: &'static str,
range_builder: Arc<RangeBuilderList>,
) -> Result<impl Stream<Item = Result<RecordBatch>>> {
let mut reader_metrics = ReaderMetrics::default();
let mut reader_metrics = ReaderMetrics {
filter_metrics: new_filter_metrics(part_metrics.explain_verbose()),
..Default::default()
};
let ranges = range_builder
.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics)
.await?;
@@ -822,10 +921,18 @@ pub fn build_file_range_scan_stream(
ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let reader_metrics = &mut ReaderMetrics::default();
let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges {
let build_reader_start = Instant::now();
let reader = range.reader(stream_ctx.input.series_row_selector).await?;
let reader = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await?;
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
@@ -857,10 +964,18 @@ pub fn build_flat_file_range_scan_stream(
ranges: SmallVec<[FileRange; 2]>,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
let reader_metrics = &mut ReaderMetrics::default();
let fetch_metrics = if part_metrics.explain_verbose() {
Some(Arc::new(ParquetFetchMetrics::default()))
} else {
None
};
let reader_metrics = &mut ReaderMetrics {
fetch_metrics: fetch_metrics.clone(),
..Default::default()
};
for range in ranges {
let build_reader_start = Instant::now();
let mut reader = range.flat_reader().await?;
let mut reader = range.flat_reader(fetch_metrics.as_deref()).await?;
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);

View File

@@ -17,11 +17,14 @@ mod builder;
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use index::bloom_filter::reader::{
BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
};
use index::target::IndexTarget;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
@@ -47,6 +50,56 @@ use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking bloom filter index apply operations.
#[derive(Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
/// Metrics for bloom filter read operations.
pub read_metrics: BloomFilterReadMetrics,
}
impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if !self.apply_elapsed.is_zero() {
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
first = false;
}
if self.blob_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
first = false;
}
if self.blob_read_bytes > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
}
write!(f, "}}")
}
}
impl BloomFilterIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.blob_read_bytes += other.blob_read_bytes;
self.read_metrics.merge_from(&other.read_metrics);
}
}
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
@@ -133,15 +186,20 @@ impl BloomFilterIndexApplier {
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.start_timer();
let apply_start = Instant::now();
// Calculates row groups' ranges based on start of the file.
let mut input = Vec::with_capacity(row_groups.size_hint().0);
@@ -163,7 +221,7 @@ impl BloomFilterIndexApplier {
for (column_id, predicates) in self.predicates.iter() {
let blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
.blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
.await?
{
Some(blob) => blob,
@@ -173,6 +231,9 @@ impl BloomFilterIndexApplier {
// Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
if let Some(m) = &mut metrics {
m.blob_read_bytes += blob_size;
}
let reader = CachedBloomFilterIndexBlobReader::new(
file_id.file_id(),
*column_id,
@@ -181,12 +242,12 @@ impl BloomFilterIndexApplier {
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
self.apply_predicates(reader, predicates, &mut output)
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await
.context(ApplyBloomFilterIndexSnafu)?;
} else {
let reader = BloomFilterReaderImpl::new(blob);
self.apply_predicates(reader, predicates, &mut output)
self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
.await
.context(ApplyBloomFilterIndexSnafu)?;
}
@@ -201,6 +262,16 @@ impl BloomFilterIndexApplier {
}
}
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(output)
}
@@ -212,6 +283,7 @@ impl BloomFilterIndexApplier {
file_id: RegionFileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> Result<Option<BlobReader>> {
let reader = match self
.cached_blob_reader(file_id, column_id, file_size_hint)
@@ -219,6 +291,9 @@ impl BloomFilterIndexApplier {
{
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Some(m) = metrics {
m.blob_cache_miss += 1;
}
if let Err(err) = other {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
@@ -320,6 +395,7 @@ impl BloomFilterIndexApplier {
reader: R,
predicates: &[InListPredicate],
output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
) -> std::result::Result<(), index::bloom_filter::error::Error> {
let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
@@ -329,7 +405,10 @@ impl BloomFilterIndexApplier {
continue;
}
*row_group_output = applier.search(predicates, row_group_output).await?;
let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
*row_group_output = applier
.search(predicates, row_group_output, read_metrics)
.await?;
}
Ok(())
@@ -393,7 +472,7 @@ mod tests {
let applier = builder.build(&exprs).unwrap().unwrap();
applier
.apply(file_id, None, row_groups.into_iter())
.apply(file_id, None, row_groups.into_iter(), None)
.await
.unwrap()
.into_iter()

View File

@@ -637,17 +637,17 @@ pub(crate) mod tests {
.unwrap();
let reader = blob_guard.reader().await.unwrap();
let bloom_filter = BloomFilterReaderImpl::new(reader);
let metadata = bloom_filter.metadata().await.unwrap();
let metadata = bloom_filter.metadata(None).await.unwrap();
assert_eq!(metadata.segment_count, 10);
for i in 0..5 {
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
assert!(bf.contains(b"tag1"));
}
for i in 5..10 {
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
assert!(bf.contains(b"tag2"));
}
}
@@ -662,13 +662,13 @@ pub(crate) mod tests {
.unwrap();
let reader = blob_guard.reader().await.unwrap();
let bloom_filter = BloomFilterReaderImpl::new(reader);
let metadata = bloom_filter.metadata().await.unwrap();
let metadata = bloom_filter.metadata(None).await.unwrap();
assert_eq!(metadata.segment_count, 5);
for i in 0u64..20 {
let idx = i as usize / 4;
let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
let bf = bloom_filter.bloom_filter(loc).await.unwrap();
let bf = bloom_filter.bloom_filter(loc, None).await.unwrap();
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
.unwrap();

View File

@@ -16,11 +16,12 @@ use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::iter;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::BloomFilterReaderImpl;
use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReaderImpl};
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::tokenizer::{ChineseTokenizer, EnglishTokenizer, Tokenizer};
use index::fulltext_index::{Analyzer, Config};
@@ -53,6 +54,91 @@ use crate::sst::index::puffin_manager::{
pub mod builder;
/// Metrics for tracking fulltext index apply operations.
#[derive(Default, Clone)]
pub struct FulltextIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses.
pub blob_cache_miss: usize,
/// Number of directory cache hits.
pub dir_cache_hit: usize,
/// Number of directory cache misses.
pub dir_cache_miss: usize,
/// Elapsed time to initialize directory data.
pub dir_init_elapsed: std::time::Duration,
/// Metrics for bloom filter reads.
pub bloom_filter_read_metrics: BloomFilterReadMetrics,
}
impl std::fmt::Debug for FulltextIndexApplyMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if !self.apply_elapsed.is_zero() {
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
first = false;
}
if self.blob_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
first = false;
}
if self.dir_cache_hit > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"dir_cache_hit\":{}", self.dir_cache_hit)?;
first = false;
}
if self.dir_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"dir_cache_miss\":{}", self.dir_cache_miss)?;
first = false;
}
if !self.dir_init_elapsed.is_zero() {
if !first {
write!(f, ", ")?;
}
write!(f, "\"dir_init_elapsed\":\"{:?}\"", self.dir_init_elapsed)?;
}
write!(f, "}}")
}
}
impl FulltextIndexApplyMetrics {
/// Collects metrics from a directory read operation.
pub fn collect_dir_metrics(
&mut self,
elapsed: std::time::Duration,
dir_metrics: puffin::puffin_manager::DirMetrics,
) {
self.dir_init_elapsed += elapsed;
if dir_metrics.cache_hit {
self.dir_cache_hit += 1;
} else {
self.dir_cache_miss += 1;
}
}
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.dir_cache_hit += other.dir_cache_hit;
self.dir_cache_miss += other.dir_cache_miss;
self.dir_init_elapsed += other.dir_init_elapsed;
self.bloom_filter_read_metrics
.merge_from(&other.bloom_filter_read_metrics);
}
}
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
pub struct FulltextIndexApplier {
/// Requests to be applied.
@@ -124,14 +210,18 @@ impl FulltextIndexApplier {
impl FulltextIndexApplier {
/// Applies fine-grained fulltext index to the specified SST file.
/// Returns the row ids that match the queries.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_fine(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> {
let timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let apply_start = Instant::now();
let mut row_ids: Option<BTreeSet<RowId>> = None;
for (column_id, request) in self.requests.iter() {
@@ -140,7 +230,13 @@ impl FulltextIndexApplier {
}
let Some(result) = self
.apply_fine_one_column(file_size_hint, file_id, *column_id, request)
.apply_fine_one_column(
file_size_hint,
file_id,
*column_id,
request,
metrics.as_deref_mut(),
)
.await?
else {
continue;
@@ -159,9 +255,16 @@ impl FulltextIndexApplier {
}
}
if row_ids.is_none() {
timer.stop_and_discard();
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(row_ids)
}
@@ -171,6 +274,7 @@ impl FulltextIndexApplier {
file_id: RegionFileId,
column_id: ColumnId,
request: &FulltextRequest,
metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<BTreeSet<RowId>>> {
let blob_key = format!(
"{INDEX_BLOB_TYPE_TANTIVY}-{}",
@@ -178,7 +282,7 @@ impl FulltextIndexApplier {
);
let dir = self
.index_source
.dir(file_id, &blob_key, file_size_hint)
.dir(file_id, &blob_key, file_size_hint, metrics)
.await?;
let dir = match &dir {
@@ -240,15 +344,20 @@ impl FulltextIndexApplier {
///
/// Row group id existing in the returned result means that the row group is searched.
/// Empty ranges means that the row group is searched but no rows are found.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `row_groups` - Iterator of row group lengths and whether to search in the row group
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply_coarse(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
row_groups: impl Iterator<Item = (usize, bool)>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<Vec<(usize, Vec<Range<usize>>)>>> {
let timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let apply_start = Instant::now();
let (input, mut output) = Self::init_coarse_output(row_groups);
let mut applied = false;
@@ -266,16 +375,27 @@ impl FulltextIndexApplier {
*column_id,
&request.terms,
&mut output,
metrics.as_deref_mut(),
)
.await?;
}
if !applied {
timer.stop_and_discard();
return Ok(None);
}
Self::adjust_coarse_output(input, &mut output);
// Record elapsed time to histogram and collect metrics if requested
let elapsed = apply_start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(m) = metrics {
m.apply_elapsed += elapsed;
}
Ok(Some(output))
}
@@ -286,6 +406,7 @@ impl FulltextIndexApplier {
column_id: ColumnId,
terms: &[FulltextTerm],
output: &mut [(usize, Vec<Range<usize>>)],
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<bool> {
let blob_key = format!(
"{INDEX_BLOB_TYPE_BLOOM}-{}",
@@ -293,7 +414,7 @@ impl FulltextIndexApplier {
);
let Some(reader) = self
.index_source
.blob(file_id, &blob_key, file_size_hint)
.blob(file_id, &blob_key, file_size_hint, metrics.as_deref_mut())
.await?
else {
return Ok(false);
@@ -336,7 +457,13 @@ impl FulltextIndexApplier {
}
*row_group_output = applier
.search(&predicates, row_group_output)
.search(
&predicates,
row_group_output,
metrics
.as_deref_mut()
.map(|m| &mut m.bloom_filter_read_metrics),
)
.await
.context(ApplyBloomFilterIndexSnafu)?;
}
@@ -483,8 +610,15 @@ impl IndexSource {
file_id: RegionFileId,
key: &str,
file_size_hint: Option<u64>,
metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track cache miss if fallbacked to remote
if fallbacked && let Some(m) = metrics {
m.blob_cache_miss += 1;
}
let res = reader.blob(key).await;
match res {
Ok(blob) => Ok(Some(blob)),
@@ -514,11 +648,25 @@ impl IndexSource {
file_id: RegionFileId,
key: &str,
file_size_hint: Option<u64>,
mut metrics: Option<&mut FulltextIndexApplyMetrics>,
) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
// Track cache miss if fallbacked to remote
if fallbacked && let Some(m) = &mut metrics {
m.blob_cache_miss += 1;
}
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await;
match res {
Ok(dir) => Ok(Some(dir)),
Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => {
if fallbacked {
@@ -526,9 +674,16 @@ impl IndexSource {
} else {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
let reader = self.build_remote(file_id, file_size_hint).await?;
let start = metrics.as_ref().map(|_| Instant::now());
let res = reader.dir(key).await;
match res {
Ok(dir) => Ok(Some(dir)),
Ok((dir, dir_metrics)) => {
if let Some(m) = metrics {
// Safety: start is Some when metrics is Some
m.collect_dir_metrics(start.unwrap().elapsed(), dir_metrics);
}
Ok(Some(dir))
}
Err(err) if err.is_blob_not_found() => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}

View File

@@ -723,15 +723,16 @@ mod tests {
let backend = backend.clone();
async move {
match backend {
FulltextBackend::Tantivy => {
applier.apply_fine(region_file_id, None).await.unwrap()
}
FulltextBackend::Tantivy => applier
.apply_fine(region_file_id, None, None)
.await
.unwrap(),
FulltextBackend::Bloom => {
let coarse_mask = coarse_mask.unwrap_or_default();
let row_groups = (0..coarse_mask.len()).map(|i| (1, coarse_mask[i]));
// row group id == row id
let resp = applier
.apply_coarse(region_file_id, None, row_groups)
.apply_coarse(region_file_id, None, row_groups, None)
.await
.unwrap();
resp.map(|r| {

View File

@@ -16,10 +16,11 @@ pub mod builder;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReadMetrics};
use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
@@ -44,6 +45,57 @@ use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
/// Metrics for tracking inverted index apply operations.
#[derive(Default, Clone)]
pub struct InvertedIndexApplyMetrics {
/// Total time spent applying the index.
pub apply_elapsed: std::time::Duration,
/// Number of blob cache misses (0 or 1).
pub blob_cache_miss: usize,
/// Total size of blobs read (in bytes).
pub blob_read_bytes: u64,
/// Metrics for inverted index reads.
pub inverted_index_read_metrics: InvertedIndexReadMetrics,
}
impl std::fmt::Debug for InvertedIndexApplyMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if !self.apply_elapsed.is_zero() {
write!(f, "\"apply_elapsed\":\"{:?}\"", self.apply_elapsed)?;
first = false;
}
if self.blob_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_cache_miss\":{}", self.blob_cache_miss)?;
first = false;
}
if self.blob_read_bytes > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"blob_read_bytes\":{}", self.blob_read_bytes)?;
}
write!(f, "}}")
}
}
impl InvertedIndexApplyMetrics {
/// Merges another metrics into this one.
pub fn merge_from(&mut self, other: &Self) {
self.apply_elapsed += other.apply_elapsed;
self.blob_cache_miss += other.blob_cache_miss;
self.blob_read_bytes += other.blob_read_bytes;
self.inverted_index_read_metrics
.merge_from(&other.inverted_index_read_metrics);
}
}
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub(crate) struct InvertedIndexApplier {
@@ -124,24 +176,30 @@ impl InvertedIndexApplier {
self
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids
/// Applies predicates to the provided SST file id and returns the relevant row group ids.
///
/// # Arguments
/// * `file_id` - The region file ID to apply predicates to
/// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
/// * `metrics` - Optional mutable reference to collect metrics on demand
pub async fn apply(
&self,
file_id: RegionFileId,
file_size_hint: Option<u64>,
mut metrics: Option<&mut InvertedIndexApplyMetrics>,
) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
let start = Instant::now();
let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates.
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
let mut cache_miss = 0;
let blob = match self.cached_blob_reader(file_id, file_size_hint).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
cache_miss += 1;
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
@@ -149,8 +207,9 @@ impl InvertedIndexApplier {
}
};
if let Some(index_cache) = &self.inverted_index_cache {
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let result = if let Some(index_cache) = &self.inverted_index_cache {
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id.file_id(),
blob_size,
@@ -158,16 +217,42 @@ impl InvertedIndexApplier {
index_cache.clone(),
);
self.index_applier
.apply(context, &mut index_reader)
.apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await
.context(ApplyInvertedIndexSnafu)
} else {
let mut index_reader = InvertedIndexBlobReader::new(blob);
self.index_applier
.apply(context, &mut index_reader)
.apply(
context,
&mut index_reader,
metrics
.as_deref_mut()
.map(|m| &mut m.inverted_index_read_metrics),
)
.await
.context(ApplyInvertedIndexSnafu)
};
// Record elapsed time to histogram and collect metrics if requested
let elapsed = start.elapsed();
INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.observe(elapsed.as_secs_f64());
if let Some(metrics) = metrics {
metrics.apply_elapsed = elapsed;
metrics.blob_cache_miss = cache_miss;
metrics.blob_read_bytes = blob_size;
}
result
}
/// Creates a blob reader from the cached index file.
@@ -281,7 +366,7 @@ mod tests {
let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().returning(|_, _| {
mock_index_applier.expect_apply().returning(|_, _, _| {
Ok(ApplyOutput {
matched_segment_ids: Bitmap::new_bitvec(),
total_row_count: 100,
@@ -297,7 +382,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let output = sst_index_applier.apply(file_id, None).await.unwrap();
let output = sst_index_applier.apply(file_id, None, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
@@ -345,7 +430,7 @@ mod tests {
puffin_manager_factory,
Default::default(),
);
let res = sst_index_applier.apply(file_id, None).await;
let res = sst_index_applier.apply(file_id, None, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -615,7 +615,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id, None)
.apply(sst_file_id, None, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -245,7 +245,7 @@ mod tests {
let bs = blob_reader.read(0..meta.content_length).await.unwrap();
assert_eq!(&*bs, raw_data);
let dir_guard = reader.dir(dir_key).await.unwrap();
let (dir_guard, _metrics) = reader.dir(dir_key).await.unwrap();
let file = dir_guard.path().join("hello");
let data = tokio::fs::read(file).await.unwrap();
assert_eq!(data, raw_data);

View File

@@ -45,6 +45,7 @@ use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
/// Checks if a row group contains delete operations by examining the min value of op_type column.
///
@@ -117,11 +118,16 @@ impl FileRange {
pub(crate) async fn reader(
&self,
selector: Option<TimeSeriesRowSelector>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<PruneReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?;
let use_last_row_reader = if selector
@@ -168,11 +174,18 @@ impl FileRange {
}
/// Creates a flat reader that returns RecordBatch.
pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
pub(crate) async fn flat_reader(
&self,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<FlatPruneReader> {
let parquet_reader = self
.context
.reader_builder
.build(self.row_group_idx, self.row_selection.clone())
.build(
self.row_group_idx,
self.row_selection.clone(),
fetch_metrics,
)
.await?;
// Compute skip_fields once for this row group

View File

@@ -52,15 +52,21 @@ use crate::metrics::{
use crate::read::prune::{PruneReader, Source};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierRef, BloomFilterIndexApplyMetrics,
};
use crate::sst::index::fulltext_index::applier::{
FulltextIndexApplierRef, FulltextIndexApplyMetrics,
};
use crate::sst::index::inverted_index::applier::{
InvertedIndexApplierRef, InvertedIndexApplyMetrics,
};
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -253,7 +259,9 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?;
let parquet_meta = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.await?;
// Decodes region metadata.
let key_value_meta = parquet_meta.file_metadata().key_value_metadata();
// Gets the metadata stored in the SST.
@@ -378,25 +386,34 @@ impl ParquetReaderBuilder {
&self,
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
) -> Result<Arc<ParquetMetaData>> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
.start_timer();
let file_id = self.file_handle.file_id();
// Tries to get from global cache.
if let Some(metadata) = self.cache_strategy.get_parquet_meta_data(file_id).await {
// Tries to get from cache with metrics tracking.
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data_with_metrics(file_id, cache_metrics)
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
return Ok(metadata);
}
// Cache miss, load metadata directly.
let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
self.cache_strategy
.put_parquet_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok(metadata)
}
@@ -527,7 +544,11 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
.apply_fine(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(Some(res)) => {
@@ -595,13 +616,17 @@ impl ParquetReaderBuilder {
// Slow path: apply the index from the file.
let file_size_hint = self.file_handle.meta_ref().index_file_size();
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint))
.apply(
self.file_handle.file_id(),
Some(file_size_hint),
metrics.inverted_index_apply_metrics.as_mut(),
)
.await;
let selection = match apply_res {
Ok(output) => RowGroupSelection::from_inverted_index_apply_output(
Ok(apply_output) => RowGroupSelection::from_inverted_index_apply_output(
row_group_size,
num_row_groups,
output,
apply_output,
),
Err(err) => {
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
@@ -670,7 +695,12 @@ impl ParquetReaderBuilder {
)
});
let apply_res = index_applier
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
.apply(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.bloom_filter_apply_metrics.as_mut(),
)
.await;
let mut selection = match apply_res {
Ok(apply_output) => {
@@ -748,7 +778,12 @@ impl ParquetReaderBuilder {
)
});
let apply_res = index_applier
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
.apply_coarse(
self.file_handle.file_id(),
Some(file_size_hint),
rgs,
metrics.fulltext_index_apply_metrics.as_mut(),
)
.await;
let mut selection = match apply_res {
Ok(Some(apply_output)) => {
@@ -892,7 +927,7 @@ fn all_required_row_groups_searched(
}
/// Metrics of filtering rows groups and rows.
#[derive(Debug, Default, Clone, Copy)]
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) rg_total: usize,
@@ -915,6 +950,13 @@ pub(crate) struct ReaderFilterMetrics {
pub(crate) rows_bloom_filtered: usize,
/// Number of rows filtered by precise filter.
pub(crate) rows_precise_filtered: usize,
/// Optional metrics for inverted index applier.
pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
/// Optional metrics for bloom filter index applier.
pub(crate) bloom_filter_apply_metrics: Option<BloomFilterIndexApplyMetrics>,
/// Optional metrics for fulltext index applier.
pub(crate) fulltext_index_apply_metrics: Option<FulltextIndexApplyMetrics>,
}
impl ReaderFilterMetrics {
@@ -931,6 +973,23 @@ impl ReaderFilterMetrics {
self.rows_inverted_filtered += other.rows_inverted_filtered;
self.rows_bloom_filtered += other.rows_bloom_filtered;
self.rows_precise_filtered += other.rows_precise_filtered;
// Merge optional applier metrics
if let Some(other_metrics) = &other.inverted_index_apply_metrics {
self.inverted_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.bloom_filter_apply_metrics {
self.bloom_filter_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
if let Some(other_metrics) = &other.fulltext_index_apply_metrics {
self.fulltext_index_apply_metrics
.get_or_insert_with(Default::default)
.merge_from(other_metrics);
}
}
/// Reports metrics.
@@ -987,6 +1046,77 @@ impl ReaderFilterMetrics {
}
}
/// Metrics for parquet metadata cache operations.
#[derive(Default, Clone, Copy)]
pub(crate) struct MetadataCacheMetrics {
/// Number of memory cache hits for parquet metadata.
pub(crate) mem_cache_hit: usize,
/// Number of memory cache misses for parquet metadata.
pub(crate) mem_cache_miss: usize,
/// Number of file cache hits for parquet metadata.
pub(crate) file_cache_hit: usize,
/// Number of file cache misses for parquet metadata.
pub(crate) file_cache_miss: usize,
/// Duration to load parquet metadata.
pub(crate) metadata_load_cost: Duration,
}
impl std::fmt::Debug for MetadataCacheMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
if self.mem_cache_hit > 0 {
write!(f, "\"mem_cache_hit\":{}", self.mem_cache_hit)?;
first = false;
}
if self.mem_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"mem_cache_miss\":{}", self.mem_cache_miss)?;
first = false;
}
if self.file_cache_hit > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"file_cache_hit\":{}", self.file_cache_hit)?;
first = false;
}
if self.file_cache_miss > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"file_cache_miss\":{}", self.file_cache_miss)?;
first = false;
}
if !self.metadata_load_cost.is_zero() {
if !first {
write!(f, ", ")?;
}
write!(
f,
"\"metadata_load_cost\":\"{:?}\"",
self.metadata_load_cost
)?;
}
write!(f, "}}")
}
}
impl MetadataCacheMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &MetadataCacheMetrics) {
self.mem_cache_hit += other.mem_cache_hit;
self.mem_cache_miss += other.mem_cache_miss;
self.file_cache_hit += other.file_cache_hit;
self.file_cache_miss += other.file_cache_miss;
self.metadata_load_cost += other.metadata_load_cost;
}
}
/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub struct ReaderMetrics {
@@ -1002,6 +1132,10 @@ pub struct ReaderMetrics {
pub(crate) num_batches: usize,
/// Number of rows read.
pub(crate) num_rows: usize,
/// Metrics for parquet metadata cache.
pub(crate) metadata_cache_metrics: MetadataCacheMetrics,
/// Optional metrics for page/row group fetch operations.
pub(crate) fetch_metrics: Option<Arc<ParquetFetchMetrics>>,
}
impl ReaderMetrics {
@@ -1013,6 +1147,15 @@ impl ReaderMetrics {
self.num_record_batches += other.num_record_batches;
self.num_batches += other.num_batches;
self.num_rows += other.num_rows;
self.metadata_cache_metrics
.merge_from(&other.metadata_cache_metrics);
if let Some(other_fetch) = &other.fetch_metrics {
if let Some(self_fetch) = &self.fetch_metrics {
self_fetch.merge_from(other_fetch);
} else {
self.fetch_metrics = Some(other_fetch.clone());
}
}
}
/// Reports total rows.
@@ -1067,7 +1210,10 @@ impl RowGroupReaderBuilder {
&self,
row_group_idx: usize,
row_selection: Option<RowSelection>,
fetch_metrics: Option<&ParquetFetchMetrics>,
) -> Result<ParquetRecordBatchReader> {
let fetch_start = Instant::now();
let mut row_group = InMemoryRowGroup::create(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
@@ -1079,12 +1225,17 @@ impl RowGroupReaderBuilder {
);
// Fetches data into memory.
row_group
.fetch(&self.projection, row_selection.as_ref())
.fetch(&self.projection, row_selection.as_ref(), fetch_metrics)
.await
.context(ReadParquetSnafu {
path: &self.file_path,
})?;
// Record total fetch elapsed time.
if let Some(metrics) = fetch_metrics {
metrics.add_total_fetch_elapsed(fetch_start.elapsed().as_micros() as u64);
}
// Builds the parquet reader.
// Now the row selection is None.
ParquetRecordBatchReader::try_new_with_row_groups(
@@ -1228,6 +1379,8 @@ pub struct ParquetReader {
selection: RowGroupSelection,
/// Reader of current row group.
reader_state: ReaderState,
/// Metrics for tracking row group fetch operations.
fetch_metrics: ParquetFetchMetrics,
}
#[async_trait]
@@ -1247,7 +1400,11 @@ impl BatchReader for ParquetReader {
let parquet_reader = self
.context
.reader_builder()
.build(row_group_idx, Some(row_selection))
.build(
row_group_idx,
Some(row_selection),
Some(&self.fetch_metrics),
)
.await?;
// Resets the parquet reader.
@@ -1303,11 +1460,12 @@ impl ParquetReader {
context: FileRangeContextRef,
mut selection: RowGroupSelection,
) -> Result<Self> {
let fetch_metrics = ParquetFetchMetrics::default();
// No more items in current row group, reads next row group.
let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
let parquet_reader = context
.reader_builder()
.build(row_group_idx, Some(row_selection))
.build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
.await?;
// Compute skip_fields once for this row group
let skip_fields = context.should_skip_fields(row_group_idx);
@@ -1324,6 +1482,7 @@ impl ParquetReader {
context,
selection,
reader_state,
fetch_metrics,
})
}

View File

@@ -35,6 +35,206 @@ use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::parquet::helper::{MERGE_GAP, fetch_byte_ranges};
/// Metrics for tracking page/row group fetch operations.
/// Uses atomic counters for thread-safe updates.
#[derive(Default)]
pub struct ParquetFetchMetrics {
/// Number of page cache hits.
page_cache_hit: std::sync::atomic::AtomicUsize,
/// Number of write cache hits.
write_cache_hit: std::sync::atomic::AtomicUsize,
/// Number of pages to fetch.
pages_to_fetch: std::sync::atomic::AtomicUsize,
/// Total size in bytes of pages to fetch.
page_size_to_fetch: std::sync::atomic::AtomicU64,
/// Elapsed time in microseconds fetching from write cache.
write_cache_fetch_elapsed: std::sync::atomic::AtomicU64,
/// Elapsed time in microseconds fetching from object store.
store_fetch_elapsed: std::sync::atomic::AtomicU64,
/// Total elapsed time in microseconds for fetching row groups.
total_fetch_elapsed: std::sync::atomic::AtomicU64,
}
impl std::fmt::Debug for ParquetFetchMetrics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
let mut first = true;
let page_cache_hit = self.page_cache_hit();
let write_cache_hit = self.write_cache_hit();
let pages_to_fetch = self.pages_to_fetch();
let page_size_to_fetch = self.page_size_to_fetch();
let write_cache_elapsed = self.write_cache_fetch_elapsed();
let store_elapsed = self.store_fetch_elapsed();
let total_elapsed = self.total_fetch_elapsed();
if page_cache_hit > 0 {
write!(f, "\"page_cache_hit\":{}", page_cache_hit)?;
first = false;
}
if write_cache_hit > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"write_cache_hit\":{}", write_cache_hit)?;
first = false;
}
if pages_to_fetch > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"pages_to_fetch\":{}", pages_to_fetch)?;
first = false;
}
if page_size_to_fetch > 0 {
if !first {
write!(f, ", ")?;
}
write!(f, "\"page_size_to_fetch\":{}", page_size_to_fetch)?;
first = false;
}
if write_cache_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(write_cache_elapsed);
write!(f, "\"write_cache_fetch_elapsed\":\"{:?}\"", duration)?;
first = false;
}
if store_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(store_elapsed);
write!(f, "\"store_fetch_elapsed\":\"{:?}\"", duration)?;
first = false;
}
if total_elapsed > 0 {
if !first {
write!(f, ", ")?;
}
let duration = std::time::Duration::from_micros(total_elapsed);
write!(f, "\"total_fetch_elapsed\":\"{:?}\"", duration)?;
}
write!(f, "}}")
}
}
impl ParquetFetchMetrics {
/// Increments page cache hit counter.
pub fn inc_page_cache_hit(&self) {
self.page_cache_hit
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
/// Increments write cache hit counter.
pub fn inc_write_cache_hit(&self) {
self.write_cache_hit
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
/// Adds to the pages to fetch counter.
pub fn add_pages_to_fetch(&self, count: usize) {
self.pages_to_fetch
.fetch_add(count, std::sync::atomic::Ordering::Relaxed);
}
/// Adds to the page size to fetch counter.
pub fn add_page_size_to_fetch(&self, size: u64) {
self.page_size_to_fetch
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the page cache hit count.
pub fn page_cache_hit(&self) -> usize {
self.page_cache_hit
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns the write cache hit count.
pub fn write_cache_hit(&self) -> usize {
self.write_cache_hit
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns the pages to fetch count.
pub fn pages_to_fetch(&self) -> usize {
self.pages_to_fetch
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Returns the page size to fetch in bytes.
pub fn page_size_to_fetch(&self) -> u64 {
self.page_size_to_fetch
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for write cache fetch.
pub fn add_write_cache_fetch_elapsed(&self, elapsed_us: u64) {
self.write_cache_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the elapsed time in microseconds for write cache fetch.
pub fn write_cache_fetch_elapsed(&self) -> u64 {
self.write_cache_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for object store fetch.
pub fn add_store_fetch_elapsed(&self, elapsed_us: u64) {
self.store_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the elapsed time in microseconds for object store fetch.
pub fn store_fetch_elapsed(&self) -> u64 {
self.store_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Adds elapsed time in microseconds for total fetch operation.
pub fn add_total_fetch_elapsed(&self, elapsed_us: u64) {
self.total_fetch_elapsed
.fetch_add(elapsed_us, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the total elapsed time in microseconds for fetch operations.
pub fn total_fetch_elapsed(&self) -> u64 {
self.total_fetch_elapsed
.load(std::sync::atomic::Ordering::Relaxed)
}
/// Merges metrics from another [ParquetFetchMetrics].
pub fn merge_from(&self, other: &ParquetFetchMetrics) {
self.page_cache_hit
.fetch_add(other.page_cache_hit(), std::sync::atomic::Ordering::Relaxed);
self.write_cache_hit.fetch_add(
other.write_cache_hit(),
std::sync::atomic::Ordering::Relaxed,
);
self.pages_to_fetch
.fetch_add(other.pages_to_fetch(), std::sync::atomic::Ordering::Relaxed);
self.page_size_to_fetch.fetch_add(
other.page_size_to_fetch(),
std::sync::atomic::Ordering::Relaxed,
);
self.write_cache_fetch_elapsed.fetch_add(
other.write_cache_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
self.store_fetch_elapsed.fetch_add(
other.store_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
self.total_fetch_elapsed.fetch_add(
other.total_fetch_elapsed(),
std::sync::atomic::Ordering::Relaxed,
);
}
}
pub(crate) struct RowGroupBase<'a> {
metadata: &'a RowGroupMetaData,
pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
@@ -244,13 +444,14 @@ impl<'a> InMemoryRowGroup<'a> {
&mut self,
projection: &ProjectionMask,
selection: Option<&RowSelection>,
metrics: Option<&ParquetFetchMetrics>,
) -> Result<()> {
if let Some((selection, offset_index)) = selection.zip(self.base.offset_index) {
let (fetch_ranges, page_start_offsets) =
self.base
.calc_sparse_read_ranges(projection, offset_index, selection);
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
// Assign sparse chunk data to base.
self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
@@ -268,7 +469,7 @@ impl<'a> InMemoryRowGroup<'a> {
}
// Fetch data with ranges
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
let chunk_data = self.fetch_bytes(&fetch_ranges, metrics).await?;
// Assigns fetched data to base.
self.base.assign_dense_chunk(projection, chunk_data);
@@ -279,31 +480,62 @@ impl<'a> InMemoryRowGroup<'a> {
/// Try to fetch data from the memory cache or the WriteCache,
/// if not in WriteCache, fetch data from object store directly.
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
async fn fetch_bytes(
&self,
ranges: &[Range<u64>],
metrics: Option<&ParquetFetchMetrics>,
) -> Result<Vec<Bytes>> {
// Now fetch page timer includes the whole time to read pages.
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
// Track pages to fetch regardless of cache hit/miss.
if let Some(metrics) = metrics {
metrics.add_pages_to_fetch(ranges.len());
}
let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
if let Some(metrics) = metrics {
metrics.inc_page_cache_hit();
}
return Ok(pages.compressed.clone());
}
// Calculate total range size for metrics.
let total_range_size = compute_total_range_size(ranges);
if let Some(metrics) = metrics {
metrics.add_page_size_to_fetch(total_range_size);
}
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
Some(data) => data,
let start = std::time::Instant::now();
let write_cache_result = self.fetch_ranges_from_write_cache(key, ranges).await;
let pages = match write_cache_result {
Some(data) => {
if let Some(metrics) = metrics {
metrics.add_write_cache_fetch_elapsed(start.elapsed().as_micros() as u64);
metrics.inc_write_cache_hit();
}
data
}
None => {
// Fetch data from object store.
let _timer = READ_STAGE_ELAPSED
.with_label_values(&["cache_miss_read"])
.start_timer();
fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
let start = std::time::Instant::now();
let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?
.map_err(|e| ParquetError::External(Box::new(e)))?;
if let Some(metrics) = metrics {
metrics.add_store_fetch_elapsed(start.elapsed().as_micros() as u64);
}
data
}
};
// Put pages back to the cache.
let total_range_size = compute_total_range_size(ranges);
let page_value = PageValue::new(pages.clone(), total_range_size);
self.cache_strategy
.put_pages(page_key, Arc::new(page_value));

View File

@@ -32,6 +32,15 @@ use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::Result;
use crate::file_metadata::FileMetadata;
/// Metrics returned by `PuffinReader::dir` operations.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DirMetrics {
/// Whether this was a cache hit (true) or cache miss (false).
pub cache_hit: bool,
/// Size of the directory in bytes.
pub dir_size: u64,
}
/// The `PuffinManager` trait provides a unified interface for creating `PuffinReader` and `PuffinWriter`.
#[async_trait]
pub trait PuffinManager {
@@ -106,9 +115,10 @@ pub trait PuffinReader {
/// Reads a directory from the Puffin file.
///
/// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
/// The returned tuple contains `GuardWithMetadata` and `DirMetrics`.
/// The `GuardWithMetadata` is used to access the directory data and its metadata.
/// Users should hold the `GuardWithMetadata` until they are done with the directory data.
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)>;
}
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.

View File

@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
use crate::puffin_manager::{BlobGuard, DirMetrics, GuardWithMetadata, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F>
@@ -130,10 +130,10 @@ where
Ok(GuardWithMetadata::new(blob, blob_metadata))
}
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
async fn dir(&self, key: &str) -> Result<(GuardWithMetadata<Self::Dir>, DirMetrics)> {
let mut file = self.puffin_reader().await?;
let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
let dir = self
let (dir, metrics) = self
.stager
.get_dir(
&self.handle,
@@ -153,7 +153,7 @@ where
)
.await?;
Ok(GuardWithMetadata::new(dir, blob_metadata))
Ok((GuardWithMetadata::new(dir, blob_metadata), metrics))
}
}

View File

@@ -23,7 +23,7 @@ use futures::AsyncWrite;
use futures::future::BoxFuture;
use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
@@ -72,14 +72,15 @@ pub trait Stager: Send + Sync {
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The returned tuple contains the `DirGuard` and `DirMetrics`.
/// The `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
) -> Result<(Self::Dir, DirMetrics)>;
/// Stores a directory in the staging area.
async fn put_dir(

View File

@@ -41,7 +41,7 @@ use crate::error::{
use crate::puffin_manager::stager::{
BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
};
use crate::puffin_manager::{BlobGuard, DirGuard};
use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp";
@@ -203,7 +203,7 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
handle: &Self::FileHandle,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> {
) -> Result<(Self::Dir, DirMetrics)> {
let handle_str = handle.to_string();
let cache_key = Self::encode_cache_key(&handle_str, key);
@@ -242,15 +242,22 @@ impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
.await
.context(CacheGetSnafu)?;
let dir_size = v.size();
if let Some(notifier) = self.notifier.as_ref() {
if miss {
notifier.on_cache_miss(v.size());
notifier.on_cache_miss(dir_size);
} else {
notifier.on_cache_hit(v.size());
notifier.on_cache_hit(dir_size);
}
}
let metrics = DirMetrics {
cache_hit: !miss,
dir_size,
};
match v {
CacheValue::Dir(guard) => Ok(guard),
CacheValue::Dir(guard) => Ok((guard, metrics)),
_ => unreachable!(),
}
}
@@ -882,7 +889,7 @@ mod tests {
let puffin_file_name = "test_get_dir".to_string();
let key = "key";
let dir_path = stager
let (dir_path, metrics) = stager
.get_dir(
&puffin_file_name,
key,
@@ -901,6 +908,9 @@ mod tests {
.await
.unwrap();
assert!(!metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -974,7 +984,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = stager
let (guard, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1016,7 +1026,7 @@ mod tests {
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"hello world");
let dir_path = stager
let (dir_path, metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1024,6 +1034,9 @@ mod tests {
)
.await
.unwrap();
assert!(metrics.cache_hit);
assert!(metrics.dir_size > 0);
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -1151,7 +1164,7 @@ mod tests {
];
// First time to get the directory
let guard_0 = stager
let (guard_0, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1198,7 +1211,7 @@ mod tests {
);
// Second time to get the directory
let guard_1 = stager
let (guard_1, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1237,7 +1250,7 @@ mod tests {
// Third time to get the directory and all guards are dropped
drop(guard_0);
drop(guard_1);
let guard_2 = stager
let (guard_2, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,
@@ -1390,7 +1403,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = stager
let (guard, _metrics) = stager
.get_dir(
&puffin_file_name,
dir_key,

View File

@@ -356,7 +356,7 @@ async fn check_dir(
stager: &BoundedStager<String>,
puffin_reader: &impl PuffinReader,
) {
let res_dir = puffin_reader.dir(key).await.unwrap();
let (res_dir, _metrics) = puffin_reader.dir(key).await.unwrap();
let metadata = res_dir.metadata();
assert_eq!(
metadata.properties,